Move "handlers" field to AbstractSubscribableChannel

Move the management of subscribers to the abstract parent class where
it belongs.
master
Rossen Stoyanchev 11 years ago
parent 4e933b4765
commit e2feed494b
  1. 40
      spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java
  2. 19
      spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java
  3. 26
      spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java
  4. 16
      spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java

@ -19,6 +19,11 @@ package org.springframework.messaging.support;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* Abstract base class for {@link SubscribableChannel} implementations.
*
@ -27,9 +32,20 @@ import org.springframework.messaging.SubscribableChannel;
*/
public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel {
private final Set<MessageHandler> handlers = new CopyOnWriteArraySet<MessageHandler>();
public Set<MessageHandler> getSubscribers() {
return Collections.<MessageHandler>unmodifiableSet(this.handlers);
}
public boolean hasSubscription(MessageHandler handler) {
return this.handlers.contains(handler);
}
@Override
public final boolean subscribe(MessageHandler handler) {
boolean result = subscribeInternal(handler);
public boolean subscribe(MessageHandler handler) {
boolean result = this.handlers.add(handler);
if (result) {
if (logger.isDebugEnabled()) {
logger.debug("[" + getBeanName() + "] subscribed " + handler);
@ -39,8 +55,8 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
}
@Override
public final boolean unsubscribe(MessageHandler handler) {
boolean result = unsubscribeInternal(handler);
public boolean unsubscribe(MessageHandler handler) {
boolean result = this.handlers.remove(handler);
if (result) {
if (logger.isDebugEnabled()) {
logger.debug("[" + getBeanName() + "] unsubscribed " + handler);
@ -49,20 +65,4 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
return result;
}
/**
* Whether the given {@link MessageHandler} is already subscribed.
*/
public abstract boolean hasSubscription(MessageHandler handler);
/**
* Subscribe the given {@link MessageHandler}.
*/
protected abstract boolean subscribeInternal(MessageHandler handler);
/**
* Unsubscribe the given {@link MessageHandler}.
*/
protected abstract boolean unsubscribeInternal(MessageHandler handler);
}

@ -35,8 +35,6 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
private final Executor executor;
private final Set<MessageHandler> handlers = new CopyOnWriteArraySet<MessageHandler>();
/**
* Create a new {@link ExecutorSubscribableChannel} instance where messages will be sent
@ -61,14 +59,9 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
return this.executor;
}
@Override
public boolean hasSubscription(MessageHandler handler) {
return this.handlers.contains(handler);
}
@Override
public boolean sendInternal(final Message<?> message, long timeout) {
for (final MessageHandler handler : this.handlers) {
for (final MessageHandler handler : getSubscribers()) {
if (this.executor == null) {
handler.handleMessage(message);
}
@ -84,14 +77,4 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
return true;
}
@Override
public boolean subscribeInternal(MessageHandler handler) {
return this.handlers.add(handler);
}
@Override
public boolean unsubscribeInternal(MessageHandler handler) {
return this.handlers.remove(handler);
}
}

@ -19,6 +19,7 @@ package org.springframework.messaging.simp.config;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.hamcrest.Matchers;
import org.junit.Before;
@ -93,7 +94,7 @@ public class MessageBrokerConfigurationTests {
public void clientInboundChannel() {
TestChannel channel = this.simpleContext.getBean("clientInboundChannel", TestChannel.class);
List<MessageHandler> handlers = channel.handlers;
Set<MessageHandler> handlers = channel.getSubscribers();
assertEquals(3, handlers.size());
assertTrue(handlers.contains(simpleContext.getBean(SimpAnnotationMethodMessageHandler.class)));
@ -104,12 +105,12 @@ public class MessageBrokerConfigurationTests {
@Test
public void clientInboundChannelWithBrokerRelay() {
TestChannel channel = this.brokerRelayContext.getBean("clientInboundChannel", TestChannel.class);
List<MessageHandler> values = channel.handlers;
Set<MessageHandler> handlers = channel.getSubscribers();
assertEquals(3, values.size());
assertTrue(values.contains(brokerRelayContext.getBean(SimpAnnotationMethodMessageHandler.class)));
assertTrue(values.contains(brokerRelayContext.getBean(UserDestinationMessageHandler.class)));
assertTrue(values.contains(brokerRelayContext.getBean(StompBrokerRelayMessageHandler.class)));
assertEquals(3, handlers.size());
assertTrue(handlers.contains(brokerRelayContext.getBean(SimpAnnotationMethodMessageHandler.class)));
assertTrue(handlers.contains(brokerRelayContext.getBean(UserDestinationMessageHandler.class)));
assertTrue(handlers.contains(brokerRelayContext.getBean(StompBrokerRelayMessageHandler.class)));
}
@Test
@ -197,7 +198,7 @@ public class MessageBrokerConfigurationTests {
@Test
public void brokerChannel() {
TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class);
List<MessageHandler> handlers = channel.handlers;
Set<MessageHandler> handlers = channel.getSubscribers();
assertEquals(2, handlers.size());
assertTrue(handlers.contains(simpleContext.getBean(UserDestinationMessageHandler.class)));
@ -207,7 +208,7 @@ public class MessageBrokerConfigurationTests {
@Test
public void brokerChannelWithBrokerRelay() {
TestChannel channel = this.brokerRelayContext.getBean("brokerChannel", TestChannel.class);
List<MessageHandler> handlers = channel.handlers;
Set<MessageHandler> handlers = channel.getSubscribers();
assertEquals(2, handlers.size());
assertTrue(handlers.contains(brokerRelayContext.getBean(UserDestinationMessageHandler.class)));
@ -451,17 +452,8 @@ public class MessageBrokerConfigurationTests {
private static class TestChannel extends ExecutorSubscribableChannel {
private final List<MessageHandler> handlers = new ArrayList<>();
private final List<Message<?>> messages = new ArrayList<>();
@Override
public boolean subscribeInternal(MessageHandler handler) {
this.handlers.add(handler);
return super.subscribeInternal(handler);
}
@Override
public boolean sendInternal(Message<?> message, long timeout) {
this.messages.add(message);

@ -19,6 +19,7 @@ package org.springframework.web.socket.config.annotation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
@ -95,10 +96,10 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
@Test
public void clientOutboundChannelChannel() {
TestChannel channel = this.config.getBean("clientOutboundChannel", TestChannel.class);
List<MessageHandler> values = channel.handlers;
Set<MessageHandler> handlers = channel.getSubscribers();
assertEquals(1, values.size());
assertTrue(values.get(0) instanceof SubProtocolWebSocketHandler);
assertEquals(1, handlers.size());
assertTrue(handlers.iterator().next() instanceof SubProtocolWebSocketHandler);
}
@ -155,17 +156,8 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
private static class TestChannel extends ExecutorSubscribableChannel {
private final List<MessageHandler> handlers = new ArrayList<>();
private final List<Message<?>> messages = new ArrayList<>();
@Override
public boolean subscribeInternal(MessageHandler handler) {
this.handlers.add(handler);
return super.subscribeInternal(handler);
}
@Override
public boolean sendInternal(Message<?> message, long timeout) {
this.messages.add(message);

Loading…
Cancel
Save