From d1d8d61c4915075d7431c3b170625e44e0aa89db Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Thu, 29 Jun 2017 21:42:47 +0800 Subject: [PATCH] =?UTF-8?q?=E9=99=8D=E4=BD=8E=E5=9B=9E=E8=B0=83=E9=A2=91?= =?UTF-8?q?=E7=8E=87=E6=8F=90=E5=8D=87=E6=89=A7=E8=A1=8C=E5=99=A8=E6=80=A7?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 3 ++- .../job/admin/controller/JobApiController.java | 12 +++++++++++- .../job/core/thread/TriggerCallbackThread.java | 15 ++++++++++++--- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index e5d9debc..0ff1e823 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -866,6 +866,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 5、路由策略代码重构; - 6、执行器重复注册问题修复; - 7、任务线程轮空30次后自动销毁,降低低频任务的无效线程消耗。 +- 8、执行器任务执行结果批量回调,降低回调频率提升执行器性能; #### TODO LIST - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限; @@ -876,7 +877,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 6、任务依赖,流程图,子任务+会签任务,各节点日志; - 7、调度任务优先级; - 8、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。 - +- 9、任务执行结果回调失败后重试:待定,防止回调死循环; ## 七、其他 diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java index e2fe2674..6ae94959 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java @@ -24,6 +24,7 @@ import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; import java.text.MessageFormat; import java.util.Date; +import java.util.List; /** * Created by xuxueli on 17/5/10. @@ -43,9 +44,18 @@ public class JobApiController { @RequestMapping(value= AdminApiUtil.CALLBACK, method = RequestMethod.POST, consumes = "application/json") @ResponseBody @PermessionLimit(limit=false) - public ReturnT callback(@RequestBody HandleCallbackParam handleCallbackParam){ + public ReturnT callback(@RequestBody List callbackParamList){ + for (HandleCallbackParam handleCallbackParam: callbackParamList) { + ReturnT callbackResult = callback(handleCallbackParam); + logger.info("JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", + (callbackResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult); + } + + return ReturnT.SUCCESS; + } + private ReturnT callback(HandleCallbackParam handleCallbackParam) { // valid log item XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId()); if (log == null) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 8fdecada..2f4b499d 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -6,6 +6,8 @@ import com.xxl.job.core.util.AdminApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.LinkedBlockingQueue; /** @@ -32,12 +34,19 @@ public class TriggerCallbackThread { try { HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { - // callback + + // callback list + List callbackParamList = new ArrayList(); + int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); + callbackParamList.add(callback); + + // callback, will retry if error try { - ReturnT callbackResult = AdminApiUtil.callApiFailover(AdminApiUtil.CALLBACK, callback); - logger.info(">>>>>>>>>>> xxl-job callback, HandleCallbackParam:{}, callbackResult:{}", new Object[]{callback.toString(), callbackResult.toString()}); + ReturnT callbackResult = AdminApiUtil.callApiFailover(AdminApiUtil.CALLBACK, callbackParamList); + logger.info(">>>>>>>>>>> xxl-job callback, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult}); } catch (Exception e) { logger.error(">>>>>>>>>>> xxl-job TriggerCallbackThread Exception:", e); + //getInstance().callBackQueue.addAll(callbackParamList); } } } catch (Exception e) {