diff --git a/README.md b/README.md
index 9646d17e..8a6b8fbe 100644
--- a/README.md
+++ b/README.md
@@ -78,6 +78,7 @@ XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是
 - 30、跨平台:原生提供通用HTTP任务Handler(Bean任务,"HttpJobHandler");业务方只需要提供HTTP链接即可,不限制语言、平台;
 - 31、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文;
 - 32、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用;
+- 33、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;
 
 ## Development
 
diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md
index 70790885..38d6662d 100644
--- a/doc/XXL-JOB官方文档.md
+++ b/doc/XXL-JOB官方文档.md
@@ -47,6 +47,7 @@ XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是
 - 30、跨平台:原生提供通用HTTP任务Handler(Bean任务,"HttpJobHandler");业务方只需要提供HTTP链接即可,不限制语言、平台;
 - 31、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文;
 - 32、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用;
+- 33、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;
 
 ### 1.3 发展
 
@@ -1442,9 +1443,9 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
 - 19、执行器优雅停机优化;
 - 20、连接池配置优化,增强连接有效性验证;
 - 21、JobHandler#msg长度限制,修复异常情况下日志超长导致内存溢出的问题;
-- 22、[迭代中]任务线程隔离:
-    - 执行器测异步响应,不存在阻塞不需要隔离;
-    - 调度中心共用单一调度线程池,可能导致调度阻塞需要线程隔离;调度线程池拆分为Fast/Slow两个,针对调度较慢的执行器地址请求,降级使用Slow线程池;考虑是否可以任务级隔离线程池;
+- 22、Quartz触发线程池废弃并替换为 "XxlJobThreadPool",降低线程切换、内存占用带来的消耗,提高调度性能;
+- 23、调度线程池隔离,拆分为"Fast"和"Slow"两个线程池,1分钟窗口期内任务耗时达500ms超过10次,该窗口期内判定为慢任务,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;
+
 
 ### TODO LIST
 - 1、任务分片路由:分片采用一致性Hash算法计算出尽量稳定的分片顺序,即使注册机器存在波动也不会引起分批分片顺序大的波动;目前采用IP自然排序,可以满足需求,待定;
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/quartz/XxlJobThreadPool.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/quartz/XxlJobThreadPool.java
new file mode 100644
index 00000000..07d44f1c
--- /dev/null
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/quartz/XxlJobThreadPool.java
@@ -0,0 +1,65 @@
+package com.xxl.job.admin.core.quartz;
+
+import org.quartz.SchedulerConfigException;
+import org.quartz.spi.ThreadPool;
+
+/**
+ * Quartz thread pool that owns no worker threads: every task handed to
+ * {@link #runInThread(Runnable)} runs synchronously on the calling thread.
+ *
+ * @author xuxueli 2019-03-06
+ */
+public class XxlJobThreadPool implements ThreadPool {
+
+    /**
+     * Runs the task inline on the caller's thread.
+     *
+     * @param runnable task to execute
+     * @return always {@code true}
+     */
+    @Override
+    public boolean runInThread(Runnable runnable) {
+        runnable.run();
+        return true;
+    }
+
+    /** Always reports one available thread. */
+    @Override
+    public int blockForAvailableThreads() {
+        return 1;
+    }
+
+    /** No-op: there is nothing to initialize. */
+    @Override
+    public void initialize() throws SchedulerConfigException {
+    }
+
+    /** No-op: there are no worker threads to stop. */
+    @Override
+    public void shutdown(boolean waitForJobsToComplete) {
+    }
+
+    /** Always reports a pool size of one. */
+    @Override
+    public int getPoolSize() {
+        return 1;
+    }
+
+    /** No-op: the instance id is not used. */
+    @Override
+    public void setInstanceId(String schedInstId) {
+    }
+
+    /** No-op: the instance name is not used. */
+    @Override
+    public void setInstanceName(String schedName) {
+    }
+
+    /**
+     * No-op setter; presumably kept so a {@code threadCount} property can still be
+     * applied to this pool without error (TODO confirm).
+     */
+    public void setThreadCount(int count) {
+    }
+
+}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java
index 7baef43e..7a3a733c 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java
@@ -5,10 +5,8 @@ import com.xxl.job.admin.core.trigger.XxlJobTrigger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * job trigger thread pool helper
@@ -21,32 +19,99 @@ public class JobTriggerPoolHelper {
 
     // ---------------------- trigger pool ----------------------
 
-    private ThreadPoolExecutor triggerPool = new ThreadPoolExecutor(
-            32,
-            256,
+    // fast/slow thread pool
+    private ThreadPoolExecutor fastTriggerPool = new ThreadPoolExecutor(
+            8,
+            200,
             60L,
             TimeUnit.SECONDS,
             new LinkedBlockingQueue(1000),
             new ThreadFactory() {
                 @Override
                 public Thread newThread(Runnable r) {
-                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-triggerPool-" + r.hashCode());
+                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                 }
             });
 
+    private ThreadPoolExecutor slowTriggerPool = new ThreadPoolExecutor(
+            0,
+            100,
+            60L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(2000),
+            new ThreadFactory() {
+                @Override
+                public Thread newThread(Runnable r) {
+                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
+                }
+            });
+
+
+    // per-job slow-trigger counters for the current one-minute window
+    private volatile long minTim = System.currentTimeMillis()/60000;     // current window: epoch millis -> minutes
+    private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
+
 
+    /**
+     * Submits a trigger request to the fast pool, or to the slow pool when the job
+     * has been counted as slow more than 10 times in the current one-minute window.
+     *
+     * @param jobId                 id of the job to trigger; also the key of its slow-trigger counter
+     * @param triggerType           passed through to {@code XxlJobTrigger.trigger}
+     * @param failRetryCount        passed through to {@code XxlJobTrigger.trigger}
+     * @param executorShardingParam passed through to {@code XxlJobTrigger.trigger}
+     * @param executorParam         passed through to {@code XxlJobTrigger.trigger}
+     */
     public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
-        triggerPool.execute(new Runnable() {
+
+        // choose thread pool
+        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
+        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
+        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // more than 10 slow triggers in the current minute
+            triggerPool_ = slowTriggerPool;
+        }
+
+        // trigger
+        triggerPool_.execute(new Runnable() {
             @Override
             public void run() {
-                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
+
+                long start = System.currentTimeMillis();
+
+                try {
+                    // do trigger
+                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                } finally {
+
+                    // a new minute has started: drop the counters of the previous window
+                    long minTim_now = System.currentTimeMillis()/60000;
+                    if (minTim != minTim_now) {
+                        minTim = minTim_now;
+                        jobTimeoutCountMap.clear();
+                    }
+
+                    // incr timeout-count-map
+                    long cost = System.currentTimeMillis()-start;
+                    if (cost > 500) {       // job-timeout threshold 500ms
+                        // first slow run stores 1; later runs increment the counter already in the map
+                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
+                        if (timeoutCount != null) {
+                            timeoutCount.incrementAndGet();
+                        }
+                    }
+
+                }
+
             }
         });
     }
 
     public void stop() {
         //triggerPool.shutdown();
-        triggerPool.shutdownNow();
+        fastTriggerPool.shutdownNow();
+        slowTriggerPool.shutdownNow();
         logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
     }
 
diff --git a/xxl-job-admin/src/main/resources/quartz.properties b/xxl-job-admin/src/main/resources/quartz.properties
index 7a2c53d1..ebe1192b 100644
--- a/xxl-job-admin/src/main/resources/quartz.properties
+++ b/xxl-job-admin/src/main/resources/quartz.properties
@@ -9,16 +9,19 @@ org.quartz.scheduler.rmi.export: false
 org.quartz.scheduler.rmi.proxy: false
 org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
 
-org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
-org.quartz.threadPool.threadCount: 50
-org.quartz.threadPool.threadPriority: 5
-org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
+#org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
+#org.quartz.threadPool.threadCount: 5
+#org.quartz.threadPool.threadPriority: 5
+#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
 
 org.quartz.jobStore.misfireThreshold: 60000
 org.quartz.jobStore.maxMisfiresToHandleAtATime: 1
 
 #org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
 
+# for async trigger
+org.quartz.threadPool.class: com.xxl.job.admin.core.quartz.XxlJobThreadPool
+
 # for cluster
 org.quartz.jobStore.tablePrefix: XXL_JOB_QRTZ_
 org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX