|
|
|
@ -59,38 +59,42 @@ public abstract class DataBufferUtils { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Returns the given data buffer publisher as an input stream, streaming over all |
|
|
|
|
* underlying buffers when available. |
|
|
|
|
* Returns the given data buffer publisher as a blocking input stream, streaming over |
|
|
|
|
* all underlying buffers when available. |
|
|
|
|
* @param publisher the publisher to create the input stream for |
|
|
|
|
* @return the input stream |
|
|
|
|
*/ |
|
|
|
|
public static InputStream toInputStream(Publisher<DataBuffer> publisher) { |
|
|
|
|
Iterable<InputStream> streams = Flux.from(publisher). |
|
|
|
|
map(DataBuffer::asInputStream). |
|
|
|
|
toIterable(); |
|
|
|
|
toIterable(1); |
|
|
|
|
|
|
|
|
|
Enumeration<InputStream> enumeration = |
|
|
|
|
new IteratorEnumeration<InputStream>(streams); |
|
|
|
|
new IteratorEnumeration<InputStream>(streams.iterator()); |
|
|
|
|
|
|
|
|
|
return new SequenceInputStream(enumeration); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Enumeration wrapping an Iterator. |
|
|
|
|
*/ |
|
|
|
|
// TODO: move to CollectionUtils when we merge with Spring Framework?
|
|
|
|
|
private static class IteratorEnumeration<T> implements Enumeration<T> { |
|
|
|
|
|
|
|
|
|
private final Iterator<T> delegate; |
|
|
|
|
private final Iterator<T> iterator; |
|
|
|
|
|
|
|
|
|
public IteratorEnumeration(Iterable<T> iterable) { |
|
|
|
|
this.delegate = iterable.iterator(); |
|
|
|
|
public IteratorEnumeration(Iterator<T> iterator) { |
|
|
|
|
this.iterator = iterator; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public boolean hasMoreElements() { |
|
|
|
|
return delegate.hasNext(); |
|
|
|
|
return this.iterator.hasNext(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public T nextElement() { |
|
|
|
|
return delegate.next(); |
|
|
|
|
return this.iterator.next(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|