diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java index f778964bf3..6effe2e870 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java @@ -33,9 +33,13 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { private int relayPort = 61613; - private String applicationLogin = "guest"; + private String clientLogin = "guest"; - private String applicationPasscode = "guest"; + private String clientPasscode = "guest"; + + private String systemLogin = "guest"; + + private String systemPasscode = "guest"; private Long systemHeartbeatSendInterval; @@ -68,23 +72,54 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { return this; } + /** - * Set the login for the "system" relay session used to send messages to the STOMP - * broker without having a client session (e.g. REST/HTTP request handling method). + * Set the login to use when creating connections to the STOMP broker on + * behalf of connected clients. + *

+ * By default this is set to "guest". */ - public StompBrokerRelayRegistration setApplicationLogin(String login) { - Assert.hasText(login, "applicationLogin must not be empty"); - this.applicationLogin = login; + public StompBrokerRelayRegistration setClientLogin(String login) { + Assert.hasText(login, "clientLogin must not be empty"); + this.clientLogin = login; return this; } /** - * Set the passcode for the "system" relay session used to send messages to the STOMP - * broker without having a client session (e.g. REST/HTTP request handling method). + * Set the passcode to use when creating connections to the STOMP broker on + * behalf of connected clients. + *

+ * By default this is set to "guest". */ - public StompBrokerRelayRegistration setApplicationPasscode(String passcode) { - Assert.hasText(passcode, "applicationPasscode must not be empty"); - this.applicationPasscode = passcode; + public StompBrokerRelayRegistration setClientPasscode(String passcode) { + Assert.hasText(passcode, "clientPasscode must not be empty"); + this.clientPasscode = passcode; + return this; + } + + /** + * Set the login for the shared "system" connection used to send messages to + * the STOMP broker from within the application, i.e. messages not associated + * with a specific client session (e.g. REST/HTTP request handling method). + *

+ * By default this is set to "guest". + */ + public StompBrokerRelayRegistration setSystemLogin(String login) { + Assert.hasText(login, "systemLogin must not be empty"); + this.systemLogin = login; + return this; + } + + /** + * Set the passcode for the shared "system" connection used to send messages to + * the STOMP broker from within the application, i.e. messages not associated + * with a specific client session (e.g. REST/HTTP request handling method). + *

+ * By default this is set to "guest". + */ + public StompBrokerRelayRegistration setSystemPasscode(String passcode) { + Assert.hasText(passcode, "systemPasscode must not be empty"); + this.systemPasscode = passcode; return this; } @@ -129,18 +164,22 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { handler.setRelayHost(this.relayHost); handler.setRelayPort(this.relayPort); - handler.setSystemLogin(this.applicationLogin); - handler.setSystemPasscode(this.applicationPasscode); + + handler.setClientLogin(this.clientLogin); + handler.setClientPasscode(this.clientPasscode); + + handler.setSystemLogin(this.systemLogin); + handler.setSystemPasscode(this.systemPasscode); if (this.systemHeartbeatSendInterval != null) { handler.setSystemHeartbeatSendInterval(this.systemHeartbeatSendInterval); } - if (this.systemHeartbeatReceiveInterval != null) { handler.setSystemHeartbeatReceiveInterval(this.systemHeartbeatReceiveInterval); } handler.setAutoStartup(this.autoStartup); + return handler; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index e6f10320f1..5dd5d92075 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -33,7 +33,6 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -51,13 +50,16 @@ import org.springframework.util.concurrent.ListenableFutureTask; * *

This class also automatically opens a default "system" TCP connection to the message * broker that is used for sending messages that originate from the server application (as - * opposed to from a client). Such messages are recognized because they are not associated - * with any client and therefore do not have a session id header. The "system" connection - * is effectively shared and cannot be used to receive messages. Several properties are - * provided to configure the "system" connection including the the - * {@link #setSystemLogin(String) login} {@link #setSystemPasscode(String) passcode}, - * heartbeat {@link #setSystemHeartbeatSendInterval(long) send} and - * {@link #setSystemHeartbeatReceiveInterval(long) receive} intervals. + * opposed to from a client). Such messages are are not associated with any client and + * therefore do not have a session id header. The "system" connection is effectively + * shared and cannot be used to receive messages. Several properties are provided to + * configure the "system" connection including: + *

* * @author Rossen Stoyanchev * @author Andy Wilkinson @@ -87,6 +89,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private int relayPort = 61613; + private String clientLogin = "guest"; + + private String clientPasscode = "guest"; + private String systemLogin = "guest"; private String systemPasscode = "guest"; @@ -198,9 +204,53 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Set the login for the "system" connection used to send messages to the STOMP - * broker without having a client session (e.g. REST/HTTP request handling method). - *

See class-level documentation for more information on the "system" connection. + * Set the login to use when creating connections to the STOMP broker on + * behalf of connected clients. + *

+ * By default this is set to "guest". + * @see #setSystemLogin(String) + */ + public void setClientLogin(String clientLogin) { + Assert.hasText(clientLogin, "clientLogin must not be empty"); + this.clientLogin = clientLogin; + } + + /** + * @return the configured login to use for connections to the STOMP broker + * on behalf of connected clients. + * @see #getSystemLogin() + */ + public String getClientLogin() { + return this.clientLogin; + } + + /** + * Set the clientPasscode to use to create connections to the STOMP broker on + * behalf of connected clients. + *

+ * By default this is set to "guest". + * @see #setSystemPasscode(String) + */ + public void setClientPasscode(String clientPasscode) { + Assert.hasText(clientPasscode, "clientPasscode must not be empty"); + this.clientPasscode = clientPasscode; + } + + /** + * @return the configured passocde to use for connections to the STOMP broker on + * behalf of connected clients. + * @see #getSystemPasscode() + */ + public String getClientPasscode() { + return this.clientPasscode; + } + + /** + * Set the login for the shared "system" connection used to send messages to + * the STOMP broker from within the application, i.e. messages not associated + * with a specific client session (e.g. REST/HTTP request handling method). + *

+ * By default this is set to "guest". */ public void setSystemLogin(String systemLogin) { Assert.hasText(systemLogin, "systemLogin must not be empty"); @@ -208,23 +258,25 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return the login used by the "system" connection to connect to the STOMP broker + * @return the login used for the shared "system" connection to the STOMP broker */ public String getSystemLogin() { return this.systemLogin; } /** - * Set the passcode for the "system" connection used to send messages to the STOMP - * broker without having a client session (e.g. REST/HTTP request handling method). - *

See class-level documentation for more information on the "system" connection. + * Set the passcode for the shared "system" connection used to send messages to + * the STOMP broker from within the application, i.e. messages not associated + * with a specific client session (e.g. REST/HTTP request handling method). + *

+ * By default this is set to "guest". */ public void setSystemPasscode(String systemPasscode) { this.systemPasscode = systemPasscode; } /** - * @return the passcode used by the "system" connection to connect to the STOMP broker + * @return the passcode used for the shared "system" connection to the STOMP broker */ public String getSystemPasscode() { return this.systemPasscode; @@ -348,6 +400,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (SimpMessageType.CONNECT.equals(messageType)) { logger.debug("Processing CONNECT in session=" + sessionId); + headers.setLogin(this.clientLogin); + headers.setPasscode(this.clientPasscode); if (getVirtualHost() != null) { headers.setHost(getVirtualHost()); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java new file mode 100644 index 0000000000..60ee3fe6a7 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java @@ -0,0 +1,69 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.StubMessageChannel; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; + +import java.util.Arrays; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for {@link org.springframework.messaging.simp.config.StompBrokerRelayRegistration}. + * + * @author Rossen Stoyanchev + */ +public class StompBrokerRelayRegistrationTests { + + + @Test + public void test() { + + SubscribableChannel clientInboundChannel = new StubMessageChannel(); + MessageChannel clientOutboundChannel = new StubMessageChannel(); + SubscribableChannel brokerChannel = new StubMessageChannel(); + + String[] destinationPrefixes = new String[] { "/foo", "/bar" }; + + StompBrokerRelayRegistration registration = new StompBrokerRelayRegistration( + clientInboundChannel, clientOutboundChannel, destinationPrefixes); + + registration.setClientLogin("clientlogin"); + registration.setClientPasscode("clientpasscode"); + registration.setSystemLogin("syslogin"); + registration.setSystemPasscode("syspasscode"); + registration.setSystemHeartbeatReceiveInterval(123); + registration.setSystemHeartbeatSendInterval(456); + + StompBrokerRelayMessageHandler relayMessageHandler = registration.getMessageHandler(brokerChannel); + + assertEquals(Arrays.asList(destinationPrefixes), relayMessageHandler.getDestinationPrefixes()); + assertEquals("clientlogin", relayMessageHandler.getClientLogin()); + assertEquals("clientpasscode", relayMessageHandler.getClientPasscode()); + assertEquals("syslogin", relayMessageHandler.getSystemLogin()); + assertEquals("syspasscode", relayMessageHandler.getSystemPasscode()); + assertEquals(123, relayMessageHandler.getSystemHeartbeatReceiveInterval()); + assertEquals(456, relayMessageHandler.getSystemHeartbeatSendInterval()); + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java index 432296d9ad..49365385a2 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java @@ -50,9 +50,7 @@ public class StompBrokerRelayMessageHandlerTests { @Before public void setup() { - this.tcpClient = new StubTcpOperations(); - this.brokerRelay = new StompBrokerRelayMessageHandler(new StubMessageChannel(), new StubMessageChannel(), new StubMessageChannel(), Arrays.asList("/topic")); this.brokerRelay.setTcpClient(this.tcpClient); @@ -83,6 +81,35 @@ public class StompBrokerRelayMessageHandlerTests { assertEquals(virtualHost, headers2.getHost()); } + @Test + public void testLoginPasscode() { + + String sessionId = "sess1"; + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); + headers.setSessionId(sessionId); + + this.brokerRelay.setClientLogin("clientlogin"); + this.brokerRelay.setClientPasscode("clientpasscode"); + + this.brokerRelay.setSystemLogin("syslogin"); + this.brokerRelay.setSystemPasscode("syspasscode"); + + this.brokerRelay.start(); + this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); + + List> sent = this.tcpClient.connection.messages; + assertEquals(2, sent.size()); + + StompHeaderAccessor headers1 = StompHeaderAccessor.wrap(sent.get(0)); + assertEquals("syslogin", headers1.getLogin()); + assertEquals("syspasscode", headers1.getPasscode()); + + StompHeaderAccessor headers2 = StompHeaderAccessor.wrap(sent.get(1)); + assertEquals("clientlogin", headers2.getLogin()); + assertEquals("clientpasscode", headers2.getPasscode()); + } + @Test public void testDestinationExcluded() { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java index c152e84ec6..64dcbe97f1 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java @@ -303,11 +303,19 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { if(!relayPort.isEmpty()) { mpvs.add("relayPort", Integer.valueOf(relayPort)); } - String attrValue = brokerRelayElem.getAttribute("login"); + String attrValue = brokerRelayElem.getAttribute("client-login"); + if(!attrValue.isEmpty()) { + mpvs.add("clientLogin",attrValue); + } + attrValue = brokerRelayElem.getAttribute("client-passcode"); + if(!attrValue.isEmpty()) { + mpvs.add("clientPasscode", attrValue); + } + attrValue = brokerRelayElem.getAttribute("system-login"); if(!attrValue.isEmpty()) { mpvs.add("systemLogin",attrValue); } - attrValue = brokerRelayElem.getAttribute("passcode"); + attrValue = brokerRelayElem.getAttribute("system-passcode"); if(!attrValue.isEmpty()) { mpvs.add("systemPasscode", attrValue); } diff --git a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd index 470822417a..08fa728841 100644 --- a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd +++ b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd @@ -243,17 +243,39 @@ ]]> - + - + + + + + + + + + + + diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index b5fc7f4fbb..600bc517f9 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -168,8 +168,10 @@ public class MessageBrokerBeanDefinitionParserTests { StompBrokerRelayMessageHandler messageBroker = this.appContext.getBean(StompBrokerRelayMessageHandler.class); assertNotNull(messageBroker); - assertEquals("login", messageBroker.getSystemLogin()); - assertEquals("pass", messageBroker.getSystemPasscode()); + assertEquals("clientlogin", messageBroker.getClientLogin()); + assertEquals("clientpass", messageBroker.getClientPasscode()); + assertEquals("syslogin", messageBroker.getSystemLogin()); + assertEquals("syspass", messageBroker.getSystemPasscode()); assertEquals("relayhost", messageBroker.getRelayHost()); assertEquals(1234, messageBroker.getRelayPort()); assertEquals("spring.io", messageBroker.getVirtualHost()); diff --git a/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-relay.xml b/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-relay.xml index 11746ff9d3..a5fd205b84 100644 --- a/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-relay.xml +++ b/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-relay.xml @@ -10,7 +10,9 @@