diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java index 2427abdca8..d3a45693ec 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java @@ -47,7 +47,9 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest Mono result = this.webClient .perform(get("http://localhost:" + port)) .extract(bodyStream(String.class)) - .take(2) + .takeUntil(s -> { + return s.endsWith("data1"); + }) .reduce((s1, s2) -> s1 + s2); TestSubscriber diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index 30cbbecc3d..f25d567c7a 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -86,17 +86,18 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @Test public void sseAsString() throws Exception { - Mono result = this.webClient + Flux result = this.webClient .perform(get("http://localhost:" + port + "/sse/string") .accept(new MediaType("text", "event-stream"))) .extract(bodyStream(String.class)) - .take(Duration.ofMillis(1000)) - .reduce((s1, s2) -> s1 + s2); + .filter(s -> !s.equals("\n")) + .map(s -> (s.replace("\n", ""))) + .take(2); TestSubscriber .subscribe(result) .await() - .assertValues("data:foo 0\n\ndata:foo 1\n\n"); + .assertValues("data:foo 0", "data:foo 1"); } @Test @@ -105,28 +106,36 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { .perform(get("http://localhost:" + port + "/sse/person") .accept(new MediaType("text", "event-stream"))) .extract(bodyStream(String.class)) - .take(Duration.ofMillis(1000)) + .filter(s -> !s.equals("\n")) + .map(s -> (s.replace("\n", ""))) + .takeUntil(s -> { + return s.endsWith("foo 1\"}"); + }) .reduce((s1, s2) -> s1 + s2); TestSubscriber .subscribe(result) .await() - .assertValues("data:{\"name\":\"foo 0\"}\n\ndata:{\"name\":\"foo 1\"}\n\n"); + .assertValues("data:{\"name\":\"foo 0\"}data:{\"name\":\"foo 1\"}"); } @Test public void sseAsEvent() throws Exception { - Mono result = this.webClient + Flux result = this.webClient .perform(get("http://localhost:" + port + "/sse/event") .accept(new MediaType("text", "event-stream"))) .extract(bodyStream(String.class)) - .take(Duration.ofMillis(1000)) - .reduce((s1, s2) -> s1 + s2); + .filter(s -> !s.equals("\n")) + .map(s -> (s.replace("\n", ""))) + .take(2); TestSubscriber .subscribe(result) .await() - .assertValues("id:0\n:bar\ndata:foo\n\nid:1\n:bar\ndata:foo\n\n"); + .assertValues( + "id:0:bardata:foo", + "id:1:bardata:foo" + ); } @RestController