From c879796e0092a188fa54117639067cfb29a2e858 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 29 Apr 2014 16:23:13 -0400 Subject: [PATCH] Add cache limit to DefaultSessionRegistry and polish Issue: SPR-11657 --- .../broker/DefaultSubscriptionRegistry.java | 169 +++++++++++------- .../DefaultSubscriptionRegistryTests.java | 100 +++++------ 2 files changed, 147 insertions(+), 122 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java index bf3ae44287..569e4869f9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java @@ -16,7 +16,12 @@ package org.springframework.messaging.simp.broker; -import java.util.*; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -25,6 +30,7 @@ import org.springframework.util.AntPathMatcher; import org.springframework.util.Assert; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import org.springframework.util.PathMatcher; /** * A default, simple in-memory implementation of {@link SubscriptionRegistry}. @@ -35,28 +41,54 @@ import org.springframework.util.MultiValueMap; */ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { + /** Default maximum number of entries for the destination cache: 1024 */ + public static final int DEFAULT_CACHE_LIMIT = 1024; + + + /** The maximum number of entries in the cache */ + private volatile int cacheLimit = DEFAULT_CACHE_LIMIT; + private final DestinationCache destinationCache = new DestinationCache(); private final SessionSubscriptionRegistry subscriptionRegistry = new SessionSubscriptionRegistry(); - private AntPathMatcher pathMatcher = new AntPathMatcher(); + private PathMatcher pathMatcher = new AntPathMatcher(); + /** - * @param pathMatcher the pathMatcher to set + * Specify the maximum number of entries for the resolved destination cache. + * Default is 1024. */ - public void setPathMatcher(AntPathMatcher pathMatcher) { + public void setCacheLimit(int cacheLimit) { + this.cacheLimit = cacheLimit; + } + + /** + * Return the maximum number of entries for the resolved destination cache. + */ + public int getCacheLimit() { + return this.cacheLimit; + } + + /** + * The PathMatcher to use. + */ + public void setPathMatcher(PathMatcher pathMatcher) { this.pathMatcher = pathMatcher; } - public AntPathMatcher getPathMatcher() { + /** + * The configured PathMatcher. + */ + public PathMatcher getPathMatcher() { return this.pathMatcher; } @Override protected void addSubscriptionInternal(String sessionId, String subsId, String destination, Message message) { - SessionSubscriptionInfo info = this.subscriptionRegistry.addSubscription(sessionId, subsId, destination); - this.destinationCache.mapToDestination(destination, sessionId, subsId); + this.subscriptionRegistry.addSubscription(sessionId, subsId, destination); + this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId); } @Override @@ -65,7 +97,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { if (info != null) { String destination = info.removeSubscription(subsId); if (info.getSubscriptions(destination) == null) { - this.destinationCache.unmapFromDestination(destination, sessionId, subsId); + this.destinationCache.updateAfterRemovedSubscription(destination, sessionId, subsId); } } } @@ -77,30 +109,28 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { if (logger.isDebugEnabled()) { logger.debug("Unregistering subscriptions for sessionId=" + sessionId); } - this.destinationCache.removeSessionSubscriptions(info); + this.destinationCache.updateAfterRemovedSession(info); } } @Override protected MultiValueMap findSubscriptionsInternal(String destination, Message message) { - MultiValueMap result; - if (this.destinationCache.isCachedDestination(destination)) { - result = this.destinationCache.getSubscriptions(destination); + MultiValueMap result = this.destinationCache.getSubscriptions(destination); + if (result != null) { + return result; } - else { - result = new LinkedMultiValueMap(); - for (SessionSubscriptionInfo info : this.subscriptionRegistry.getAllSubscriptions()) { - for (String destinationPattern : info.getDestinations()) { - if (this.pathMatcher.match(destinationPattern, destination)) { - for (String subscriptionId : info.getSubscriptions(destinationPattern)) { - result.add(info.sessionId, subscriptionId); - } + result = new LinkedMultiValueMap(); + for (SessionSubscriptionInfo info : this.subscriptionRegistry.getAllSubscriptions()) { + for (String destinationPattern : info.getDestinations()) { + if (this.pathMatcher.match(destinationPattern, destination)) { + for (String subscriptionId : info.getSubscriptions(destinationPattern)) { + result.add(info.sessionId, subscriptionId); } } } - if(!result.isEmpty()) { - this.destinationCache.addSubscriptions(destination, result); - } + } + if(!result.isEmpty()) { + this.destinationCache.addSubscriptions(destination, result); } return result; } @@ -112,67 +142,74 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { } - - /** - * Provide direct lookup of session subscriptions by destination + * A cache for destinations previously resolved via + * {@link DefaultSubscriptionRegistry#findSubscriptionsInternal(String, Message)} */ - private static class DestinationCache { - - private AntPathMatcher pathMatcher = new AntPathMatcher(); + private class DestinationCache { + + /** Map from destination -> */ + @SuppressWarnings("serial") + private final Map> cache = + new LinkedHashMap>(DEFAULT_CACHE_LIMIT, 0.75f, true) { + @Override + protected boolean removeEldestEntry(Map.Entry> eldest) { + return size() > getCacheLimit(); + } + }; - // destination -> .. - private final Map> subscriptionsByDestination = - new ConcurrentHashMap>(); - private final Object monitor = new Object(); + public MultiValueMap getSubscriptions(String destination) { + synchronized (this.cache) { + return this.cache.get(destination); + } + } public void addSubscriptions(String destination, MultiValueMap subscriptions) { - this.subscriptionsByDestination.put(destination, subscriptions); + synchronized (this.cache) { + this.cache.put(destination, subscriptions); + } } - public void mapToDestination(String destination, String sessionId, String subsId) { - synchronized(this.monitor) { - for (String cachedDestination : this.subscriptionsByDestination.keySet()) { - if (this.pathMatcher.match(destination, cachedDestination)) { - MultiValueMap registrations = this.subscriptionsByDestination.get(cachedDestination); - if (registrations == null) { - registrations = new LinkedMultiValueMap(); - } - registrations.add(sessionId, subsId); + public void updateAfterNewSubscription(String destination, String sessionId, String subsId) { + synchronized(this.cache) { + for (String cachedDestination : this.cache.keySet()) { + if (getPathMatcher().match(destination, cachedDestination)) { + MultiValueMap subscriptions = this.cache.get(cachedDestination); + subscriptions.add(sessionId, subsId); } } } } - public void unmapFromDestination(String destination, String sessionId, String subsId) { - synchronized(this.monitor) { - for (String cachedDestination : this.subscriptionsByDestination.keySet()) { - if (this.pathMatcher.match(destination, cachedDestination)) { - MultiValueMap registrations = this.subscriptionsByDestination.get(cachedDestination); - List subscriptions = registrations.get(sessionId); - while(subscriptions.remove(subsId)); - if (subscriptions.isEmpty()) { - registrations.remove(sessionId); + public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) { + synchronized(this.cache) { + for (String cachedDestination : this.cache.keySet()) { + if (getPathMatcher().match(destination, cachedDestination)) { + MultiValueMap subscriptions = this.cache.get(cachedDestination); + List subsIds = subscriptions.get(sessionId); + subsIds.remove(subsId); + if (subsIds.isEmpty()) { + subscriptions.remove(sessionId); } - if (registrations.isEmpty()) { - this.subscriptionsByDestination.remove(cachedDestination); + if (subscriptions.isEmpty()) { + this.cache.remove(cachedDestination); } } } } } - public void removeSessionSubscriptions(SessionSubscriptionInfo info) { - synchronized(this.monitor) { + public void updateAfterRemovedSession(SessionSubscriptionInfo info) { + synchronized(this.cache) { for (String destination : info.getDestinations()) { - for (String cachedDestination : this.subscriptionsByDestination.keySet()) { - if (this.pathMatcher.match(destination, cachedDestination)) { - MultiValueMap map = this.subscriptionsByDestination.get(cachedDestination); + for (String cachedDestination : this.cache.keySet()) { + if (getPathMatcher().match(destination, cachedDestination)) { + MultiValueMap map = this.cache.get(cachedDestination); map.remove(info.getSessionId()); if (map.isEmpty()) { - this.subscriptionsByDestination.remove(cachedDestination); + this.cache.remove(cachedDestination); } } } @@ -180,17 +217,9 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { } } - public MultiValueMap getSubscriptions(String destination) { - return this.subscriptionsByDestination.get(destination); - } - - public boolean isCachedDestination(String destination) { - return subscriptionsByDestination.containsKey(destination); - } - @Override public String toString() { - return "[subscriptionsByDestination=" + this.subscriptionsByDestination + "]"; + return "[cache=" + this.cache + "]"; } } @@ -199,6 +228,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { */ private static class SessionSubscriptionRegistry { + // sessionId -> SessionSubscriptionInfo private final ConcurrentMap sessions = new ConcurrentHashMap(); @@ -241,6 +271,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { private final String sessionId; + // destination -> subscriptionIds private final Map> subscriptions = new ConcurrentHashMap>(4); private final Object monitor = new Object(); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java index 5fc8c63260..b64336429c 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java @@ -134,59 +134,53 @@ public class DefaultSubscriptionRegistryTests { // SPR-11657 @Test - public void registerMultipleSubscriptionsWithOneUsingDestinationPattern() { - - String sessId1 = "sess01"; - String sessId2 = "sess02"; - - String destPatternIbm = "/topic/PRICE.STOCK.*.IBM"; - String destNasdaqIbm = "/topic/PRICE.STOCK.NASDAQ.IBM"; - String destNyseIdm = "/topic/PRICE.STOCK.NYSE.IBM"; - String destNasdaqGoogle = "/topic/PRICE.STOCK.NASDAQ.GOOG"; - - String sessId1ToDestPatternIbm = "subs01"; - String sessId1ToDestNasdaqIbm = "subs02"; - String sessId2TodestNasdaqIbm = "subs03"; - String sessId2ToDestNyseIdm = "subs04"; - String sessId2ToDestNasdaqGoogle = "subs05"; - - this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestNasdaqIbm, destNasdaqIbm)); - this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestPatternIbm, destPatternIbm)); - MultiValueMap actual = this.registry.findSubscriptions(message(destNasdaqIbm)); - assertEquals("Expected 1 elements " + actual, 1, actual.size()); - assertEquals(Arrays.asList(sessId1ToDestNasdaqIbm, sessId1ToDestPatternIbm), actual.get(sessId1)); - - this.registry.registerSubscription(subscribeMessage(sessId2, sessId2TodestNasdaqIbm, destNasdaqIbm)); - this.registry.registerSubscription(subscribeMessage(sessId2, sessId2ToDestNyseIdm, destNyseIdm)); - this.registry.registerSubscription(subscribeMessage(sessId2, sessId2ToDestNasdaqGoogle, destNasdaqGoogle)); - actual = this.registry.findSubscriptions(message(destNasdaqIbm)); - assertEquals("Expected 2 elements " + actual, 2, actual.size()); - assertEquals(Arrays.asList(sessId1ToDestNasdaqIbm, sessId1ToDestPatternIbm), actual.get(sessId1)); - assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2)); - - this.registry.unregisterAllSubscriptions(sessId1); - actual = this.registry.findSubscriptions(message(destNasdaqIbm)); - assertEquals("Expected 1 elements " + actual, 1, actual.size()); - assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2)); - - this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestPatternIbm, destPatternIbm)); - this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestNasdaqIbm, destNasdaqIbm)); - actual = this.registry.findSubscriptions(message(destNasdaqIbm)); - assertEquals("Expected 2 elements " + actual, 2, actual.size()); - assertEquals(Arrays.asList(sessId1ToDestPatternIbm, sessId1ToDestNasdaqIbm), actual.get(sessId1)); - assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2)); - - this.registry.unregisterSubscription(unsubscribeMessage(sessId1, sessId1ToDestNasdaqIbm)); - actual = this.registry.findSubscriptions(message(destNasdaqIbm)); - assertEquals("Expected 2 elements " + actual, 2, actual.size()); - assertEquals(Arrays.asList(sessId1ToDestPatternIbm), actual.get(sessId1)); - assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2)); - this.registry.unregisterSubscription(unsubscribeMessage(sessId1, sessId1ToDestPatternIbm)); - assertEquals("Expected 1 elements " + actual, 1, actual.size()); - assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2)); - - this.registry.unregisterSubscription(unsubscribeMessage(sessId2, sessId2TodestNasdaqIbm)); - assertEquals("Expected 0 element " + actual, 0, actual.size()); + public void registerSubscriptionsWithSimpleAndPatternDestinations() { + + String sess1 = "sess01"; + String sess2 = "sess02"; + + String subs1 = "subs01"; + String subs2 = "subs02"; + String subs3 = "subs03"; + + this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NASDAQ.IBM")); + this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.*.IBM")); + MultiValueMap actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM")); + assertEquals(1, actual.size()); + assertEquals(Arrays.asList(subs2, subs1), actual.get(sess1)); + + this.registry.registerSubscription(subscribeMessage(sess2, subs1, "/topic/PRICE.STOCK.NASDAQ.IBM")); + this.registry.registerSubscription(subscribeMessage(sess2, subs2, "/topic/PRICE.STOCK.NYSE.IBM")); + this.registry.registerSubscription(subscribeMessage(sess2, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG")); + actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM")); + assertEquals(2, actual.size()); + assertEquals(Arrays.asList(subs2, subs1), actual.get(sess1)); + assertEquals(Arrays.asList(subs1), actual.get(sess2)); + + this.registry.unregisterAllSubscriptions(sess1); + actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM")); + assertEquals(1, actual.size()); + assertEquals(Arrays.asList(subs1), actual.get(sess2)); + + this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.*.IBM")); + this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NASDAQ.IBM")); + actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM")); + assertEquals(2, actual.size()); + assertEquals(Arrays.asList(subs1, subs2), actual.get(sess1)); + assertEquals(Arrays.asList(subs1), actual.get(sess2)); + + this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs2)); + actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM")); + assertEquals(2, actual.size()); + assertEquals(Arrays.asList(subs1), actual.get(sess1)); + assertEquals(Arrays.asList(subs1), actual.get(sess2)); + + this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs1)); + assertEquals(1, actual.size()); + assertEquals(Arrays.asList(subs1), actual.get(sess2)); + + this.registry.unregisterSubscription(unsubscribeMessage(sess2, subs1)); + assertEquals(0, actual.size()); } @Test