From 6a9b229249b9aeebc389d4cf1771e7411d5851fa Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 29 Apr 2014 22:15:18 -0400 Subject: [PATCH] Split destination cache into access vs update Issue: SPR-11657 --- .../broker/DefaultSubscriptionRegistry.java | 63 +++++++++++-------- .../DefaultSubscriptionRegistryTests.java | 2 + 2 files changed, 39 insertions(+), 26 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java index 569e4869f9..309a36feb8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java @@ -148,9 +148,13 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { */ private class DestinationCache { - /** Map from destination -> */ + /** Map from destination -> for fast look-ups */ + private final Map> accessCache = + new ConcurrentHashMap>(DEFAULT_CACHE_LIMIT); + + /** Map from destination -> with locking */ @SuppressWarnings("serial") - private final Map> cache = + private final Map> updateCache = new LinkedHashMap>(DEFAULT_CACHE_LIMIT, 0.75f, true) { @Override protected boolean removeEldestEntry(Map.Entry> eldest) { @@ -159,42 +163,45 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { }; - public MultiValueMap getSubscriptions(String destination) { - synchronized (this.cache) { - return this.cache.get(destination); - } + return this.accessCache.get(destination); } public void addSubscriptions(String destination, MultiValueMap subscriptions) { - synchronized (this.cache) { - this.cache.put(destination, subscriptions); + synchronized (this.updateCache) { + this.updateCache.put(destination, subscriptions); + this.accessCache.put(destination, new LinkedMultiValueMap(subscriptions)); } } public void updateAfterNewSubscription(String destination, String sessionId, String subsId) { - synchronized(this.cache) { - for (String cachedDestination : this.cache.keySet()) { + synchronized(this.updateCache) { + for (String cachedDestination : this.updateCache.keySet()) { if (getPathMatcher().match(destination, cachedDestination)) { - MultiValueMap subscriptions = this.cache.get(cachedDestination); - subscriptions.add(sessionId, subsId); + MultiValueMap subs = this.updateCache.get(cachedDestination); + subs.add(sessionId, subsId); + this.accessCache.put(cachedDestination, new LinkedMultiValueMap(subs)); } } } } public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) { - synchronized(this.cache) { - for (String cachedDestination : this.cache.keySet()) { + synchronized(this.updateCache) { + for (String cachedDestination : this.updateCache.keySet()) { if (getPathMatcher().match(destination, cachedDestination)) { - MultiValueMap subscriptions = this.cache.get(cachedDestination); - List subsIds = subscriptions.get(sessionId); + MultiValueMap subs = this.updateCache.get(cachedDestination); + List subsIds = subs.get(sessionId); subsIds.remove(subsId); if (subsIds.isEmpty()) { - subscriptions.remove(sessionId); + subs.remove(sessionId); } - if (subscriptions.isEmpty()) { - this.cache.remove(cachedDestination); + if (subs.isEmpty()) { + this.updateCache.remove(cachedDestination); + this.accessCache.remove(cachedDestination); + } + else { + this.accessCache.put(cachedDestination, new LinkedMultiValueMap(subs)); } } } @@ -202,14 +209,18 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { } public void updateAfterRemovedSession(SessionSubscriptionInfo info) { - synchronized(this.cache) { + synchronized(this.updateCache) { for (String destination : info.getDestinations()) { - for (String cachedDestination : this.cache.keySet()) { + for (String cachedDestination : this.updateCache.keySet()) { if (getPathMatcher().match(destination, cachedDestination)) { - MultiValueMap map = this.cache.get(cachedDestination); - map.remove(info.getSessionId()); - if (map.isEmpty()) { - this.cache.remove(cachedDestination); + MultiValueMap subs = this.updateCache.get(cachedDestination); + subs.remove(info.getSessionId()); + if (subs.isEmpty()) { + this.updateCache.remove(cachedDestination); + this.accessCache.remove(cachedDestination); + } + else { + this.accessCache.put(cachedDestination,new LinkedMultiValueMap(subs)); } } } @@ -219,7 +230,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { @Override public String toString() { - return "[cache=" + this.cache + "]"; + return "[cache=" + this.accessCache + "]"; } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java index b64336429c..633f06a6ff 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java @@ -176,10 +176,12 @@ public class DefaultSubscriptionRegistryTests { assertEquals(Arrays.asList(subs1), actual.get(sess2)); this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs1)); + actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM")); assertEquals(1, actual.size()); assertEquals(Arrays.asList(subs1), actual.get(sess2)); this.registry.unregisterSubscription(unsubscribeMessage(sess2, subs1)); + actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM")); assertEquals(0, actual.size()); }