diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index b10e0ecf13..a3cc4f10c1 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -28,6 +28,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.event.SmartApplicationListener; +import org.springframework.core.task.TaskExecutor; import org.springframework.lang.Nullable; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.converter.ByteArrayMessageConverter; @@ -139,7 +140,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } @Bean - public ThreadPoolTaskExecutor clientInboundChannelExecutor() { + public TaskExecutor clientInboundChannelExecutor() { TaskExecutorRegistration reg = getClientInboundChannelRegistration().taskExecutor(); ThreadPoolTaskExecutor executor = reg.getTaskExecutor(); executor.setThreadNamePrefix("clientInboundChannel-"); @@ -175,7 +176,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } @Bean - public ThreadPoolTaskExecutor clientOutboundChannelExecutor() { + public TaskExecutor clientOutboundChannelExecutor() { TaskExecutorRegistration reg = getClientOutboundChannelRegistration().taskExecutor(); ThreadPoolTaskExecutor executor = reg.getTaskExecutor(); executor.setThreadNamePrefix("clientOutboundChannel-"); @@ -211,7 +212,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } @Bean - public ThreadPoolTaskExecutor brokerChannelExecutor() { + public TaskExecutor brokerChannelExecutor() { ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration(); ThreadPoolTaskExecutor executor; if (reg.hasTaskExecutor()) { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java index ce536e6ba2..91182e1b23 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java @@ -20,12 +20,12 @@ import java.time.Duration; import java.time.Instant; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.core.task.TaskExecutor; import org.springframework.lang.Nullable; import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; import org.springframework.scheduling.TaskScheduler; @@ -66,10 +66,10 @@ public class WebSocketMessageBrokerStats { private StompBrokerRelayMessageHandler stompBrokerRelay; @Nullable - private ThreadPoolExecutor inboundChannelExecutor; + private TaskExecutor inboundChannelExecutor; @Nullable - private ThreadPoolExecutor outboundChannelExecutor; + private TaskExecutor outboundChannelExecutor; @Nullable private TaskScheduler sockJsTaskScheduler; @@ -106,12 +106,12 @@ public class WebSocketMessageBrokerStats { this.stompBrokerRelay = stompBrokerRelay; } - public void setInboundChannelExecutor(ThreadPoolTaskExecutor inboundChannelExecutor) { - this.inboundChannelExecutor = inboundChannelExecutor.getThreadPoolExecutor(); + public void setInboundChannelExecutor(TaskExecutor inboundChannelExecutor) { + this.inboundChannelExecutor = inboundChannelExecutor; } - public void setOutboundChannelExecutor(ThreadPoolTaskExecutor outboundChannelExecutor) { - this.outboundChannelExecutor = outboundChannelExecutor.getThreadPoolExecutor(); + public void setOutboundChannelExecutor(TaskExecutor outboundChannelExecutor) { + this.outboundChannelExecutor = outboundChannelExecutor; } public void setSockJsTaskScheduler(TaskScheduler sockJsTaskScheduler) { @@ -174,14 +174,16 @@ public class WebSocketMessageBrokerStats { * Get stats about the executor processing incoming messages from WebSocket clients. */ public String getClientInboundExecutorStatsInfo() { - return (this.inboundChannelExecutor != null ? getExecutorStatsInfo(this.inboundChannelExecutor) : "null"); + return (this.inboundChannelExecutor != null ? + getExecutorStatsInfo(this.inboundChannelExecutor) : "null"); } /** * Get stats about the executor processing outgoing messages to WebSocket clients. */ public String getClientOutboundExecutorStatsInfo() { - return (this.outboundChannelExecutor != null ? getExecutorStatsInfo(this.outboundChannelExecutor) : "null"); + return (this.outboundChannelExecutor != null ? + getExecutorStatsInfo(this.outboundChannelExecutor) : "null"); } /** @@ -201,6 +203,8 @@ public class WebSocketMessageBrokerStats { } private String getExecutorStatsInfo(Executor executor) { + executor = executor instanceof ThreadPoolTaskExecutor ? + ((ThreadPoolTaskExecutor) executor).getThreadPoolExecutor() : executor; String str = executor.toString(); return str.substring(str.indexOf("pool"), str.length() - 1); }