From 8360707bb98e6e1f11125f9c9e19b72a6f55b633 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Mon, 13 Apr 2020 20:03:52 +0800 Subject: [PATCH] biz thread pool --- .../com/xxl/job/core/server/EmbedServer.java | 45 +++++++++++++++---- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java index 4f344d2e..a3fc9af9 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java @@ -20,7 +20,7 @@ import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * Copy from : https://github.com/xuxueli/xxl-rpc @@ -43,6 +43,25 @@ public class EmbedServer { // param EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); + ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( + 0, + 200, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(2000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode()); + } + }, + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!"); + } + }); + try { // start server @@ -56,7 +75,7 @@ public class EmbedServer { .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL - .addLast(new EmbedHttpServerHandler(executorBiz, accessToken)); + .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }) .childOption(ChannelOption.SO_KEEPALIVE, true); @@ -121,9 +140,11 @@ public class EmbedServer { private ExecutorBiz executorBiz; private String accessToken; - public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken) { + private ThreadPoolExecutor bizThreadPool; + public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) { this.executorBiz = executorBiz; this.accessToken = accessToken; + this.bizThreadPool = bizThreadPool; } @Override @@ -137,14 +158,20 @@ public class EmbedServer { boolean keepAlive = HttpUtil.isKeepAlive(msg); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); - // do invoke - Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); + // invoke + bizThreadPool.execute(new Runnable() { + @Override + public void run() { + // do invoke + Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); - // to json - String responseJson = GsonTool.toJson(responseObj); + // to json + String responseJson = GsonTool.toJson(responseObj); - // write response - writeResponse(ctx, keepAlive, responseJson); + // write response + writeResponse(ctx, keepAlive, responseJson); + } + }); } private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {