Use TaskExecutor instead of ThreadPoolTaskExecutor

Closes gh-22943
master
Rossen Stoyanchev 5 years ago
parent 0b2fcbfe8a
commit e09c5fd9e5
  1. 7
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java
  2. 22
      spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java

@ -28,6 +28,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
@ -139,7 +140,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
public TaskExecutor clientInboundChannelExecutor() {
TaskExecutorRegistration reg = getClientInboundChannelRegistration().taskExecutor();
ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
executor.setThreadNamePrefix("clientInboundChannel-");
@ -175,7 +176,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
public TaskExecutor clientOutboundChannelExecutor() {
TaskExecutorRegistration reg = getClientOutboundChannelRegistration().taskExecutor();
ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
executor.setThreadNamePrefix("clientOutboundChannel-");
@ -211,7 +212,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public ThreadPoolTaskExecutor brokerChannelExecutor() {
public TaskExecutor brokerChannelExecutor() {
ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
ThreadPoolTaskExecutor executor;
if (reg.hasTaskExecutor()) {

@ -20,12 +20,12 @@ import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.scheduling.TaskScheduler;
@ -66,10 +66,10 @@ public class WebSocketMessageBrokerStats {
private StompBrokerRelayMessageHandler stompBrokerRelay;
@Nullable
private ThreadPoolExecutor inboundChannelExecutor;
private TaskExecutor inboundChannelExecutor;
@Nullable
private ThreadPoolExecutor outboundChannelExecutor;
private TaskExecutor outboundChannelExecutor;
@Nullable
private TaskScheduler sockJsTaskScheduler;
@ -106,12 +106,12 @@ public class WebSocketMessageBrokerStats {
this.stompBrokerRelay = stompBrokerRelay;
}
public void setInboundChannelExecutor(ThreadPoolTaskExecutor inboundChannelExecutor) {
this.inboundChannelExecutor = inboundChannelExecutor.getThreadPoolExecutor();
public void setInboundChannelExecutor(TaskExecutor inboundChannelExecutor) {
this.inboundChannelExecutor = inboundChannelExecutor;
}
public void setOutboundChannelExecutor(ThreadPoolTaskExecutor outboundChannelExecutor) {
this.outboundChannelExecutor = outboundChannelExecutor.getThreadPoolExecutor();
public void setOutboundChannelExecutor(TaskExecutor outboundChannelExecutor) {
this.outboundChannelExecutor = outboundChannelExecutor;
}
public void setSockJsTaskScheduler(TaskScheduler sockJsTaskScheduler) {
@ -174,14 +174,16 @@ public class WebSocketMessageBrokerStats {
* Get stats about the executor processing incoming messages from WebSocket clients.
*/
public String getClientInboundExecutorStatsInfo() {
return (this.inboundChannelExecutor != null ? getExecutorStatsInfo(this.inboundChannelExecutor) : "null");
return (this.inboundChannelExecutor != null ?
getExecutorStatsInfo(this.inboundChannelExecutor) : "null");
}
/**
* Get stats about the executor processing outgoing messages to WebSocket clients.
*/
public String getClientOutboundExecutorStatsInfo() {
return (this.outboundChannelExecutor != null ? getExecutorStatsInfo(this.outboundChannelExecutor) : "null");
return (this.outboundChannelExecutor != null ?
getExecutorStatsInfo(this.outboundChannelExecutor) : "null");
}
/**
@ -201,6 +203,8 @@ public class WebSocketMessageBrokerStats {
}
private String getExecutorStatsInfo(Executor executor) {
executor = executor instanceof ThreadPoolTaskExecutor ?
((ThreadPoolTaskExecutor) executor).getThreadPoolExecutor() : executor;
String str = executor.toString();
return str.substring(str.indexOf("pool"), str.length() - 1);
}

Loading…
Cancel
Save