From ec2218c9670ab2e2b88a86dba21bef7f1bf59f9c Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 14 Sep 2017 01:13:14 +0300 Subject: [PATCH] AbstractListenerWriteFlushProcessor: Ensure the last flush will be performed When writing Publisher>, a flush operation is performed onComplete for every Publisher. If the flush operation is not able to be performed immediately it will be retried before starting to process data provided by the next Publisher. For the last Publisher the implementation needs to ensure that the flush will be performed only then whole operation will complete. Issue: SPR-15949 --- .../AbstractListenerWriteFlushProcessor.java | 56 +++++++++++++++++- .../reactive/ServletServerHttpResponse.java | 57 ++++++++++++------- .../reactive/UndertowServerHttpResponse.java | 31 +++++++--- 3 files changed, 116 insertions(+), 28 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 262751b773..6bde76013d 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -124,6 +124,29 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo */ protected abstract void flush() throws IOException; + /** + * Whether writing is possible. + */ + protected abstract boolean isWritePossible(); + + /** + * Whether flushing is pending. + */ + protected abstract boolean isFlushPending(); + + /** + * Listeners can call this to notify when flushing is possible. + */ + protected final void onFlushPossible() { + this.state.get().onFlushPossible(this); + } + + private void flushIfPossible() { + if (isWritePossible()) { + onFlushPossible(); + } + } + private boolean changeState(State oldState, State newState) { return this.state.compareAndSet(oldState, newState); @@ -181,7 +204,12 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo return; } if (processor.subscriberCompleted) { - if (processor.changeState(this, COMPLETED)) { + if (processor.isFlushPending()) { + // Ensure the final flush + processor.changeState(this, FLUSHING); + processor.flushIfPossible(); + } + else if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } } @@ -198,6 +226,28 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } }, + FLUSHING { + public void onFlushPossible(AbstractListenerWriteFlushProcessor processor) { + try { + processor.flush(); + } + catch (IOException ex) { + processor.flushingFailed(ex); + return; + } + if (processor.changeState(this, COMPLETED)) { + processor.resultPublisher.publishComplete(); + } + } + public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { + // ignore + } + @Override + public void onComplete(AbstractListenerWriteFlushProcessor processor) { + // ignore + } + }, + COMPLETED { @Override public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { @@ -235,6 +285,10 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo // ignore } + public void onFlushPossible(AbstractListenerWriteFlushProcessor processor) { + // ignore + } + private static class WriteSubscriber implements Subscriber { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index a4a02ad67a..05af6af643 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -18,7 +18,6 @@ package org.springframework.http.server.reactive; import java.io.IOException; import java.io.InputStream; -import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.util.List; import java.util.Map; @@ -52,6 +51,8 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons private final HttpServletResponse response; + private final ServletOutputStream outputStream; + private final int bufferSize; @Nullable @@ -73,6 +74,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons Assert.isTrue(bufferSize > 0, "Buffer size must be greater than 0"); this.response = response; + this.outputStream = response.getOutputStream(); this.bufferSize = bufferSize; asyncContext.addListener(new ResponseAsyncListener()); @@ -147,7 +149,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons * @return the number of bytes written */ protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException { - ServletOutputStream outputStream = response.getOutputStream(); + ServletOutputStream outputStream = this.outputStream; InputStream input = dataBuffer.asInputStream(); int bytesWritten = 0; byte[] buffer = new byte[this.bufferSize]; @@ -160,7 +162,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } private void flush() throws IOException { - ServletOutputStream outputStream = this.response.getOutputStream(); + ServletOutputStream outputStream = this.outputStream; if (outputStream.isReady()) { try { outputStream.flush(); @@ -176,6 +178,10 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } + private boolean isWritePossible() { + return this.outputStream.isReady(); + } + private final class ResponseAsyncListener implements AsyncListener { @@ -233,6 +239,12 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons if (processor != null) { processor.onWritePossible(); } + else { + ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor; + if (flushProcessor != null) { + flushProcessor.onFlushPossible(); + } + } } @Override @@ -242,6 +254,13 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons processor.cancel(); processor.onError(ex); } + else { + ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor; + if (flushProcessor != null) { + flushProcessor.cancel(); + flushProcessor.onError(ex); + } + } } } @@ -250,15 +269,9 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons @Override protected Processor createWriteProcessor() { - try { - ServletOutputStream outputStream = response.getOutputStream(); - ResponseBodyProcessor processor = new ResponseBodyProcessor(outputStream); - bodyProcessor = processor; - return processor; - } - catch (IOException ex) { - throw new UncheckedIOException(ex); - } + ResponseBodyProcessor processor = new ResponseBodyProcessor(); + bodyProcessor = processor; + return processor; } @Override @@ -268,20 +281,24 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } ServletServerHttpResponse.this.flush(); } - } + @Override + protected boolean isWritePossible() { + return ServletServerHttpResponse.this.isWritePossible(); + } - private class ResponseBodyProcessor extends AbstractListenerWriteProcessor { + @Override + protected boolean isFlushPending() { + return flushOnNext; + } + } - private final ServletOutputStream outputStream; - public ResponseBodyProcessor(ServletOutputStream outputStream) { - this.outputStream = outputStream; - } + private class ResponseBodyProcessor extends AbstractListenerWriteProcessor { @Override protected boolean isWritePossible() { - return this.outputStream.isReady(); + return ServletServerHttpResponse.this.isWritePossible(); } @Override @@ -306,7 +323,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } flush(); } - boolean ready = this.outputStream.isReady(); + boolean ready = ServletServerHttpResponse.this.isWritePossible(); if (this.logger.isTraceEnabled()) { this.logger.trace("write: " + dataBuffer + " ready: " + ready); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index ebc9fb2339..becbb59715 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -147,8 +147,20 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon return new ResponseBodyProcessor(this.responseChannel); } + private boolean isWritePossible() { + if (this.responseChannel == null) { + this.responseChannel = this.exchange.getResponseChannel(); + } + if (this.responseChannel.isWriteResumed()) { + return true; + } else { + this.responseChannel.resumeWrites(); + return false; + } + } + - private static class ResponseBodyProcessor extends AbstractListenerWriteProcessor { + private class ResponseBodyProcessor extends AbstractListenerWriteProcessor { private final StreamSinkChannel channel; @@ -164,12 +176,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon @Override protected boolean isWritePossible() { - if (this.channel.isWriteResumed()) { - return true; - } else { - this.channel.resumeWrites(); - return false; - } + return UndertowServerHttpResponse.this.isWritePossible(); } @Override @@ -264,6 +271,16 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon cancel(); onError(t); } + + @Override + protected boolean isWritePossible() { + return UndertowServerHttpResponse.this.isWritePossible(); + } + + @Override + protected boolean isFlushPending() { + return false; + } } }