Add client login/passcode to broker relay

Issue: SPR-11154
master
Rossen Stoyanchev 11 years ago
parent 0a12f28b58
commit 4e5e700213
  1. 69
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java
  2. 86
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  3. 69
      spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java
  4. 31
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java
  5. 12
      spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java
  6. 30
      spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd
  7. 6
      spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java
  8. 4
      spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-relay.xml

@ -33,9 +33,13 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
private int relayPort = 61613;
private String applicationLogin = "guest";
private String clientLogin = "guest";
private String applicationPasscode = "guest";
private String clientPasscode = "guest";
private String systemLogin = "guest";
private String systemPasscode = "guest";
private Long systemHeartbeatSendInterval;
@ -68,23 +72,54 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
return this;
}
/**
* Set the login for the "system" relay session used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
* Set the login to use when creating connections to the STOMP broker on
* behalf of connected clients.
* <p>
* By default this is set to "guest".
*/
public StompBrokerRelayRegistration setApplicationLogin(String login) {
Assert.hasText(login, "applicationLogin must not be empty");
this.applicationLogin = login;
public StompBrokerRelayRegistration setClientLogin(String login) {
Assert.hasText(login, "clientLogin must not be empty");
this.clientLogin = login;
return this;
}
/**
* Set the passcode for the "system" relay session used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
* Set the passcode to use when creating connections to the STOMP broker on
* behalf of connected clients.
* <p>
* By default this is set to "guest".
*/
public StompBrokerRelayRegistration setApplicationPasscode(String passcode) {
Assert.hasText(passcode, "applicationPasscode must not be empty");
this.applicationPasscode = passcode;
public StompBrokerRelayRegistration setClientPasscode(String passcode) {
Assert.hasText(passcode, "clientPasscode must not be empty");
this.clientPasscode = passcode;
return this;
}
/**
* Set the login for the shared "system" connection used to send messages to
* the STOMP broker from within the application, i.e. messages not associated
* with a specific client session (e.g. REST/HTTP request handling method).
* <p>
* By default this is set to "guest".
*/
public StompBrokerRelayRegistration setSystemLogin(String login) {
Assert.hasText(login, "systemLogin must not be empty");
this.systemLogin = login;
return this;
}
/**
* Set the passcode for the shared "system" connection used to send messages to
* the STOMP broker from within the application, i.e. messages not associated
* with a specific client session (e.g. REST/HTTP request handling method).
* <p>
* By default this is set to "guest".
*/
public StompBrokerRelayRegistration setSystemPasscode(String passcode) {
Assert.hasText(passcode, "systemPasscode must not be empty");
this.systemPasscode = passcode;
return this;
}
@ -129,18 +164,22 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
handler.setRelayHost(this.relayHost);
handler.setRelayPort(this.relayPort);
handler.setSystemLogin(this.applicationLogin);
handler.setSystemPasscode(this.applicationPasscode);
handler.setClientLogin(this.clientLogin);
handler.setClientPasscode(this.clientPasscode);
handler.setSystemLogin(this.systemLogin);
handler.setSystemPasscode(this.systemPasscode);
if (this.systemHeartbeatSendInterval != null) {
handler.setSystemHeartbeatSendInterval(this.systemHeartbeatSendInterval);
}
if (this.systemHeartbeatReceiveInterval != null) {
handler.setSystemHeartbeatReceiveInterval(this.systemHeartbeatReceiveInterval);
}
handler.setAutoStartup(this.autoStartup);
return handler;
}

@ -33,7 +33,6 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@ -51,13 +50,16 @@ import org.springframework.util.concurrent.ListenableFutureTask;
*
* <p>This class also automatically opens a default "system" TCP connection to the message
* broker that is used for sending messages that originate from the server application (as
* opposed to from a client). Such messages are recognized because they are not associated
* with any client and therefore do not have a session id header. The "system" connection
* is effectively shared and cannot be used to receive messages. Several properties are
* provided to configure the "system" connection including the the
* {@link #setSystemLogin(String) login} {@link #setSystemPasscode(String) passcode},
* heartbeat {@link #setSystemHeartbeatSendInterval(long) send} and
* {@link #setSystemHeartbeatReceiveInterval(long) receive} intervals.
* opposed to from a client). Such messages are are not associated with any client and
* therefore do not have a session id header. The "system" connection is effectively
* shared and cannot be used to receive messages. Several properties are provided to
* configure the "system" connection including:
* <ul>
* <li>{@link #setSystemLogin(String)}</li>
* <li>{@link #setSystemPasscode(String)}</li>
* <li>{@link #setSystemHeartbeatSendInterval(long)}</li>
* <li>{@link #setSystemHeartbeatReceiveInterval(long)}</li>
* </ul>
*
* @author Rossen Stoyanchev
* @author Andy Wilkinson
@ -87,6 +89,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private int relayPort = 61613;
private String clientLogin = "guest";
private String clientPasscode = "guest";
private String systemLogin = "guest";
private String systemPasscode = "guest";
@ -198,9 +204,53 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* Set the login for the "system" connection used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
* <p>See class-level documentation for more information on the "system" connection.
* Set the login to use when creating connections to the STOMP broker on
* behalf of connected clients.
* <p>
* By default this is set to "guest".
* @see #setSystemLogin(String)
*/
public void setClientLogin(String clientLogin) {
Assert.hasText(clientLogin, "clientLogin must not be empty");
this.clientLogin = clientLogin;
}
/**
* @return the configured login to use for connections to the STOMP broker
* on behalf of connected clients.
* @see #getSystemLogin()
*/
public String getClientLogin() {
return this.clientLogin;
}
/**
* Set the clientPasscode to use to create connections to the STOMP broker on
* behalf of connected clients.
* <p>
* By default this is set to "guest".
* @see #setSystemPasscode(String)
*/
public void setClientPasscode(String clientPasscode) {
Assert.hasText(clientPasscode, "clientPasscode must not be empty");
this.clientPasscode = clientPasscode;
}
/**
* @return the configured passocde to use for connections to the STOMP broker on
* behalf of connected clients.
* @see #getSystemPasscode()
*/
public String getClientPasscode() {
return this.clientPasscode;
}
/**
* Set the login for the shared "system" connection used to send messages to
* the STOMP broker from within the application, i.e. messages not associated
* with a specific client session (e.g. REST/HTTP request handling method).
* <p>
* By default this is set to "guest".
*/
public void setSystemLogin(String systemLogin) {
Assert.hasText(systemLogin, "systemLogin must not be empty");
@ -208,23 +258,25 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return the login used by the "system" connection to connect to the STOMP broker
* @return the login used for the shared "system" connection to the STOMP broker
*/
public String getSystemLogin() {
return this.systemLogin;
}
/**
* Set the passcode for the "system" connection used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
* <p>See class-level documentation for more information on the "system" connection.
* Set the passcode for the shared "system" connection used to send messages to
* the STOMP broker from within the application, i.e. messages not associated
* with a specific client session (e.g. REST/HTTP request handling method).
* <p>
* By default this is set to "guest".
*/
public void setSystemPasscode(String systemPasscode) {
this.systemPasscode = systemPasscode;
}
/**
* @return the passcode used by the "system" connection to connect to the STOMP broker
* @return the passcode used for the shared "system" connection to the STOMP broker
*/
public String getSystemPasscode() {
return this.systemPasscode;
@ -348,6 +400,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (SimpMessageType.CONNECT.equals(messageType)) {
logger.debug("Processing CONNECT in session=" + sessionId);
headers.setLogin(this.clientLogin);
headers.setPasscode(this.clientPasscode);
if (getVirtualHost() != null) {
headers.setHost(getVirtualHost());
}

@ -0,0 +1,69 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import org.junit.Before;
import org.junit.Test;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.StubMessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import java.util.Arrays;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
/**
* Unit tests for {@link org.springframework.messaging.simp.config.StompBrokerRelayRegistration}.
*
* @author Rossen Stoyanchev
*/
public class StompBrokerRelayRegistrationTests {
@Test
public void test() {
SubscribableChannel clientInboundChannel = new StubMessageChannel();
MessageChannel clientOutboundChannel = new StubMessageChannel();
SubscribableChannel brokerChannel = new StubMessageChannel();
String[] destinationPrefixes = new String[] { "/foo", "/bar" };
StompBrokerRelayRegistration registration = new StompBrokerRelayRegistration(
clientInboundChannel, clientOutboundChannel, destinationPrefixes);
registration.setClientLogin("clientlogin");
registration.setClientPasscode("clientpasscode");
registration.setSystemLogin("syslogin");
registration.setSystemPasscode("syspasscode");
registration.setSystemHeartbeatReceiveInterval(123);
registration.setSystemHeartbeatSendInterval(456);
StompBrokerRelayMessageHandler relayMessageHandler = registration.getMessageHandler(brokerChannel);
assertEquals(Arrays.asList(destinationPrefixes), relayMessageHandler.getDestinationPrefixes());
assertEquals("clientlogin", relayMessageHandler.getClientLogin());
assertEquals("clientpasscode", relayMessageHandler.getClientPasscode());
assertEquals("syslogin", relayMessageHandler.getSystemLogin());
assertEquals("syspasscode", relayMessageHandler.getSystemPasscode());
assertEquals(123, relayMessageHandler.getSystemHeartbeatReceiveInterval());
assertEquals(456, relayMessageHandler.getSystemHeartbeatSendInterval());
}
}

@ -50,9 +50,7 @@ public class StompBrokerRelayMessageHandlerTests {
@Before
public void setup() {
this.tcpClient = new StubTcpOperations();
this.brokerRelay = new StompBrokerRelayMessageHandler(new StubMessageChannel(),
new StubMessageChannel(), new StubMessageChannel(), Arrays.asList("/topic"));
this.brokerRelay.setTcpClient(this.tcpClient);
@ -83,6 +81,35 @@ public class StompBrokerRelayMessageHandlerTests {
assertEquals(virtualHost, headers2.getHost());
}
@Test
public void testLoginPasscode() {
String sessionId = "sess1";
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
this.brokerRelay.setClientLogin("clientlogin");
this.brokerRelay.setClientPasscode("clientpasscode");
this.brokerRelay.setSystemLogin("syslogin");
this.brokerRelay.setSystemPasscode("syspasscode");
this.brokerRelay.start();
this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build());
List<Message<byte[]>> sent = this.tcpClient.connection.messages;
assertEquals(2, sent.size());
StompHeaderAccessor headers1 = StompHeaderAccessor.wrap(sent.get(0));
assertEquals("syslogin", headers1.getLogin());
assertEquals("syspasscode", headers1.getPasscode());
StompHeaderAccessor headers2 = StompHeaderAccessor.wrap(sent.get(1));
assertEquals("clientlogin", headers2.getLogin());
assertEquals("clientpasscode", headers2.getPasscode());
}
@Test
public void testDestinationExcluded() {

@ -303,11 +303,19 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
if(!relayPort.isEmpty()) {
mpvs.add("relayPort", Integer.valueOf(relayPort));
}
String attrValue = brokerRelayElem.getAttribute("login");
String attrValue = brokerRelayElem.getAttribute("client-login");
if(!attrValue.isEmpty()) {
mpvs.add("clientLogin",attrValue);
}
attrValue = brokerRelayElem.getAttribute("client-passcode");
if(!attrValue.isEmpty()) {
mpvs.add("clientPasscode", attrValue);
}
attrValue = brokerRelayElem.getAttribute("system-login");
if(!attrValue.isEmpty()) {
mpvs.add("systemLogin",attrValue);
}
attrValue = brokerRelayElem.getAttribute("passcode");
attrValue = brokerRelayElem.getAttribute("system-passcode");
if(!attrValue.isEmpty()) {
mpvs.add("systemPasscode", attrValue);
}

@ -243,17 +243,39 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="login" type="xsd:string">
<xsd:attribute name="client-login" type="xsd:string">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler"><![CDATA[
The login for the "system" connection used to send messages to the STOMP broker.
The login to use when creating connections to the STOMP broker on behalf of connected clients.
By default this is set to "guest".
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="passcode" type="xsd:string">
<xsd:attribute name="client-passcode" type="xsd:string">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler"><![CDATA[
The passcode for the "system" connection used to send messages to the STOMP broker.
The passcode to use when creating connections to the STOMP broker on behalf of connected clients.
By default this is set to "guest".
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="system-login" type="xsd:string">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler"><![CDATA[
The login for the shared "system" connection used to send messages to
the STOMP broker from within the application, i.e. messages not associated
with a specific client session (e.g. REST/HTTP request handling method).
By default this is set to "guest".
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="system-passcode" type="xsd:string">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler"><![CDATA[
The passcode for the shared "system" connection used to send messages to
the STOMP broker from within the application, i.e. messages not associated
with a specific client session (e.g. REST/HTTP request handling method).
By default this is set to "guest".
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>

@ -168,8 +168,10 @@ public class MessageBrokerBeanDefinitionParserTests {
StompBrokerRelayMessageHandler messageBroker = this.appContext.getBean(StompBrokerRelayMessageHandler.class);
assertNotNull(messageBroker);
assertEquals("login", messageBroker.getSystemLogin());
assertEquals("pass", messageBroker.getSystemPasscode());
assertEquals("clientlogin", messageBroker.getClientLogin());
assertEquals("clientpass", messageBroker.getClientPasscode());
assertEquals("syslogin", messageBroker.getSystemLogin());
assertEquals("syspass", messageBroker.getSystemPasscode());
assertEquals("relayhost", messageBroker.getRelayHost());
assertEquals(1234, messageBroker.getRelayPort());
assertEquals("spring.io", messageBroker.getVirtualHost());

@ -10,7 +10,9 @@
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:stomp-broker-relay prefix="/topic,/queue" relay-host="relayhost" relay-port="1234"
login="login" passcode="pass" heartbeat-send-interval="5000" heartbeat-receive-interval="5000"
client-login="clientlogin" client-passcode="clientpass"
system-login="syslogin" system-passcode="syspass"
heartbeat-send-interval="5000" heartbeat-receive-interval="5000"
virtual-host="spring.io"/>
</websocket:message-broker>

Loading…
Cancel
Save