diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 40198ba9fa..b12f9434b2 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -524,13 +524,9 @@ public abstract class DataBufferUtils { long pos = this.position.addAndGet(read); dataBuffer.writePosition(read); this.sink.next(dataBuffer); - // It's possible for cancellation to happen right before the push into the sink + // onNext may have led to onCancel (e.g. downstream takeUntil) if (this.disposed.get()) { - // TODO: - // This is not ideal since we already passed the buffer into the sink and - // releasing may cause something reading to fail. Maybe we won't have to - // do this after https://github.com/reactor/reactor-core/issues/1634 - complete(dataBuffer); + complete(); } else { DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); @@ -539,12 +535,12 @@ public abstract class DataBufferUtils { } } else { - complete(dataBuffer); + release(dataBuffer); + complete(); } } - private void complete(DataBuffer dataBuffer) { - release(dataBuffer); + private void complete() { this.sink.complete(); closeChannel(this.channel); }