Add cache limit to DefaultSessionRegistry and polish

Issue: SPR-11657
master
Rossen Stoyanchev 11 years ago
parent 1054080b24
commit c879796e00
  1. 169
      spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java
  2. 100
      spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

@ -16,7 +16,12 @@
package org.springframework.messaging.simp.broker;
import java.util.*;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -25,6 +30,7 @@ import org.springframework.util.AntPathMatcher;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.PathMatcher;
/**
* A default, simple in-memory implementation of {@link SubscriptionRegistry}.
@ -35,28 +41,54 @@ import org.springframework.util.MultiValueMap;
*/
public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
/** Default maximum number of entries for the destination cache: 1024 */
public static final int DEFAULT_CACHE_LIMIT = 1024;
/** The maximum number of entries in the cache */
private volatile int cacheLimit = DEFAULT_CACHE_LIMIT;
private final DestinationCache destinationCache = new DestinationCache();
private final SessionSubscriptionRegistry subscriptionRegistry = new SessionSubscriptionRegistry();
private AntPathMatcher pathMatcher = new AntPathMatcher();
private PathMatcher pathMatcher = new AntPathMatcher();
/**
* @param pathMatcher the pathMatcher to set
* Specify the maximum number of entries for the resolved destination cache.
* Default is 1024.
*/
public void setPathMatcher(AntPathMatcher pathMatcher) {
public void setCacheLimit(int cacheLimit) {
this.cacheLimit = cacheLimit;
}
/**
* Return the maximum number of entries for the resolved destination cache.
*/
public int getCacheLimit() {
return this.cacheLimit;
}
/**
* The PathMatcher to use.
*/
public void setPathMatcher(PathMatcher pathMatcher) {
this.pathMatcher = pathMatcher;
}
public AntPathMatcher getPathMatcher() {
/**
* The configured PathMatcher.
*/
public PathMatcher getPathMatcher() {
return this.pathMatcher;
}
@Override
protected void addSubscriptionInternal(String sessionId, String subsId, String destination, Message<?> message) {
SessionSubscriptionInfo info = this.subscriptionRegistry.addSubscription(sessionId, subsId, destination);
this.destinationCache.mapToDestination(destination, sessionId, subsId);
this.subscriptionRegistry.addSubscription(sessionId, subsId, destination);
this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
}
@Override
@ -65,7 +97,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
if (info != null) {
String destination = info.removeSubscription(subsId);
if (info.getSubscriptions(destination) == null) {
this.destinationCache.unmapFromDestination(destination, sessionId, subsId);
this.destinationCache.updateAfterRemovedSubscription(destination, sessionId, subsId);
}
}
}
@ -77,30 +109,28 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
if (logger.isDebugEnabled()) {
logger.debug("Unregistering subscriptions for sessionId=" + sessionId);
}
this.destinationCache.removeSessionSubscriptions(info);
this.destinationCache.updateAfterRemovedSession(info);
}
}
@Override
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
MultiValueMap<String,String> result;
if (this.destinationCache.isCachedDestination(destination)) {
result = this.destinationCache.getSubscriptions(destination);
MultiValueMap<String,String> result = this.destinationCache.getSubscriptions(destination);
if (result != null) {
return result;
}
else {
result = new LinkedMultiValueMap<String, String>();
for (SessionSubscriptionInfo info : this.subscriptionRegistry.getAllSubscriptions()) {
for (String destinationPattern : info.getDestinations()) {
if (this.pathMatcher.match(destinationPattern, destination)) {
for (String subscriptionId : info.getSubscriptions(destinationPattern)) {
result.add(info.sessionId, subscriptionId);
}
result = new LinkedMultiValueMap<String, String>();
for (SessionSubscriptionInfo info : this.subscriptionRegistry.getAllSubscriptions()) {
for (String destinationPattern : info.getDestinations()) {
if (this.pathMatcher.match(destinationPattern, destination)) {
for (String subscriptionId : info.getSubscriptions(destinationPattern)) {
result.add(info.sessionId, subscriptionId);
}
}
}
if(!result.isEmpty()) {
this.destinationCache.addSubscriptions(destination, result);
}
}
if(!result.isEmpty()) {
this.destinationCache.addSubscriptions(destination, result);
}
return result;
}
@ -112,67 +142,74 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
/**
* Provide direct lookup of session subscriptions by destination
* A cache for destinations previously resolved via
* {@link DefaultSubscriptionRegistry#findSubscriptionsInternal(String, Message)}
*/
private static class DestinationCache {
private AntPathMatcher pathMatcher = new AntPathMatcher();
private class DestinationCache {
/** Map from destination -> <sessionId, subscriptionId> */
@SuppressWarnings("serial")
private final Map<String, MultiValueMap<String, String>> cache =
new LinkedHashMap<String, MultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT, 0.75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<String, MultiValueMap<String, String>> eldest) {
return size() > getCacheLimit();
}
};
// destination -> ..
private final Map<String, MultiValueMap<String, String>> subscriptionsByDestination =
new ConcurrentHashMap<String, MultiValueMap<String, String>>();
private final Object monitor = new Object();
public MultiValueMap<String, String> getSubscriptions(String destination) {
synchronized (this.cache) {
return this.cache.get(destination);
}
}
public void addSubscriptions(String destination, MultiValueMap<String, String> subscriptions) {
this.subscriptionsByDestination.put(destination, subscriptions);
synchronized (this.cache) {
this.cache.put(destination, subscriptions);
}
}
public void mapToDestination(String destination, String sessionId, String subsId) {
synchronized(this.monitor) {
for (String cachedDestination : this.subscriptionsByDestination.keySet()) {
if (this.pathMatcher.match(destination, cachedDestination)) {
MultiValueMap<String, String> registrations = this.subscriptionsByDestination.get(cachedDestination);
if (registrations == null) {
registrations = new LinkedMultiValueMap<String, String>();
}
registrations.add(sessionId, subsId);
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
synchronized(this.cache) {
for (String cachedDestination : this.cache.keySet()) {
if (getPathMatcher().match(destination, cachedDestination)) {
MultiValueMap<String, String> subscriptions = this.cache.get(cachedDestination);
subscriptions.add(sessionId, subsId);
}
}
}
}
public void unmapFromDestination(String destination, String sessionId, String subsId) {
synchronized(this.monitor) {
for (String cachedDestination : this.subscriptionsByDestination.keySet()) {
if (this.pathMatcher.match(destination, cachedDestination)) {
MultiValueMap<String, String> registrations = this.subscriptionsByDestination.get(cachedDestination);
List<String> subscriptions = registrations.get(sessionId);
while(subscriptions.remove(subsId));
if (subscriptions.isEmpty()) {
registrations.remove(sessionId);
public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) {
synchronized(this.cache) {
for (String cachedDestination : this.cache.keySet()) {
if (getPathMatcher().match(destination, cachedDestination)) {
MultiValueMap<String, String> subscriptions = this.cache.get(cachedDestination);
List<String> subsIds = subscriptions.get(sessionId);
subsIds.remove(subsId);
if (subsIds.isEmpty()) {
subscriptions.remove(sessionId);
}
if (registrations.isEmpty()) {
this.subscriptionsByDestination.remove(cachedDestination);
if (subscriptions.isEmpty()) {
this.cache.remove(cachedDestination);
}
}
}
}
}
public void removeSessionSubscriptions(SessionSubscriptionInfo info) {
synchronized(this.monitor) {
public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
synchronized(this.cache) {
for (String destination : info.getDestinations()) {
for (String cachedDestination : this.subscriptionsByDestination.keySet()) {
if (this.pathMatcher.match(destination, cachedDestination)) {
MultiValueMap<String, String> map = this.subscriptionsByDestination.get(cachedDestination);
for (String cachedDestination : this.cache.keySet()) {
if (getPathMatcher().match(destination, cachedDestination)) {
MultiValueMap<String, String> map = this.cache.get(cachedDestination);
map.remove(info.getSessionId());
if (map.isEmpty()) {
this.subscriptionsByDestination.remove(cachedDestination);
this.cache.remove(cachedDestination);
}
}
}
@ -180,17 +217,9 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
}
public MultiValueMap<String, String> getSubscriptions(String destination) {
return this.subscriptionsByDestination.get(destination);
}
public boolean isCachedDestination(String destination) {
return subscriptionsByDestination.containsKey(destination);
}
@Override
public String toString() {
return "[subscriptionsByDestination=" + this.subscriptionsByDestination + "]";
return "[cache=" + this.cache + "]";
}
}
@ -199,6 +228,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
*/
private static class SessionSubscriptionRegistry {
// sessionId -> SessionSubscriptionInfo
private final ConcurrentMap<String, SessionSubscriptionInfo> sessions =
new ConcurrentHashMap<String, SessionSubscriptionInfo>();
@ -241,6 +271,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
private final String sessionId;
// destination -> subscriptionIds
private final Map<String, Set<String>> subscriptions = new ConcurrentHashMap<String, Set<String>>(4);
private final Object monitor = new Object();

@ -134,59 +134,53 @@ public class DefaultSubscriptionRegistryTests {
// SPR-11657
@Test
public void registerMultipleSubscriptionsWithOneUsingDestinationPattern() {
String sessId1 = "sess01";
String sessId2 = "sess02";
String destPatternIbm = "/topic/PRICE.STOCK.*.IBM";
String destNasdaqIbm = "/topic/PRICE.STOCK.NASDAQ.IBM";
String destNyseIdm = "/topic/PRICE.STOCK.NYSE.IBM";
String destNasdaqGoogle = "/topic/PRICE.STOCK.NASDAQ.GOOG";
String sessId1ToDestPatternIbm = "subs01";
String sessId1ToDestNasdaqIbm = "subs02";
String sessId2TodestNasdaqIbm = "subs03";
String sessId2ToDestNyseIdm = "subs04";
String sessId2ToDestNasdaqGoogle = "subs05";
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestNasdaqIbm, destNasdaqIbm));
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestPatternIbm, destPatternIbm));
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(destNasdaqIbm));
assertEquals("Expected 1 elements " + actual, 1, actual.size());
assertEquals(Arrays.asList(sessId1ToDestNasdaqIbm, sessId1ToDestPatternIbm), actual.get(sessId1));
this.registry.registerSubscription(subscribeMessage(sessId2, sessId2TodestNasdaqIbm, destNasdaqIbm));
this.registry.registerSubscription(subscribeMessage(sessId2, sessId2ToDestNyseIdm, destNyseIdm));
this.registry.registerSubscription(subscribeMessage(sessId2, sessId2ToDestNasdaqGoogle, destNasdaqGoogle));
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
assertEquals("Expected 2 elements " + actual, 2, actual.size());
assertEquals(Arrays.asList(sessId1ToDestNasdaqIbm, sessId1ToDestPatternIbm), actual.get(sessId1));
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
this.registry.unregisterAllSubscriptions(sessId1);
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
assertEquals("Expected 1 elements " + actual, 1, actual.size());
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestPatternIbm, destPatternIbm));
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestNasdaqIbm, destNasdaqIbm));
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
assertEquals("Expected 2 elements " + actual, 2, actual.size());
assertEquals(Arrays.asList(sessId1ToDestPatternIbm, sessId1ToDestNasdaqIbm), actual.get(sessId1));
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
this.registry.unregisterSubscription(unsubscribeMessage(sessId1, sessId1ToDestNasdaqIbm));
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
assertEquals("Expected 2 elements " + actual, 2, actual.size());
assertEquals(Arrays.asList(sessId1ToDestPatternIbm), actual.get(sessId1));
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
this.registry.unregisterSubscription(unsubscribeMessage(sessId1, sessId1ToDestPatternIbm));
assertEquals("Expected 1 elements " + actual, 1, actual.size());
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
this.registry.unregisterSubscription(unsubscribeMessage(sessId2, sessId2TodestNasdaqIbm));
assertEquals("Expected 0 element " + actual, 0, actual.size());
public void registerSubscriptionsWithSimpleAndPatternDestinations() {
String sess1 = "sess01";
String sess2 = "sess02";
String subs1 = "subs01";
String subs2 = "subs02";
String subs3 = "subs03";
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NASDAQ.IBM"));
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.*.IBM"));
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
assertEquals(1, actual.size());
assertEquals(Arrays.asList(subs2, subs1), actual.get(sess1));
this.registry.registerSubscription(subscribeMessage(sess2, subs1, "/topic/PRICE.STOCK.NASDAQ.IBM"));
this.registry.registerSubscription(subscribeMessage(sess2, subs2, "/topic/PRICE.STOCK.NYSE.IBM"));
this.registry.registerSubscription(subscribeMessage(sess2, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
assertEquals(2, actual.size());
assertEquals(Arrays.asList(subs2, subs1), actual.get(sess1));
assertEquals(Arrays.asList(subs1), actual.get(sess2));
this.registry.unregisterAllSubscriptions(sess1);
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
assertEquals(1, actual.size());
assertEquals(Arrays.asList(subs1), actual.get(sess2));
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.*.IBM"));
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NASDAQ.IBM"));
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
assertEquals(2, actual.size());
assertEquals(Arrays.asList(subs1, subs2), actual.get(sess1));
assertEquals(Arrays.asList(subs1), actual.get(sess2));
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs2));
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
assertEquals(2, actual.size());
assertEquals(Arrays.asList(subs1), actual.get(sess1));
assertEquals(Arrays.asList(subs1), actual.get(sess2));
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs1));
assertEquals(1, actual.size());
assertEquals(Arrays.asList(subs1), actual.get(sess2));
this.registry.unregisterSubscription(unsubscribeMessage(sess2, subs1));
assertEquals(0, actual.size());
}
@Test

Loading…
Cancel
Save