Split destination cache into access vs update

Issue: SPR-11657
master
Rossen Stoyanchev 11 years ago
parent c879796e00
commit 6a9b229249
  1. 63
      spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java
  2. 2
      spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

@ -148,9 +148,13 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
*/
private class DestinationCache {
/** Map from destination -> <sessionId, subscriptionId> */
/** Map from destination -> <sessionId, subscriptionId> for fast look-ups */
private final Map<String, MultiValueMap<String, String>> accessCache =
new ConcurrentHashMap<String, MultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT);
/** Map from destination -> <sessionId, subscriptionId> with locking */
@SuppressWarnings("serial")
private final Map<String, MultiValueMap<String, String>> cache =
private final Map<String, MultiValueMap<String, String>> updateCache =
new LinkedHashMap<String, MultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT, 0.75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<String, MultiValueMap<String, String>> eldest) {
@ -159,42 +163,45 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
};
public MultiValueMap<String, String> getSubscriptions(String destination) {
synchronized (this.cache) {
return this.cache.get(destination);
}
return this.accessCache.get(destination);
}
public void addSubscriptions(String destination, MultiValueMap<String, String> subscriptions) {
synchronized (this.cache) {
this.cache.put(destination, subscriptions);
synchronized (this.updateCache) {
this.updateCache.put(destination, subscriptions);
this.accessCache.put(destination, new LinkedMultiValueMap<String, String>(subscriptions));
}
}
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
synchronized(this.cache) {
for (String cachedDestination : this.cache.keySet()) {
synchronized(this.updateCache) {
for (String cachedDestination : this.updateCache.keySet()) {
if (getPathMatcher().match(destination, cachedDestination)) {
MultiValueMap<String, String> subscriptions = this.cache.get(cachedDestination);
subscriptions.add(sessionId, subsId);
MultiValueMap<String, String> subs = this.updateCache.get(cachedDestination);
subs.add(sessionId, subsId);
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
}
}
}
}
public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) {
synchronized(this.cache) {
for (String cachedDestination : this.cache.keySet()) {
synchronized(this.updateCache) {
for (String cachedDestination : this.updateCache.keySet()) {
if (getPathMatcher().match(destination, cachedDestination)) {
MultiValueMap<String, String> subscriptions = this.cache.get(cachedDestination);
List<String> subsIds = subscriptions.get(sessionId);
MultiValueMap<String, String> subs = this.updateCache.get(cachedDestination);
List<String> subsIds = subs.get(sessionId);
subsIds.remove(subsId);
if (subsIds.isEmpty()) {
subscriptions.remove(sessionId);
subs.remove(sessionId);
}
if (subscriptions.isEmpty()) {
this.cache.remove(cachedDestination);
if (subs.isEmpty()) {
this.updateCache.remove(cachedDestination);
this.accessCache.remove(cachedDestination);
}
else {
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
}
}
}
@ -202,14 +209,18 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
synchronized(this.cache) {
synchronized(this.updateCache) {
for (String destination : info.getDestinations()) {
for (String cachedDestination : this.cache.keySet()) {
for (String cachedDestination : this.updateCache.keySet()) {
if (getPathMatcher().match(destination, cachedDestination)) {
MultiValueMap<String, String> map = this.cache.get(cachedDestination);
map.remove(info.getSessionId());
if (map.isEmpty()) {
this.cache.remove(cachedDestination);
MultiValueMap<String, String> subs = this.updateCache.get(cachedDestination);
subs.remove(info.getSessionId());
if (subs.isEmpty()) {
this.updateCache.remove(cachedDestination);
this.accessCache.remove(cachedDestination);
}
else {
this.accessCache.put(cachedDestination,new LinkedMultiValueMap<String, String>(subs));
}
}
}
@ -219,7 +230,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
@Override
public String toString() {
return "[cache=" + this.cache + "]";
return "[cache=" + this.accessCache + "]";
}
}

@ -176,10 +176,12 @@ public class DefaultSubscriptionRegistryTests {
assertEquals(Arrays.asList(subs1), actual.get(sess2));
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs1));
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
assertEquals(1, actual.size());
assertEquals(Arrays.asList(subs1), actual.get(sess2));
this.registry.unregisterSubscription(unsubscribeMessage(sess2, subs1));
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
assertEquals(0, actual.size());
}

Loading…
Cancel
Save