|
|
@ -23,7 +23,7 @@ import java.util.concurrent.BlockingQueue; |
|
|
|
|
|
|
|
|
|
|
|
import org.reactivestreams.Publisher; |
|
|
|
import org.reactivestreams.Publisher; |
|
|
|
import org.reactivestreams.Subscription; |
|
|
|
import org.reactivestreams.Subscription; |
|
|
|
import reactor.rx.Streams; |
|
|
|
import reactor.rx.Stream; |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* {@code InputStream} implementation based on a byte array {@link Publisher}. |
|
|
|
* {@code InputStream} implementation based on a byte array {@link Publisher}. |
|
|
@ -60,7 +60,7 @@ public class ByteBufferPublisherInputStream extends InputStream { |
|
|
|
public ByteBufferPublisherInputStream(Publisher<ByteBuffer> publisher, int requestSize) { |
|
|
|
public ByteBufferPublisherInputStream(Publisher<ByteBuffer> publisher, int requestSize) { |
|
|
|
Assert.notNull(publisher, "'publisher' must not be null"); |
|
|
|
Assert.notNull(publisher, "'publisher' must not be null"); |
|
|
|
|
|
|
|
|
|
|
|
this.queue = Streams.from(publisher).toBlockingQueue(requestSize); |
|
|
|
this.queue = Stream.from(publisher).toBlockingQueue(requestSize); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|