@ -18,6 +18,7 @@ package org.springframework.messaging.simp.stomp;
import java.util.Collection ;
import java.util.Map ;
import java.util.concurrent.Callable ;
import java.util.concurrent.ConcurrentHashMap ;
import org.springframework.messaging.Message ;
@ -36,6 +37,7 @@ import org.springframework.messaging.tcp.reactor.ReactorTcpClient;
import org.springframework.util.Assert ;
import org.springframework.util.concurrent.ListenableFuture ;
import org.springframework.util.concurrent.ListenableFutureCallback ;
import org.springframework.util.concurrent.ListenableFutureTask ;
/ * *
* A { @link org . springframework . messaging . MessageHandler } that handles messages by forwarding them to a STOMP broker .
@ -68,6 +70,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private static final byte [ ] EMPTY_PAYLOAD = new byte [ 0 ] ;
private static final ListenableFutureTask < Void > EMPTY_TASK = new ListenableFutureTask < Void > ( new VoidCallable ( ) ) ;
// STOMP recommends error of margin for receiving heartbeats
private static final long HEARTBEAT_MULTIPLIER = 3 ;
@ -627,6 +631,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if ( ! this . isStompConnected ) {
if ( this . isRemoteClientSession ) {
if ( StompCommand . DISCONNECT . equals ( StompHeaderAccessor . wrap ( message ) . getCommand ( ) ) ) {
return EMPTY_TASK ;
}
// Should never happen
throw new IllegalStateException ( "Unexpected client message " + message +
( this . tcpConnection ! = null ?
@ -681,23 +688,20 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* /
public void clearConnection ( ) {
this . isStompConnected = false ;
try {
TcpConnection < byte [ ] > conn = this . tcpConnection ;
this . tcpConnection = null ;
if ( conn ! = null ) {
conn . close ( ) ;
if ( this . isRemoteClientSession ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( "Removing session '" + sessionId + "' (total remaining=" +
( StompBrokerRelayMessageHandler . this . connectionHandlers . size ( ) - 1 ) + ")" ) ;
}
StompBrokerRelayMessageHandler . this . connectionHandlers . remove ( this . sessionId ) ;
}
finally {
if ( this . isRemoteClientSession ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( "Removing session '" + sessionId + "' (total remaining=" +
( StompBrokerRelayMessageHandler . this . connectionHandlers . size ( ) - 1 ) + ")" ) ;
}
StompBrokerRelayMessageHandler . this . connectionHandlers . remove ( this . sessionId ) ;
}
this . isStompConnected = false ;
TcpConnection < byte [ ] > conn = this . tcpConnection ;
this . tcpConnection = null ;
if ( conn ! = null ) {
conn . close ( ) ;
}
}
@ -754,4 +758,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private static class VoidCallable implements Callable < Void > {
@Override
public Void call ( ) throws Exception {
return null ;
}
}
}