From e2feed494b314a3291ae53ebd91922ac60e843b4 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 17 Jan 2014 11:24:28 -0500 Subject: [PATCH] Move "handlers" field to AbstractSubscribableChannel Move the management of subscribers to the abstract parent class where it belongs. --- .../support/AbstractSubscribableChannel.java | 40 +++++++++---------- .../support/ExecutorSubscribableChannel.java | 19 +-------- .../MessageBrokerConfigurationTests.java | 26 +++++------- ...essageBrokerConfigurationSupportTests.java | 16 ++------ 4 files changed, 34 insertions(+), 67 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java index 6faed3af0d..4b3693d07b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java @@ -19,6 +19,11 @@ package org.springframework.messaging.support; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.SubscribableChannel; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + /** * Abstract base class for {@link SubscribableChannel} implementations. * @@ -27,9 +32,20 @@ import org.springframework.messaging.SubscribableChannel; */ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel { + private final Set handlers = new CopyOnWriteArraySet(); + + + public Set getSubscribers() { + return Collections.unmodifiableSet(this.handlers); + } + + public boolean hasSubscription(MessageHandler handler) { + return this.handlers.contains(handler); + } + @Override - public final boolean subscribe(MessageHandler handler) { - boolean result = subscribeInternal(handler); + public boolean subscribe(MessageHandler handler) { + boolean result = this.handlers.add(handler); if (result) { if (logger.isDebugEnabled()) { logger.debug("[" + getBeanName() + "] subscribed " + handler); @@ -39,8 +55,8 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel } @Override - public final boolean unsubscribe(MessageHandler handler) { - boolean result = unsubscribeInternal(handler); + public boolean unsubscribe(MessageHandler handler) { + boolean result = this.handlers.remove(handler); if (result) { if (logger.isDebugEnabled()) { logger.debug("[" + getBeanName() + "] unsubscribed " + handler); @@ -49,20 +65,4 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel return result; } - - /** - * Whether the given {@link MessageHandler} is already subscribed. - */ - public abstract boolean hasSubscription(MessageHandler handler); - - /** - * Subscribe the given {@link MessageHandler}. - */ - protected abstract boolean subscribeInternal(MessageHandler handler); - - /** - * Unsubscribe the given {@link MessageHandler}. - */ - protected abstract boolean unsubscribeInternal(MessageHandler handler); - } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java index 6d8c109d6d..e93ac1f3b4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java @@ -35,8 +35,6 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { private final Executor executor; - private final Set handlers = new CopyOnWriteArraySet(); - /** * Create a new {@link ExecutorSubscribableChannel} instance where messages will be sent @@ -61,14 +59,9 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { return this.executor; } - @Override - public boolean hasSubscription(MessageHandler handler) { - return this.handlers.contains(handler); - } - @Override public boolean sendInternal(final Message message, long timeout) { - for (final MessageHandler handler : this.handlers) { + for (final MessageHandler handler : getSubscribers()) { if (this.executor == null) { handler.handleMessage(message); } @@ -84,14 +77,4 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { return true; } - @Override - public boolean subscribeInternal(MessageHandler handler) { - return this.handlers.add(handler); - } - - @Override - public boolean unsubscribeInternal(MessageHandler handler) { - return this.handlers.remove(handler); - } - } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java index b7c931e743..55ecc33a14 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java @@ -19,6 +19,7 @@ package org.springframework.messaging.simp.config; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.hamcrest.Matchers; import org.junit.Before; @@ -93,7 +94,7 @@ public class MessageBrokerConfigurationTests { public void clientInboundChannel() { TestChannel channel = this.simpleContext.getBean("clientInboundChannel", TestChannel.class); - List handlers = channel.handlers; + Set handlers = channel.getSubscribers(); assertEquals(3, handlers.size()); assertTrue(handlers.contains(simpleContext.getBean(SimpAnnotationMethodMessageHandler.class))); @@ -104,12 +105,12 @@ public class MessageBrokerConfigurationTests { @Test public void clientInboundChannelWithBrokerRelay() { TestChannel channel = this.brokerRelayContext.getBean("clientInboundChannel", TestChannel.class); - List values = channel.handlers; + Set handlers = channel.getSubscribers(); - assertEquals(3, values.size()); - assertTrue(values.contains(brokerRelayContext.getBean(SimpAnnotationMethodMessageHandler.class))); - assertTrue(values.contains(brokerRelayContext.getBean(UserDestinationMessageHandler.class))); - assertTrue(values.contains(brokerRelayContext.getBean(StompBrokerRelayMessageHandler.class))); + assertEquals(3, handlers.size()); + assertTrue(handlers.contains(brokerRelayContext.getBean(SimpAnnotationMethodMessageHandler.class))); + assertTrue(handlers.contains(brokerRelayContext.getBean(UserDestinationMessageHandler.class))); + assertTrue(handlers.contains(brokerRelayContext.getBean(StompBrokerRelayMessageHandler.class))); } @Test @@ -197,7 +198,7 @@ public class MessageBrokerConfigurationTests { @Test public void brokerChannel() { TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class); - List handlers = channel.handlers; + Set handlers = channel.getSubscribers(); assertEquals(2, handlers.size()); assertTrue(handlers.contains(simpleContext.getBean(UserDestinationMessageHandler.class))); @@ -207,7 +208,7 @@ public class MessageBrokerConfigurationTests { @Test public void brokerChannelWithBrokerRelay() { TestChannel channel = this.brokerRelayContext.getBean("brokerChannel", TestChannel.class); - List handlers = channel.handlers; + Set handlers = channel.getSubscribers(); assertEquals(2, handlers.size()); assertTrue(handlers.contains(brokerRelayContext.getBean(UserDestinationMessageHandler.class))); @@ -451,17 +452,8 @@ public class MessageBrokerConfigurationTests { private static class TestChannel extends ExecutorSubscribableChannel { - private final List handlers = new ArrayList<>(); - private final List> messages = new ArrayList<>(); - - @Override - public boolean subscribeInternal(MessageHandler handler) { - this.handlers.add(handler); - return super.subscribeInternal(handler); - } - @Override public boolean sendInternal(Message message, long timeout) { this.messages.add(message); diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java index eb95b538b1..a2af8b6e1f 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java @@ -19,6 +19,7 @@ package org.springframework.web.socket.config.annotation; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import org.junit.Before; import org.junit.Test; @@ -95,10 +96,10 @@ public class WebSocketMessageBrokerConfigurationSupportTests { @Test public void clientOutboundChannelChannel() { TestChannel channel = this.config.getBean("clientOutboundChannel", TestChannel.class); - List values = channel.handlers; + Set handlers = channel.getSubscribers(); - assertEquals(1, values.size()); - assertTrue(values.get(0) instanceof SubProtocolWebSocketHandler); + assertEquals(1, handlers.size()); + assertTrue(handlers.iterator().next() instanceof SubProtocolWebSocketHandler); } @@ -155,17 +156,8 @@ public class WebSocketMessageBrokerConfigurationSupportTests { private static class TestChannel extends ExecutorSubscribableChannel { - private final List handlers = new ArrayList<>(); - private final List> messages = new ArrayList<>(); - - @Override - public boolean subscribeInternal(MessageHandler handler) { - this.handlers.add(handler); - return super.subscribeInternal(handler); - } - @Override public boolean sendInternal(Message message, long timeout) { this.messages.add(message);