|
|
@ -92,6 +92,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
this.activeMQBroker.addConnector("stomp://localhost:" + this.port); |
|
|
|
this.activeMQBroker.addConnector("stomp://localhost:" + this.port); |
|
|
|
this.activeMQBroker.setStartAsync(false); |
|
|
|
this.activeMQBroker.setStartAsync(false); |
|
|
|
this.activeMQBroker.setPersistent(false); |
|
|
|
this.activeMQBroker.setPersistent(false); |
|
|
|
|
|
|
|
this.activeMQBroker.setUseJmx(false); |
|
|
|
this.activeMQBroker.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 5); |
|
|
|
this.activeMQBroker.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 5); |
|
|
|
this.activeMQBroker.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 5); |
|
|
|
this.activeMQBroker.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 5); |
|
|
|
this.activeMQBroker.start(); |
|
|
|
this.activeMQBroker.start(); |
|
|
@ -139,6 +140,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
@Test |
|
|
|
@Test |
|
|
|
public void publishSubscribe() throws Exception { |
|
|
|
public void publishSubscribe() throws Exception { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug("Starting test publishSubscribe()"); |
|
|
|
|
|
|
|
|
|
|
|
String sess1 = "sess1"; |
|
|
|
String sess1 = "sess1"; |
|
|
|
String sess2 = "sess2"; |
|
|
|
String sess2 = "sess2"; |
|
|
|
String subs1 = "subs1"; |
|
|
|
String subs1 = "subs1"; |
|
|
@ -161,6 +164,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
|
|
|
|
|
|
|
|
@Test(expected=MessageDeliveryException.class) |
|
|
|
@Test(expected=MessageDeliveryException.class) |
|
|
|
public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception { |
|
|
|
public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()"); |
|
|
|
|
|
|
|
|
|
|
|
stopActiveMqBrokerAndAwait(); |
|
|
|
stopActiveMqBrokerAndAwait(); |
|
|
|
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); |
|
|
|
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); |
|
|
|
this.relay.handleMessage(MessageBuilder.createMessage("test".getBytes(), headers.getMessageHeaders())); |
|
|
|
this.relay.handleMessage(MessageBuilder.createMessage("test".getBytes(), headers.getMessageHeaders())); |
|
|
@ -169,6 +175,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
@Test |
|
|
|
@Test |
|
|
|
public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception { |
|
|
|
public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug("Starting test brokerBecomingUnvailableTriggersErrorFrame()"); |
|
|
|
|
|
|
|
|
|
|
|
String sess1 = "sess1"; |
|
|
|
String sess1 = "sess1"; |
|
|
|
MessageExchange connect = MessageExchangeBuilder.connect(sess1).build(); |
|
|
|
MessageExchange connect = MessageExchangeBuilder.connect(sess1).build(); |
|
|
|
this.relay.handleMessage(connect.message); |
|
|
|
this.relay.handleMessage(connect.message); |
|
|
@ -181,6 +189,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
|
public void brokerAvailabilityEventWhenStopped() throws Exception { |
|
|
|
public void brokerAvailabilityEventWhenStopped() throws Exception { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug("Starting test brokerAvailabilityEventWhenStopped()"); |
|
|
|
|
|
|
|
|
|
|
|
stopActiveMqBrokerAndAwait(); |
|
|
|
stopActiveMqBrokerAndAwait(); |
|
|
|
this.eventPublisher.expectBrokerAvailabilityEvent(false); |
|
|
|
this.eventPublisher.expectBrokerAvailabilityEvent(false); |
|
|
|
} |
|
|
|
} |
|
|
@ -188,6 +199,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
@Test |
|
|
|
@Test |
|
|
|
public void relayReconnectsIfBrokerComesBackUp() throws Exception { |
|
|
|
public void relayReconnectsIfBrokerComesBackUp() throws Exception { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug("Starting test relayReconnectsIfBrokerComesBackUp()"); |
|
|
|
|
|
|
|
|
|
|
|
String sess1 = "sess1"; |
|
|
|
String sess1 = "sess1"; |
|
|
|
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); |
|
|
|
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); |
|
|
|
this.relay.handleMessage(conn1.message); |
|
|
|
this.relay.handleMessage(conn1.message); |
|
|
@ -217,6 +230,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
@Test |
|
|
|
@Test |
|
|
|
public void disconnectClosesRelaySessionCleanly() throws Exception { |
|
|
|
public void disconnectClosesRelaySessionCleanly() throws Exception { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.debug("Starting test disconnectClosesRelaySessionCleanly()"); |
|
|
|
|
|
|
|
|
|
|
|
MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); |
|
|
|
MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); |
|
|
|
this.relay.handleMessage(connect.message); |
|
|
|
this.relay.handleMessage(connect.message); |
|
|
|
this.responseHandler.expectMessages(connect); |
|
|
|
this.responseHandler.expectMessages(connect); |
|
|
@ -246,6 +261,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
|
|
|
|
|
|
|
|
public void expectBrokerAvailabilityEvent(boolean isBrokerAvailable) throws InterruptedException { |
|
|
|
public void expectBrokerAvailabilityEvent(boolean isBrokerAvailable) throws InterruptedException { |
|
|
|
BrokerAvailabilityEvent event = this.eventQueue.poll(10000, TimeUnit.MILLISECONDS); |
|
|
|
BrokerAvailabilityEvent event = this.eventQueue.poll(10000, TimeUnit.MILLISECONDS); |
|
|
|
|
|
|
|
assertNotNull("Times out waiting for BrokerAvailabilityEvent[" + isBrokerAvailable + "]", event); |
|
|
|
assertEquals(isBrokerAvailable, event.isBrokerAvailable()); |
|
|
|
assertEquals(isBrokerAvailable, event.isBrokerAvailable()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -265,6 +281,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
public void expectMessages(MessageExchange... messageExchanges) throws InterruptedException { |
|
|
|
public void expectMessages(MessageExchange... messageExchanges) throws InterruptedException { |
|
|
|
for (MessageExchange exchange : messageExchanges) { |
|
|
|
for (MessageExchange exchange : messageExchanges) { |
|
|
|
Message<?> message = this.queue.poll(10000, TimeUnit.MILLISECONDS); |
|
|
|
Message<?> message = this.queue.poll(10000, TimeUnit.MILLISECONDS); |
|
|
|
|
|
|
|
assertNotNull("Timed out waiting for: " + exchange, message); |
|
|
|
assertTrue("Expected: " + exchange + " but got: " + message, exchange.matchMessage(message)); |
|
|
|
assertTrue("Expected: " + exchange + " but got: " + message, exchange.matchMessage(message)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|