diff --git a/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java b/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java index 6ba4154e11..835b800f9a 100644 --- a/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java +++ b/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java @@ -172,6 +172,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe private int maxMessagesPerTask = Integer.MIN_VALUE; + private int idleConsumerLimit = 1; + private int idleTaskExecutionLimit = 1; private final Set scheduledInvokers = new HashSet(); @@ -237,9 +239,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe * However, if you want to optimize for a specific server, consider switching * this setting to at least CACHE_CONNECTION or CACHE_SESSION even in * conjunction with an external transaction manager. - *

Currently known servers that absolutely require CACHE_NONE for XA - * transaction processing: JBoss 4. For any others, consider raising the - * cache level. + *

Currently known servers that absolutely require CACHE_NONE for XA transaction + * processing: JBoss 4. For any others, consider raising the cache level. * @see #CACHE_NONE * @see #CACHE_CONNECTION * @see #CACHE_SESSION @@ -393,17 +394,43 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe /** * Return the maximum number of messages to process in one task. */ - public int getMaxMessagesPerTask() { + public final int getMaxMessagesPerTask() { synchronized (this.lifecycleMonitor) { return this.maxMessagesPerTask; } } /** - * Specify the limit for idle executions of a receive task, not having + * Specify the limit for the number of consumers that are allowed to be idle + * at any given time. + *

This limit is used by the {@link #scheduleNewInvokerIfAppropriate} method + * to determine if a new invoker should be created. Increasing the limit causes + * invokers to be created more aggressively. This can be useful to ramp up the + * number of invokers faster. + *

The default is 1, only scheduling a new invoker (which is likely to + * be idle initially) if none of the existing invokers is currently idle. + */ + public void setIdleConsumerLimit(int idleConsumerLimit) { + Assert.isTrue(idleConsumerLimit > 0, "'idleConsumerLimit' must be 1 or higher"); + synchronized (this.lifecycleMonitor) { + this.idleConsumerLimit = idleConsumerLimit; + } + } + + /** + * Return the limit for the number of idle consumers. + */ + public final int getIdleConsumerLimit() { + synchronized (this.lifecycleMonitor) { + return this.idleConsumerLimit; + } + } + + /** + * Specify the limit for idle executions of a consumer task, not having * received any message within its execution. If this limit is reached, * the task will shut down and leave receiving to other executing tasks. - *

Default is 1, closing idle resources early once a task didn't + *

The default is 1, closing idle resources early once a task didn't * receive a message. This applies to dynamic scheduling only; see the * {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"} setting. * The minimum number of consumers @@ -434,9 +461,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } /** - * Return the limit for idle executions of a receive task. + * Return the limit for idle executions of a consumer task. */ - public int getIdleTaskExecutionLimit() { + public final int getIdleTaskExecutionLimit() { synchronized (this.lifecycleMonitor) { return this.idleTaskExecutionLimit; } @@ -662,18 +689,19 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe * Schedule a new invoker, increasing the total number of scheduled * invokers for this listener container, but only if the specified * "maxConcurrentConsumers" limit has not been reached yet, and only - * if this listener container does not currently have idle invokers - * that are waiting for new messages already. - *

Called once a message has been received, to scale up while + * if the specified "idleConsumerLimit" has not been reached either. + *

Called once a message has been received, in order to scale up while * processing the message in the invoker that originally received it. * @see #setTaskExecutor * @see #getMaxConcurrentConsumers() + * @see #getIdleConsumerLimit() */ protected void scheduleNewInvokerIfAppropriate() { if (isRunning()) { resumePausedTasks(); synchronized (this.lifecycleMonitor) { - if (this.scheduledInvokers.size() < this.maxConcurrentConsumers && getIdleInvokerCount() == 0) { + if (this.scheduledInvokers.size() < this.maxConcurrentConsumers && + getIdleInvokerCount() < this.idleConsumerLimit) { scheduleNewInvoker(); if (logger.isDebugEnabled()) { logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size());