|
|
|
@ -977,36 +977,38 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe |
|
|
|
|
handleListenerSetupFailure(ex, true); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
synchronized (lifecycleMonitor) { |
|
|
|
|
decreaseActiveInvokerCount(); |
|
|
|
|
lifecycleMonitor.notifyAll(); |
|
|
|
|
} |
|
|
|
|
if (!messageReceived) { |
|
|
|
|
this.idleTaskExecutionCount++; |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
this.idleTaskExecutionCount = 0; |
|
|
|
|
} |
|
|
|
|
synchronized (lifecycleMonitor) { |
|
|
|
|
if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) { |
|
|
|
|
// We're shutting down completely.
|
|
|
|
|
scheduledInvokers.remove(this); |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size()); |
|
|
|
|
} |
|
|
|
|
finally { |
|
|
|
|
synchronized (lifecycleMonitor) { |
|
|
|
|
decreaseActiveInvokerCount(); |
|
|
|
|
lifecycleMonitor.notifyAll(); |
|
|
|
|
clearResources(); |
|
|
|
|
} |
|
|
|
|
else if (isRunning()) { |
|
|
|
|
int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount(); |
|
|
|
|
if (nonPausedConsumers < 1) { |
|
|
|
|
logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " + |
|
|
|
|
"Check your thread pool configuration! Manual recovery necessary through a start() call."); |
|
|
|
|
if (!messageReceived) { |
|
|
|
|
this.idleTaskExecutionCount++; |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
this.idleTaskExecutionCount = 0; |
|
|
|
|
} |
|
|
|
|
synchronized (lifecycleMonitor) { |
|
|
|
|
if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) { |
|
|
|
|
// We're shutting down completely.
|
|
|
|
|
scheduledInvokers.remove(this); |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size()); |
|
|
|
|
} |
|
|
|
|
lifecycleMonitor.notifyAll(); |
|
|
|
|
clearResources(); |
|
|
|
|
} |
|
|
|
|
else if (nonPausedConsumers < getConcurrentConsumers()) { |
|
|
|
|
logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " + |
|
|
|
|
"due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " + |
|
|
|
|
"to be triggered by remaining consumers."); |
|
|
|
|
else if (isRunning()) { |
|
|
|
|
int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount(); |
|
|
|
|
if (nonPausedConsumers < 1) { |
|
|
|
|
logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " + |
|
|
|
|
"Check your thread pool configuration! Manual recovery necessary through a start() call."); |
|
|
|
|
} |
|
|
|
|
else if (nonPausedConsumers < getConcurrentConsumers()) { |
|
|
|
|
logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " + |
|
|
|
|
"due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " + |
|
|
|
|
"to be triggered by remaining consumers."); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|