Sync BaseSubscriber

master
Stephane Maldini 9 years ago
parent 24d9e99de1
commit 999dfe3925
  1. 11
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java

@ -320,8 +320,8 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
}
}
private static class ResponseBodySubscriber extends BaseSubscriber<DataBuffer>
implements ChannelListener<StreamSinkChannel> {
private static class ResponseBodySubscriber
implements ChannelListener<StreamSinkChannel>, BaseSubscriber<DataBuffer>{
private final HttpServerExchange exchange;
@ -342,14 +342,14 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void onSubscribe(Subscription subscription) {
super.onSubscribe(subscription);
BaseSubscriber.super.onSubscribe(subscription);
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(DataBuffer dataBuffer) {
super.onNext(dataBuffer);
BaseSubscriber.super.onNext(dataBuffer);
ByteBuffer buffer = dataBuffer.asByteBuffer();
@ -439,7 +439,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void onError(Throwable ex) {
super.onError(ex);
BaseSubscriber.super.onError(ex);
logger.error("ResponseBodySubscriber error", ex);
if (!exchange.isResponseStarted() && exchange.getStatusCode() < 500) {
exchange.setStatusCode(500);
@ -448,7 +448,6 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void onComplete() {
super.onComplete();
if (this.responseChannel != null) {
this.closing.set(true);
closeIfDone();

Loading…
Cancel
Save