Fix locally failing test in FlushingIntegrationTests

master
Rossen Stoyanchev 6 years ago
parent 3165b3c024
commit 907a306ee2
  1. 2
      spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java
  2. 2
      spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java
  3. 72
      spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java
  4. 2
      spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java
  5. 2
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java
  6. 2
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java
  7. 2
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

@ -207,7 +207,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
DataBufferUtils.release(dataBuffer);
String value = charBuffer.toString();
if (logger.isDebugEnabled()) {
logger.debug("Decoded '" + "'");
logger.debug("Decoded '" + value + "'");
}
return value;
}

@ -88,7 +88,7 @@ public abstract class AbstractHttpHandlerIntegrationTests {
* set the number of buffered to an arbitrary number greater than N.
* </ul>
*/
public static Flux<Long> interval(Duration period, int count) {
public static Flux<Long> testInterval(Duration period, int count) {
return Flux.interval(period).take(count).onBackpressureBuffer(count);
}

@ -21,13 +21,11 @@ import java.time.Duration;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ServerHttpRequest;
@ -37,7 +35,9 @@ import org.springframework.web.reactive.function.client.WebClient;
import static org.junit.Assert.*;
/**
* Integration tests for server response flushing behavior.
* @author Sebastien Deleuze
* @author Rossen Stoyanchev
* @since 5.0
*/
public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@ -72,12 +72,11 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
Mono<String> result = this.webClient.get()
.uri("/write-and-complete")
.retrieve()
.bodyToFlux(String.class)
.reduce((s1, s2) -> s1 + s2);
.bodyToMono(String.class);
try {
StepVerifier.create(result)
.consumeNextWith(value -> assertTrue(value.length() >= 20000 * "0123456789".length()))
.consumeNextWith(value -> assertEquals(64 * 1024, value.length()))
.expectComplete()
.verify(Duration.ofSeconds(10L));
}
@ -119,35 +118,46 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
String path = request.getURI().getPath();
if (path.endsWith("write-and-flush")) {
Flux<Publisher<DataBuffer>> responseBody = interval(Duration.ofMillis(50), 2)
.map(l -> toDataBuffer("data" + l + "\n", response.bufferFactory()))
.map(Flux::just);
return response.writeAndFlushWith(responseBody.concatWith(Flux.never()));
}
else if (path.endsWith("write-and-complete")) {
Flux<DataBuffer> responseBody = Flux
.just("0123456789")
.repeat(20000)
.map(value -> toDataBuffer(value + "\n", response.bufferFactory()));
return response.writeWith(responseBody);
}
else if (path.endsWith("write-and-never-complete")) {
Flux<DataBuffer> responseBody = Flux
.just("0123456789")
.repeat(20000)
.map(value -> toDataBuffer(value + "\n", response.bufferFactory()));
return response.writeWith(responseBody.mergeWith(Flux.never()));
switch (path) {
case "/write-and-flush":
return response.writeAndFlushWith(
testInterval(Duration.ofMillis(50), 2)
.map(longValue -> wrap("data" + longValue + "\n", response))
.map(Flux::just)
.mergeWith(Flux.never()));
case "/write-and-complete":
return response.writeWith(
chunks1K().take(64).map(s -> wrap(s, response)));
case "/write-and-never-complete":
// Reactor requires at least 50 to flush, Tomcat/Undertow 8, Jetty 1
return response.writeWith(
chunks1K().take(64).map(s -> wrap(s, response)).mergeWith(Flux.never()));
default:
return response.writeWith(Flux.empty());
}
return response.writeWith(Flux.empty());
}
private DataBuffer toDataBuffer(String value, DataBufferFactory factory) {
byte[] data = (value).getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = factory.allocateBuffer(data.length);
buffer.write(data);
return buffer;
private Flux<String> chunks1K() {
return Flux.generate(sink -> {
StringBuilder sb = new StringBuilder();
do {
for (char c : "0123456789".toCharArray()) {
sb.append(c);
if (sb.length() + 1 == 1024) {
sink.next(sb.append("\n").toString());
return;
}
}
} while (true);
});
}
}
private DataBuffer wrap(String value, ServerHttpResponse response) {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
return response.bufferFactory().wrap(bytes);
}
}
}

@ -117,7 +117,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn
private static class SseHandler {
private static final Flux<Long> INTERVAL = interval(Duration.ofMillis(100), 2);
private static final Flux<Long> INTERVAL = testInterval(Duration.ofMillis(100), 2);
Mono<ServerResponse> string(ServerRequest request) {
return ServerResponse.ok()

@ -102,7 +102,7 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra
@GetMapping(value = "/stream",
produces = { APPLICATION_STREAM_JSON_VALUE, "application/stream+x-jackson-smile" })
Flux<Person> person() {
return interval(Duration.ofMillis(100), 50).map(l -> new Person("foo " + l));
return testInterval(Duration.ofMillis(100), 50).map(l -> new Person("foo " + l));
}
}

@ -86,7 +86,7 @@ public class RequestMappingIntegrationTests extends AbstractRequestMappingIntegr
@GetMapping("/stream")
public Publisher<Long> stream() {
return interval(Duration.ofMillis(50), 5);
return testInterval(Duration.ofMillis(50), 5);
}
}

@ -177,7 +177,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@RequestMapping("/sse")
static class SseController {
private static final Flux<Long> INTERVAL = interval(Duration.ofMillis(100), 50);
private static final Flux<Long> INTERVAL = testInterval(Duration.ofMillis(100), 50);
private MonoProcessor<Void> cancellation = MonoProcessor.create();

Loading…
Cancel
Save