|
|
|
@ -82,11 +82,9 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { |
|
|
|
|
protected Mono<Void> writeAndFlushWithInternal( |
|
|
|
|
Publisher<Publisher<DataBuffer>> body) { |
|
|
|
|
Flux<ByteBuf> bodyWithFlushSignals = Flux.from(body). |
|
|
|
|
flatMap(publisher -> { |
|
|
|
|
return Flux.from(publisher). |
|
|
|
|
flatMap(publisher -> Flux.from(publisher). |
|
|
|
|
map(NettyDataBufferFactory::toByteBuf). |
|
|
|
|
concatWith(Mono.just(FLUSH_SIGNAL)); |
|
|
|
|
}); |
|
|
|
|
concatWith(Mono.just(FLUSH_SIGNAL))); |
|
|
|
|
Observable<ByteBuf> content = RxJava1Adapter.publisherToObservable(bodyWithFlushSignals); |
|
|
|
|
ResponseContentWriter<ByteBuf> writer = this.response.write(content, bb -> bb == FLUSH_SIGNAL); |
|
|
|
|
return RxJava1Adapter.observableToFlux(writer).then(); |
|
|
|
|