diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferPublisherInputStream.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferPublisherInputStream.java index abcd0ddfea..5405b9b65b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferPublisherInputStream.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferPublisherInputStream.java @@ -18,12 +18,12 @@ package org.springframework.core.io.buffer.support; import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.BlockingQueue; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; -import reactor.rx.Stream; +import reactor.core.publisher.Flux; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.util.Assert; @@ -35,7 +35,7 @@ class DataBufferPublisherInputStream extends InputStream { private final AtomicBoolean completed = new AtomicBoolean(); - private final BlockingQueue queue; + private final Iterator queue; private InputStream currentStream; @@ -57,8 +57,7 @@ class DataBufferPublisherInputStream extends InputStream { int requestSize) { Assert.notNull(publisher, "'publisher' must not be null"); - // TODO Avoid using Reactor Stream, it should not be a mandatory dependency of Spring Reactive - this.queue = Stream.from(publisher).toBlockingQueue(requestSize); + this.queue = Flux.from(publisher).toIterable(requestSize).iterator(); } @Override @@ -126,25 +125,22 @@ class DataBufferPublisherInputStream extends InputStream { return this.currentStream; } else { - // take() blocks until next or complete() then return null, - // but that's OK since this is a *blocking* InputStream - DataBuffer signal = this.queue.take(); - if (signal == null) { + // if upstream Publisher has completed, then complete() and return null, + if (!this.queue.hasNext()) { this.completed.set(true); return null; } + // next() blocks until next + // but that's OK since this is a *blocking* InputStream + DataBuffer signal = this.queue.next(); this.currentStream = signal.asInputStream(); return this.currentStream; } } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } catch (Throwable error) { this.completed.set(true); throw new IOException(error); } - throw new IOException(); }