|
|
|
@ -86,17 +86,18 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void sseAsString() throws Exception { |
|
|
|
|
Mono<String> result = this.webClient |
|
|
|
|
Flux<String> result = this.webClient |
|
|
|
|
.perform(get("http://localhost:" + port + "/sse/string") |
|
|
|
|
.accept(new MediaType("text", "event-stream"))) |
|
|
|
|
.extract(bodyStream(String.class)) |
|
|
|
|
.take(Duration.ofMillis(1000)) |
|
|
|
|
.reduce((s1, s2) -> s1 + s2); |
|
|
|
|
.filter(s -> !s.equals("\n")) |
|
|
|
|
.map(s -> (s.replace("\n", ""))) |
|
|
|
|
.take(2); |
|
|
|
|
|
|
|
|
|
TestSubscriber |
|
|
|
|
.subscribe(result) |
|
|
|
|
.await() |
|
|
|
|
.assertValues("data:foo 0\n\ndata:foo 1\n\n"); |
|
|
|
|
.assertValues("data:foo 0", "data:foo 1"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@ -105,28 +106,36 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { |
|
|
|
|
.perform(get("http://localhost:" + port + "/sse/person") |
|
|
|
|
.accept(new MediaType("text", "event-stream"))) |
|
|
|
|
.extract(bodyStream(String.class)) |
|
|
|
|
.take(Duration.ofMillis(1000)) |
|
|
|
|
.filter(s -> !s.equals("\n")) |
|
|
|
|
.map(s -> (s.replace("\n", ""))) |
|
|
|
|
.takeUntil(s -> { |
|
|
|
|
return s.endsWith("foo 1\"}"); |
|
|
|
|
}) |
|
|
|
|
.reduce((s1, s2) -> s1 + s2); |
|
|
|
|
|
|
|
|
|
TestSubscriber |
|
|
|
|
.subscribe(result) |
|
|
|
|
.await() |
|
|
|
|
.assertValues("data:{\"name\":\"foo 0\"}\n\ndata:{\"name\":\"foo 1\"}\n\n"); |
|
|
|
|
.assertValues("data:{\"name\":\"foo 0\"}data:{\"name\":\"foo 1\"}"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void sseAsEvent() throws Exception { |
|
|
|
|
Mono<String> result = this.webClient |
|
|
|
|
Flux<String> result = this.webClient |
|
|
|
|
.perform(get("http://localhost:" + port + "/sse/event") |
|
|
|
|
.accept(new MediaType("text", "event-stream"))) |
|
|
|
|
.extract(bodyStream(String.class)) |
|
|
|
|
.take(Duration.ofMillis(1000)) |
|
|
|
|
.reduce((s1, s2) -> s1 + s2); |
|
|
|
|
.filter(s -> !s.equals("\n")) |
|
|
|
|
.map(s -> (s.replace("\n", ""))) |
|
|
|
|
.take(2); |
|
|
|
|
|
|
|
|
|
TestSubscriber |
|
|
|
|
.subscribe(result) |
|
|
|
|
.await() |
|
|
|
|
.assertValues("id:0\n:bar\ndata:foo\n\nid:1\n:bar\ndata:foo\n\n"); |
|
|
|
|
.assertValues( |
|
|
|
|
"id:0:bardata:foo", |
|
|
|
|
"id:1:bardata:foo" |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@RestController |
|
|
|
|