diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java index 297b9ea9a7..108d149299 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java @@ -246,7 +246,15 @@ class ChannelSendOperator extends Mono implements Scannable { else if (this.state == State.NEW) { this.completed = true; this.state = State.FIRST_SIGNAL_RECEIVED; - writeFunction.apply(this).subscribe(this.writeCompletionBarrier); + Publisher result; + try { + result = writeFunction.apply(this); + } + catch (Throwable ex) { + this.writeCompletionBarrier.onError(ex); + return; + } + result.subscribe(this.writeCompletionBarrier); } else { this.completed = true;