From 57af56aeeb880dd69b1bd5135cd3ef5e871214b9 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 17 Mar 2014 17:11:15 -0400 Subject: [PATCH] Improve logging in STOMP broker relay Ignore DISCONNECT messages if already disconnected. This can occur if the client explicitly sends a DISCONNECT frame and then closes the socket quickly. The closing of the WebSocket sessions also sends a DISCONNECT upstream to ensure the broker is aware. --- .../stomp/StompBrokerRelayMessageHandler.java | 42 ++++++++++++------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 5ad02ce5a5..b0cfa55c06 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -18,6 +18,7 @@ package org.springframework.messaging.simp.stomp; import java.util.Collection; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import org.springframework.messaging.Message; @@ -36,6 +37,7 @@ import org.springframework.messaging.tcp.reactor.ReactorTcpClient; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.concurrent.ListenableFutureTask; /** * A {@link org.springframework.messaging.MessageHandler} that handles messages by forwarding them to a STOMP broker. @@ -68,6 +70,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private static final byte[] EMPTY_PAYLOAD = new byte[0]; + private static final ListenableFutureTask EMPTY_TASK = new ListenableFutureTask(new VoidCallable()); + // STOMP recommends error of margin for receiving heartbeats private static final long HEARTBEAT_MULTIPLIER = 3; @@ -627,6 +631,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (!this.isStompConnected) { if (this.isRemoteClientSession) { + if (StompCommand.DISCONNECT.equals(StompHeaderAccessor.wrap(message).getCommand())) { + return EMPTY_TASK; + } // Should never happen throw new IllegalStateException("Unexpected client message " + message + (this.tcpConnection != null ? @@ -681,23 +688,20 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler */ public void clearConnection() { - this.isStompConnected = false; - - try { - TcpConnection conn = this.tcpConnection; - this.tcpConnection = null; - if (conn != null) { - conn.close(); + if (this.isRemoteClientSession) { + if (logger.isDebugEnabled()) { + logger.debug("Removing session '" + sessionId + "' (total remaining=" + + (StompBrokerRelayMessageHandler.this.connectionHandlers.size() - 1) + ")"); } + StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId); } - finally { - if (this.isRemoteClientSession) { - if (logger.isDebugEnabled()) { - logger.debug("Removing session '" + sessionId + "' (total remaining=" + - (StompBrokerRelayMessageHandler.this.connectionHandlers.size() - 1) + ")"); - } - StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId); - } + + this.isStompConnected = false; + + TcpConnection conn = this.tcpConnection; + this.tcpConnection = null; + if (conn != null) { + conn.close(); } } @@ -754,4 +758,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private static class VoidCallable implements Callable { + + @Override + public Void call() throws Exception { + return null; + } + } + }