diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java index c1a16ac1b3..3d77bd7f48 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java @@ -21,26 +21,25 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.function.IntPredicate; -import org.springframework.util.Assert; - /** - * {@link DataBuffer} wrapper that indicates the file or the socket writing this buffer - * should be flushed. + * Empty {@link DataBuffer} that indicates to the file or the socket writing it that + * previously buffered data should be flushed. * * @author Sebastien Deleuze + * @see FlushingDataBuffer#INSTANCE */ public class FlushingDataBuffer implements DataBuffer { + /** Singleton instance of this class */ + public static final FlushingDataBuffer INSTANCE = new FlushingDataBuffer(); + private final DataBuffer buffer; - public FlushingDataBuffer() { + + private FlushingDataBuffer() { this.buffer = new DefaultDataBufferFactory().allocateBuffer(0); } - public FlushingDataBuffer(DataBuffer buffer) { - Assert.notNull(buffer); - this.buffer = buffer; - } @Override public DataBufferFactory factory() { diff --git a/spring-web-reactive/src/main/java/org/springframework/http/codec/SseEventEncoder.java b/spring-web-reactive/src/main/java/org/springframework/http/codec/SseEventEncoder.java index ffeae04ab8..505ee42cf0 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/codec/SseEventEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/codec/SseEventEncoder.java @@ -115,7 +115,8 @@ public class SseEventEncoder extends AbstractEncoder { return Flux.concat( encodeString(sb.toString(), bufferFactory), dataBuffer, - encodeString("\n", bufferFactory).map(b -> new FlushingDataBuffer(b)) + encodeString("\n", bufferFactory), + Mono.just(FlushingDataBuffer.INSTANCE) ); }); diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/SseEventEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/SseEventEncoderTests.java index 3d6807829c..6f64ecbbc5 100644 --- a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/SseEventEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/SseEventEncoderTests.java @@ -18,6 +18,7 @@ package org.springframework.core.codec.support; import java.util.Arrays; +import static org.junit.Assert.*; import org.junit.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -26,14 +27,11 @@ import reactor.core.test.TestSubscriber; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.FlushingDataBuffer; import org.springframework.http.codec.SseEventEncoder; import org.springframework.util.MimeType; import org.springframework.web.reactive.sse.SseEvent; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - - /** * @author Sebastien Deleuze */ @@ -77,7 +75,8 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { "event:foo\n" + "retry:123\n" + ":bla\n:bla bla\n:bla bla bla\n"), - stringConsumer("\n") + stringConsumer("\n"), + b -> assertEquals(FlushingDataBuffer.class, b.getClass()) ); } @@ -93,8 +92,10 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { .assertValuesWith( stringConsumer("data:foo\n"), stringConsumer("\n"), + b -> assertEquals(FlushingDataBuffer.class, b.getClass()), stringConsumer("data:bar\n"), - stringConsumer("\n") + stringConsumer("\n"), + b -> assertEquals(FlushingDataBuffer.class, b.getClass()) ); } @@ -110,12 +111,13 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { .assertValuesWith( stringConsumer("data:foo\ndata:bar\n"), stringConsumer("\n"), + b -> assertEquals(FlushingDataBuffer.class, b.getClass()), stringConsumer("data:foo\ndata:baz\n"), - stringConsumer("\n") + stringConsumer("\n"), + b -> assertEquals(FlushingDataBuffer.class, b.getClass()) ); } - @Test public void encodePojo() { SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); @@ -130,10 +132,12 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"), stringConsumer("\n"), stringConsumer("\n"), + b -> assertEquals(FlushingDataBuffer.class, b.getClass()), stringConsumer("data:"), stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"), stringConsumer("\n"), - stringConsumer("\n") + stringConsumer("\n"), + b -> assertEquals(FlushingDataBuffer.class, b.getClass()) ); } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java index 8dfc741a5e..2427abdca8 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java @@ -70,14 +70,15 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { Flux responseBody = Flux .interval(50) - .take(2) - .concatWith(Flux.never()) .map(l -> { byte[] data = ("data" + l).getBytes(); DataBuffer buffer = response.bufferFactory().allocateBuffer(data.length); buffer.write(data); - return new FlushingDataBuffer(buffer); - }); + return buffer; + }) + .take(2) + .concatWith(Mono.just(FlushingDataBuffer.INSTANCE)) + .concatWith(Flux.never()); return response.writeWith(responseBody); } }