新增任务属性 "XxlJobContent" ,统一维护任务上下文信息,包括任务ID、分片参数等,方便运行时存取任务相关信息;

- 废弃 "ShardingUtil" 组件:改用 "XxlJobContext.getXxlJobContext().getShardIndex()/getShardTotal();" 获取分片参数;
master
xuxueli 5 years ago
parent 668411f8b3
commit e17797888c
  1. 3
      doc/XXL-JOB-English-Documentation.md
  2. 6
      doc/XXL-JOB官方文档.md
  3. 68
      xxl-job-core/src/main/java/com/xxl/job/core/context/XxlJobContext.java
  4. 7
      xxl-job-core/src/main/java/com/xxl/job/core/handler/impl/ScriptJobHandler.java
  5. 9
      xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java
  6. 92
      xxl-job-core/src/main/java/com/xxl/job/core/util/ShardingUtil.java
  7. 12
      xxl-job-executor-samples/xxl-job-executor-sample-frameless/src/main/java/com/xuxueli/executor/sample/frameless/jobhandler/ShardingJobHandler.java
  8. 12
      xxl-job-executor-samples/xxl-job-executor-sample-jboot/src/main/java/com/xuxueli/executor/sample/jboot/jobhandler/ShardingJobHandler.java
  9. 12
      xxl-job-executor-samples/xxl-job-executor-sample-jfinal/src/main/java/com/xuxueli/executor/sample/jfinal/jobhandler/ShardingJobHandler.java
  10. 12
      xxl-job-executor-samples/xxl-job-executor-sample-nutz/src/main/java/com/xuxueli/executor/sample/nutz/jobhandler/ShardingJobHandler.java
  11. 12
      xxl-job-executor-samples/xxl-job-executor-sample-spring/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java
  12. 12
      xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java

@ -974,7 +974,8 @@ When “分片广播” is selected as route policy in executor cluster, one tas
The develop process of "分片广播" is the same as general task, The difference is that you can get slice parameters,code as shown below(go and see ShardingJobHandler in execuotr example ): The develop process of "分片广播" is the same as general task, The difference is that you can get slice parameters,code as shown below(go and see ShardingJobHandler in execuotr example ):
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();
This slice parameter object has two properties: This slice parameter object has two properties:

@ -1207,7 +1207,8 @@ XXL-JOB会为每次调度请求生成一个单独的日志文件,需要通过
- Java语言任务获取分片参数方式:BEAN、GLUE模式(Java) - Java语言任务获取分片参数方式:BEAN、GLUE模式(Java)
``` ```
// 可参考Sample示例执行器中的示例任务"ShardingJobHandler"了解试用 // 可参考Sample示例执行器中的示例任务"ShardingJobHandler"了解试用
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();
``` ```
- 脚本语言任务获取分片参数方式:GLUE模式(Shell)、GLUE模式(Python)、GLUE模式(Nodejs) - 脚本语言任务获取分片参数方式:GLUE模式(Shell)、GLUE模式(Python)、GLUE模式(Nodejs)
``` ```
@ -1990,7 +1991,8 @@ data: post-data
- 3、邮箱告警配置优化:将"spring.mail.from"与"spring.mail.username"属性拆分开,更加灵活的支持一些无密码邮箱服务; - 3、邮箱告警配置优化:将"spring.mail.from"与"spring.mail.username"属性拆分开,更加灵活的支持一些无密码邮箱服务;
- 4、多个项目依赖升级至较新稳定版本,如netty、spring、springboot等; - 4、多个项目依赖升级至较新稳定版本,如netty、spring、springboot等;
- 5、通用HTTP任务Handler(httpJobHandler)优化:修复 "setDoOutput(true)" 导致任务请求GetMethod失效问题; - 5、通用HTTP任务Handler(httpJobHandler)优化:修复 "setDoOutput(true)" 导致任务请求GetMethod失效问题;
- 6、[迭代中] 新增任务属性 "XxlJobContent" ,统一维护任务上下文信息,方便运行时存取任务相关信息; - 6、新增任务属性 "XxlJobContent" ,统一维护任务上下文信息,包括任务ID、分片参数等,方便运行时存取任务相关信息;
- 废弃 "ShardingUtil" 组件:改用 "XxlJobContext.getXxlJobContext().getShardIndex()/getShardTotal();" 获取分片参数;
- 7、[规划中]任务触发参数优化:支持选择 "Cron触发"、"固定间隔时间触发"、"指定时间点触发"、"不选择" 等; - 7、[规划中]任务触发参数优化:支持选择 "Cron触发"、"固定间隔时间触发"、"指定时间点触发"、"不选择" 等;
### 7.32 版本 v2.3.0 Release Notes[规划中] ### 7.32 版本 v2.3.0 Release Notes[规划中]

@ -0,0 +1,68 @@
package com.xxl.job.core.context;
/**
* xxl-job context
*
* @author xuxueli 2020-05-21
* [Dear hj]
*/
public class XxlJobContext {
/**
* job id
*/
private final long jobId;
/**
* job log filename
*/
private final String jobLogFileName;
/**
* shard index
*/
private final int shardIndex;
/**
* shard total
*/
private final int shardTotal;
public XxlJobContext(long jobId, String jobLogFileName, int shardIndex, int shardTotal) {
this.jobId = jobId;
this.jobLogFileName = jobLogFileName;
this.shardIndex = shardIndex;
this.shardTotal = shardTotal;
}
public long getJobId() {
return jobId;
}
public String getJobLogFileName() {
return jobLogFileName;
}
public int getShardIndex() {
return shardIndex;
}
public int getShardTotal() {
return shardTotal;
}
// ---------------------- tool ----------------------
private static InheritableThreadLocal<XxlJobContext> contextHolder = new InheritableThreadLocal<XxlJobContext>();
public static void setXxlJobContext(XxlJobContext xxlJobContext){
contextHolder.set(xxlJobContext);
}
public static XxlJobContext getXxlJobContext(){
return contextHolder.get();
}
}

@ -1,12 +1,12 @@
package com.xxl.job.core.handler.impl; package com.xxl.job.core.handler.impl;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.glue.GlueTypeEnum; import com.xxl.job.core.glue.GlueTypeEnum;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ScriptUtil; import com.xxl.job.core.util.ScriptUtil;
import com.xxl.job.core.util.ShardingUtil;
import java.io.File; import java.io.File;
@ -71,11 +71,10 @@ public class ScriptJobHandler extends IJobHandler {
String logFileName = XxlJobFileAppender.contextHolder.get(); String logFileName = XxlJobFileAppender.contextHolder.get();
// script params:0=param、1=分片序号、2=分片总数 // script params:0=param、1=分片序号、2=分片总数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
String[] scriptParams = new String[3]; String[] scriptParams = new String[3];
scriptParams[0] = param; scriptParams[0] = param;
scriptParams[1] = String.valueOf(shardingVO.getIndex()); scriptParams[1] = String.valueOf(XxlJobContext.getXxlJobContext().getShardIndex());
scriptParams[2] = String.valueOf(shardingVO.getTotal()); scriptParams[2] = String.valueOf(XxlJobContext.getXxlJobContext().getShardTotal());
// invoke // invoke
XxlJobLogger.log("----------- script file:"+ scriptFileName +" -----------"); XxlJobLogger.log("----------- script file:"+ scriptFileName +" -----------");

@ -3,11 +3,11 @@ package com.xxl.job.core.thread;
import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -117,8 +117,13 @@ public class JobThread extends Thread{
// log filename, like "logPath/yyyy-MM-dd/9999.log" // log filename, like "logPath/yyyy-MM-dd/9999.log"
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId()); String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
XxlJobContext.setXxlJobContext(new XxlJobContext(
triggerParam.getLogId(),
logFileName,
triggerParam.getBroadcastIndex(),
triggerParam.getBroadcastTotal()));
XxlJobFileAppender.contextHolder.set(logFileName); XxlJobFileAppender.contextHolder.set(logFileName);
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
// execute // execute
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams()); XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

@ -1,46 +1,46 @@
package com.xxl.job.core.util; //package com.xxl.job.core.util;
//
/** ///**
* sharding vo // * sharding vo
* @author xuxueli 2017-07-25 21:26:38 // * @author xuxueli 2017-07-25 21:26:38
*/ // */
public class ShardingUtil { //public class ShardingUtil {
//
private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>(); // private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();
//
public static class ShardingVO { // public static class ShardingVO {
//
private int index; // sharding index // private int index; // sharding index
private int total; // sharding total // private int total; // sharding total
//
public ShardingVO(int index, int total) { // public ShardingVO(int index, int total) {
this.index = index; // this.index = index;
this.total = total; // this.total = total;
} // }
//
public int getIndex() { // public int getIndex() {
return index; // return index;
} // }
//
public void setIndex(int index) { // public void setIndex(int index) {
this.index = index; // this.index = index;
} // }
//
public int getTotal() { // public int getTotal() {
return total; // return total;
} // }
//
public void setTotal(int total) { // public void setTotal(int total) {
this.total = total; // this.total = total;
} // }
} // }
//
public static void setShardingVo(ShardingVO shardingVo){ // public static void setShardingVo(ShardingVO shardingVo){
contextHolder.set(shardingVo); // contextHolder.set(shardingVo);
} // }
//
public static ShardingVO getShardingVo(){ // public static ShardingVO getShardingVo(){
return contextHolder.get(); // return contextHolder.get();
} // }
//
} //}

@ -1,9 +1,9 @@
package com.xuxueli.executor.sample.frameless.jobhandler; package com.xuxueli.executor.sample.frameless.jobhandler;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
/** /**
* 分片广播任务 * 分片广播任务
@ -16,12 +16,14 @@ public class ShardingJobHandler extends IJobHandler {
public ReturnT<String> execute(String param) throws Exception { public ReturnT<String> execute(String param) throws Exception {
// 分片参数 // 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal()); int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 业务逻辑 // 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) { for (int i = 0; i < shardTotal; i++) {
if (i == shardingVO.getIndex()) { if (i == shardIndex) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i); XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else { } else {
XxlJobLogger.log("第 {} 片, 忽略", i); XxlJobLogger.log("第 {} 片, 忽略", i);

@ -1,9 +1,9 @@
package com.xuxueli.executor.sample.jboot.jobhandler; package com.xuxueli.executor.sample.jboot.jobhandler;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
/** /**
* 分片广播任务 * 分片广播任务
@ -16,12 +16,14 @@ public class ShardingJobHandler extends IJobHandler {
public ReturnT<String> execute(String param) throws Exception { public ReturnT<String> execute(String param) throws Exception {
// 分片参数 // 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal()); int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 业务逻辑 // 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) { for (int i = 0; i < shardTotal; i++) {
if (i == shardingVO.getIndex()) { if (i == shardIndex) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i); XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else { } else {
XxlJobLogger.log("第 {} 片, 忽略", i); XxlJobLogger.log("第 {} 片, 忽略", i);

@ -1,9 +1,9 @@
package com.xuxueli.executor.sample.jfinal.jobhandler; package com.xuxueli.executor.sample.jfinal.jobhandler;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
/** /**
* 分片广播任务 * 分片广播任务
@ -16,12 +16,14 @@ public class ShardingJobHandler extends IJobHandler {
public ReturnT<String> execute(String param) throws Exception { public ReturnT<String> execute(String param) throws Exception {
// 分片参数 // 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal()); int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 业务逻辑 // 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) { for (int i = 0; i < shardTotal; i++) {
if (i == shardingVO.getIndex()) { if (i == shardIndex) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i); XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else { } else {
XxlJobLogger.log("第 {} 片, 忽略", i); XxlJobLogger.log("第 {} 片, 忽略", i);

@ -1,9 +1,9 @@
package com.xuxueli.executor.sample.nutz.jobhandler; package com.xuxueli.executor.sample.nutz.jobhandler;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
/** /**
* 分片广播任务 * 分片广播任务
@ -16,12 +16,14 @@ public class ShardingJobHandler extends IJobHandler {
public ReturnT<String> execute(String param) throws Exception { public ReturnT<String> execute(String param) throws Exception {
// 分片参数 // 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal()); int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 业务逻辑 // 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) { for (int i = 0; i < shardTotal; i++) {
if (i == shardingVO.getIndex()) { if (i == shardIndex) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i); XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else { } else {
XxlJobLogger.log("第 {} 片, 忽略", i); XxlJobLogger.log("第 {} 片, 忽略", i);

@ -1,10 +1,10 @@
package com.xxl.job.executor.service.jobhandler; package com.xxl.job.executor.service.jobhandler;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -55,12 +55,14 @@ public class SampleXxlJob {
public ReturnT<String> shardingJobHandler(String param) throws Exception { public ReturnT<String> shardingJobHandler(String param) throws Exception {
// 分片参数 // 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal()); int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 业务逻辑 // 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) { for (int i = 0; i < shardTotal; i++) {
if (i == shardingVO.getIndex()) { if (i == shardIndex) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i); XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else { } else {
XxlJobLogger.log("第 {} 片, 忽略", i); XxlJobLogger.log("第 {} 片, 忽略", i);

@ -1,10 +1,10 @@
package com.xxl.job.executor.service.jobhandler; package com.xxl.job.executor.service.jobhandler;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -55,12 +55,14 @@ public class SampleXxlJob {
public ReturnT<String> shardingJobHandler(String param) throws Exception { public ReturnT<String> shardingJobHandler(String param) throws Exception {
// 分片参数 // 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal()); int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 业务逻辑 // 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) { for (int i = 0; i < shardTotal; i++) {
if (i == shardingVO.getIndex()) { if (i == shardIndex) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i); XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else { } else {
XxlJobLogger.log("第 {} 片, 忽略", i); XxlJobLogger.log("第 {} 片, 忽略", i);

Loading…
Cancel
Save