diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index d9a2535fa3..40e5d9581b 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -207,7 +207,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { DataBufferUtils.release(dataBuffer); String value = charBuffer.toString(); if (logger.isDebugEnabled()) { - logger.debug("Decoded '" + "'"); + logger.debug("Decoded '" + value + "'"); } return value; } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java index d3aedd70a6..600ae763a0 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java @@ -88,7 +88,7 @@ public abstract class AbstractHttpHandlerIntegrationTests { * set the number of buffered to an arbitrary number greater than N. * */ - public static Flux interval(Duration period, int count) { + public static Flux testInterval(Duration period, int count) { return Flux.interval(period).take(count).onBackpressureBuffer(count); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java index 83a5e73547..e99d0f183e 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java @@ -21,13 +21,11 @@ import java.time.Duration; import org.junit.Before; import org.junit.Test; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -37,7 +35,9 @@ import org.springframework.web.reactive.function.client.WebClient; import static org.junit.Assert.*; /** + * Integration tests for server response flushing behavior. * @author Sebastien Deleuze + * @author Rossen Stoyanchev * @since 5.0 */ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTests { @@ -72,12 +72,11 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest Mono result = this.webClient.get() .uri("/write-and-complete") .retrieve() - .bodyToFlux(String.class) - .reduce((s1, s2) -> s1 + s2); + .bodyToMono(String.class); try { StepVerifier.create(result) - .consumeNextWith(value -> assertTrue(value.length() >= 20000 * "0123456789".length())) + .consumeNextWith(value -> assertEquals(64 * 1024, value.length())) .expectComplete() .verify(Duration.ofSeconds(10L)); } @@ -119,35 +118,46 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest @Override public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { String path = request.getURI().getPath(); - if (path.endsWith("write-and-flush")) { - Flux> responseBody = interval(Duration.ofMillis(50), 2) - .map(l -> toDataBuffer("data" + l + "\n", response.bufferFactory())) - .map(Flux::just); - return response.writeAndFlushWith(responseBody.concatWith(Flux.never())); - } - else if (path.endsWith("write-and-complete")) { - Flux responseBody = Flux - .just("0123456789") - .repeat(20000) - .map(value -> toDataBuffer(value + "\n", response.bufferFactory())); - return response.writeWith(responseBody); - } - else if (path.endsWith("write-and-never-complete")) { - Flux responseBody = Flux - .just("0123456789") - .repeat(20000) - .map(value -> toDataBuffer(value + "\n", response.bufferFactory())); - return response.writeWith(responseBody.mergeWith(Flux.never())); + switch (path) { + case "/write-and-flush": + return response.writeAndFlushWith( + testInterval(Duration.ofMillis(50), 2) + .map(longValue -> wrap("data" + longValue + "\n", response)) + .map(Flux::just) + .mergeWith(Flux.never())); + + case "/write-and-complete": + return response.writeWith( + chunks1K().take(64).map(s -> wrap(s, response))); + + case "/write-and-never-complete": + // Reactor requires at least 50 to flush, Tomcat/Undertow 8, Jetty 1 + return response.writeWith( + chunks1K().take(64).map(s -> wrap(s, response)).mergeWith(Flux.never())); + + default: + return response.writeWith(Flux.empty()); } - return response.writeWith(Flux.empty()); } - private DataBuffer toDataBuffer(String value, DataBufferFactory factory) { - byte[] data = (value).getBytes(StandardCharsets.UTF_8); - DataBuffer buffer = factory.allocateBuffer(data.length); - buffer.write(data); - return buffer; + private Flux chunks1K() { + return Flux.generate(sink -> { + StringBuilder sb = new StringBuilder(); + do { + for (char c : "0123456789".toCharArray()) { + sb.append(c); + if (sb.length() + 1 == 1024) { + sink.next(sb.append("\n").toString()); + return; + } + } + } while (true); + }); } - } + private DataBuffer wrap(String value, ServerHttpResponse response) { + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + return response.bufferFactory().wrap(bytes); + } + } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java index 24653246ec..3300875611 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java @@ -117,7 +117,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn private static class SseHandler { - private static final Flux INTERVAL = interval(Duration.ofMillis(100), 2); + private static final Flux INTERVAL = testInterval(Duration.ofMillis(100), 2); Mono string(ServerRequest request) { return ServerResponse.ok() diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java index 44cfd898b0..0f5b75c362 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java @@ -102,7 +102,7 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra @GetMapping(value = "/stream", produces = { APPLICATION_STREAM_JSON_VALUE, "application/stream+x-jackson-smile" }) Flux person() { - return interval(Duration.ofMillis(100), 50).map(l -> new Person("foo " + l)); + return testInterval(Duration.ofMillis(100), 50).map(l -> new Person("foo " + l)); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java index b126b32d11..b325ee0e08 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java @@ -86,7 +86,7 @@ public class RequestMappingIntegrationTests extends AbstractRequestMappingIntegr @GetMapping("/stream") public Publisher stream() { - return interval(Duration.ofMillis(50), 5); + return testInterval(Duration.ofMillis(50), 5); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index 932566a7f4..c43534f257 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -177,7 +177,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @RequestMapping("/sse") static class SseController { - private static final Flux INTERVAL = interval(Duration.ofMillis(100), 50); + private static final Flux INTERVAL = testInterval(Duration.ofMillis(100), 50); private MonoProcessor cancellation = MonoProcessor.create();