调度中心与执行器通讯规范为双向restful,方便跨语言,以及第三方执行器实现;通讯组件xxl-rpc方案调整为Jetty+Gson方案;

master
xuxueli 5 years ago
parent bb62887f52
commit c511a94561
  1. 238
      doc/XXL-JOB官方文档.md
  2. 50
      xxl-job-admin/src/test/java/com/xxl/job/executorbiz/ExecutorBizTest.java
  3. 14
      xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java
  4. 10
      xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java
  5. 42
      xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java

@ -50,14 +50,13 @@ XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅
- 27、推送maven中央仓库: 将会把最新稳定版推送到maven中央仓库, 方便用户接入和使用;
- 28、运行报表:支持实时查看运行数据,如任务数量、调度次数、执行器数量等;以及调度报表,如调度日期分布图,调度成功分布图等;
- 29、全异步:任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰,理论上支持任意时长任务的运行;
- 30、跨平台:原生提供通用HTTP任务Handler(Bean任务,"HttpJobHandler");业务方只需要提供HTTP链接即可,不限制语言、平台
- 30、跨语言:调度中心与执行器提供语言无关的 RESTful API 服务,第三方任意语言可据此对接调度中心或者实现执行器。除此之外,还提供了 “多任务模式”和“httpJobHandler”等其他跨语言方案
- 31、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文;
- 32、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用;
- 33、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;
- 34、用户管理:支持在线管理系统用户,存在管理员、普通用户两种角色;
- 35、权限控制:执行器维度进行权限控制,管理员拥有全量权限,普通用户需要分配执行器权限后才允许相关操作;
### 1.4 发展
于2015年中,我在github上创建XXL-JOB项目仓库并提交第一个commit,随之进行系统结构设计,UI选型,交互设计……
@ -1243,7 +1242,7 @@ echo "分片总数 total = $3"
### 5.15 跨语言
XXL-JOB是一个跨语言的任务调度平台,主要体现在如下几个方面:
- 1、RESTful API:调度中心与执行器之间约定一种基于 restful+json 的通讯协议,并提供相应的API服务。第三方任意语言可据此对接调度中心或者实现执行器。(可参考章节 “调度中心/执行器 RESTful API” )
- 1、RESTful API:调度中心与执行器提供语言无关的 RESTful API 服务,第三方任意语言可据此对接调度中心或者实现执行器。(可参考章节 “调度中心/执行器 RESTful API” )
- 2、多任务模式:提供Java、Python、PHP……等十来种任务模式,可参考章节 “5.5 任务 "运行模式" ”;理论上可扩展任意语言任务模式;
- 2、提供基于HTTP的任务Handler(Bean任务,JobHandler="httpJobHandler");业务方只需要提供HTTP链接等相关信息即可,不限制语言、平台;(可参考章节 “原生内置Bean模式任务” )
@ -1278,32 +1277,227 @@ XXL-JOB日志主要包含如下两部分,均支持日志自动清理,说明
## 六、调度中心/执行器 RESTful API
Java语言应用,可以直接通过官方提供的调度中心与执行器,方便快速的接入和使用调度中心,可以上文 “快速入门” 章节。
非Java语言,可借助 XXL-JOB 提供的标准 RESTful API 方便的实现多语言支持。
- 调度中心 RESTful API:除管理平台之外,可据此通过API服务管理在线任务。
- 执行器 RESTful API :执行器标准API,可参考该API实现非Java语言的个性化执行器.
### 6.1 调度中心 RESTful API
调度中心提供了API服务,主要分为两种类型:
#### 5.11.1 提供给执行器的API服务:
1、任务结果回调服务;
2、执行器注册服务;
3、执行器注册摘除服务;
4、触发任务单次执行服务,支持任务根据业务事件触发;
API服务位置:com.xxl.job.core.biz.AdminBiz ( com.xxl.job.admin.controller.JobApiController )
API服务请求参考代码:com.xxl.job.adminbiz.AdminBizTest
#### a、任务回调
```
说明:任务执行结果回调
------
地址格式:{调度中心跟地址}/callback
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
[{
"logId":1,
"logDateTim":0,
"executeResult":{
"code": 200, // 200 表示任务执行正常,500表示失败
"msg": null
}
}]
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
```
API服务位置:com.xxl.job.core.biz.AdminBiz.java
API服务请求参考代码:com.xxl.job.adminbiz.AdminBizTest.java
#### b、执行器注册
```
说明:执行器注册,调度中心会实时感知注册成功的执行器并发起任务调度
------
地址格式:{调度中心跟地址}/registry
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
{
"registryGroup":"EXECUTOR",
"registryKey":"xxl-job-executor-example", // 执行器AppName
"registryValue":"http://127.0.0.1:9999/" // 执行器地址,内置服务跟地址
}
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
```
#### c、执行器注册摘除
```
说明:执行器注册摘除,注册摘除后的执行器不参与任务调度与执行
------
地址格式:{调度中心跟地址}/registryRemove
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
{
"registryGroup":"EXECUTOR",
"registryKey":"xxl-job-executor-example", // 执行器AppName
"registryValue":"http://127.0.0.1:9999/" // 执行器地址,内置服务跟地址
}
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
```
### 6.2 执行器 RESTful API
### 5.12 执行器API服务
执行器提供了API服务,供调度中心选择使用,目前提供的API服务有:
API服务位置:com.xxl.job.core.biz.ExecutorBiz
API服务请求参考代码:com.xxl.job.executorbiz.ExecutorBizTest
#### a、心跳检测
```
说明:心跳检测,检测执行器是否在线
------
地址格式:{执行器内嵌服务跟地址}/beat
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
```
#### b、忙碌检测
```
说明:忙碌检测,检测执行器上该任务是否空闲
------
地址格式:{执行器内嵌服务跟地址}/idleBeat
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
1 // 任务ID
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
```
#### c、触发任务
```
说明:触发任务执行
------
地址格式:{执行器内嵌服务跟地址}/run
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
{
"jobId":1, // 任务ID
"executorHandler":"demoJobHandler", // 任务标识
"executorBlockStrategy":"COVER_EARLY", // 任务阻塞策略,可选值参考 com.xxl.job.core.enums.ExecutorBlockStrategyEnum
"executorTimeout":0, // 任务超时时间,单位秒,大于零时生效
"logId":1, // 本次调度日志ID
"logDateTime":1586629003729, // 本次调度日志时间
"glueType":"BEAN", // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum
"glueUpdatetime":1586629003727, // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新
"broadcastIndex":0, // 分片参数:当前分片
"broadcastTotal":0 // 分片参数:总分片
}
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
```
#### f、终止任务
```
说明:终止任务
------
地址格式:{执行器内嵌服务跟地址}/kill
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
1 // 任务ID
响应数据格式:
{
"code": 200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
}
```
#### d、查看执行日志
```
说明:终止任务
------
地址格式:{执行器内嵌服务跟地址}/log
Header:
XXL-JOB-ACCESS-TOKEN : {请求令牌}
请求数据格式如下,放置在 RequestBody 中,JSON格式:
{
"logDateTim":0, // 本次调度日志时间
"logId":0, // 本次调度日志ID
"fromLineNum":0 // 日志开始行号,滚动加载日志
}
响应数据格式:
{
"code":200, // 200 表示正常、其他失败
"msg": null // 错误提示消息
"content":{
"fromLineNum":0, // 本次请求,日志开始行数
"toLineNum":100, // 本次请求,日志结束行号
"logContent":"xxx", // 本次请求日志内容
"isEnd":true // 日志是否全部加载完
}
}
```
1、心跳检测:调度中心使用
2、忙碌检测:调度中心使用
3、触发任务执行:调度中心使用;本地进行任务开发时,可使用该API服务模拟触发任务;
4、获取Rolling Log:调度中心使用
5、终止任务:调度中心使用
API服务位置:com.xxl.job.core.biz.ExecutorBiz
API服务请求参考代码:com.xxl.job.executor.ExecutorBizTest
## 七、版本更新日志
@ -1750,7 +1944,7 @@ data: post-data
- 20、任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
- 21、执行备注消息长度限制,修复数据超长无法存储导致导致回调失败的问题;
- 22、一致性哈希路由策略优化:默认虚拟节点数量调整为100,提高路由的均衡性;
- 23、调度中心与执行器通讯规范为双向restful,方便跨语言,以及第三方执行器实现;通讯组件xxl-rpc方案调整为Jetty+Gson方案;
- 23、RESTful API:调度中心与执行器提供语言无关的 RESTful API 服务,第三方任意语言可据此对接调度中心或者实现执行器。
注意:XxlJobSpringExecutor组件个别字段调整:“appName” 调整为 “appname” ,升级时该组件时需要注意;
### TODO LIST

@ -1,4 +1,4 @@
package com.xxl.job.executor;
package com.xxl.job.executorbiz;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.client.ExecutorBizClient;
@ -12,7 +12,7 @@ import org.junit.Assert;
import org.junit.Test;
/**
* executor-api client, test
* executor api test
*
* Created by xuxueli on 17/5/12.
*/
@ -51,6 +51,29 @@ public class ExecutorBizTest {
Assert.assertEquals("job thread is running or has trigger queue.", retval.getMsg());
}
@Test
public void run(){
ExecutorBiz executorBiz = new ExecutorBizClient(addressUrl, accessToken);
// trigger data
final TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(1);
triggerParam.setExecutorHandler("demoJobHandler");
triggerParam.setExecutorParams(null);
triggerParam.setExecutorBlockStrategy(ExecutorBlockStrategyEnum.COVER_EARLY.name());
triggerParam.setGlueType(GlueTypeEnum.BEAN.name());
triggerParam.setGlueSource(null);
triggerParam.setGlueUpdatetime(System.currentTimeMillis());
triggerParam.setLogId(1);
triggerParam.setLogDateTime(System.currentTimeMillis());
// Act
final ReturnT<String> retval = executorBiz.run(triggerParam);
// Assert result
Assert.assertNotNull(retval);
}
@Test
public void kill(){
ExecutorBiz executorBiz = new ExecutorBizClient(addressUrl, accessToken);
@ -82,27 +105,4 @@ public class ExecutorBizTest {
Assert.assertNotNull(retval);
}
@Test
public void run(){
ExecutorBiz executorBiz = new ExecutorBizClient(addressUrl, accessToken);
// trigger data
final TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(1);
triggerParam.setExecutorHandler("demoJobHandler");
triggerParam.setExecutorParams(null);
triggerParam.setExecutorBlockStrategy(ExecutorBlockStrategyEnum.COVER_EARLY.name());
triggerParam.setGlueType(GlueTypeEnum.BEAN.name());
triggerParam.setGlueSource(null);
triggerParam.setGlueUpdatetime(System.currentTimeMillis());
triggerParam.setLogId(1);
triggerParam.setLogDateTime(System.currentTimeMillis());
// Act
final ReturnT<String> retval = executorBiz.run(triggerParam);
// Assert result
Assert.assertNotNull(retval);
}
}

@ -24,6 +24,13 @@ public interface ExecutorBiz {
*/
public ReturnT<String> idleBeat(int jobId);
/**
* run
* @param triggerParam
* @return
*/
public ReturnT<String> run(TriggerParam triggerParam);
/**
* kill
* @param jobId
@ -38,11 +45,4 @@ public interface ExecutorBiz {
*/
public ReturnT<LogResult> log(LogParam logParam);
/**
* run
* @param triggerParam
* @return
*/
public ReturnT<String> run(TriggerParam triggerParam);
}

@ -36,10 +36,16 @@ public class ExecutorBizClient implements ExecutorBiz {
return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, null, String.class);
}
@Override
public ReturnT<String> idleBeat(int jobId){
return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, jobId, String.class);
}
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
@Override
public ReturnT<String> kill(int jobId) {
return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, jobId, String.class);
@ -50,8 +56,4 @@ public class ExecutorBizClient implements ExecutorBiz {
return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class);
}
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
}

@ -46,27 +46,6 @@ public class ExecutorBizImpl implements ExecutorBiz {
return ReturnT.SUCCESS;
}
@Override
public ReturnT<String> kill(int jobId) {
// kill handlerThread, and create new one
JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
if (jobThread != null) {
XxlJobExecutor.removeJobThread(jobId, "scheduling center kill job.");
return ReturnT.SUCCESS;
}
return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread already killed.");
}
@Override
public ReturnT<LogResult> log(LogParam logParam) {
// log filename: logPath/yyyy-MM-dd/9999.log
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId());
LogResult logResult = XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum());
return new ReturnT<LogResult>(logResult);
}
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
@ -172,4 +151,25 @@ public class ExecutorBizImpl implements ExecutorBiz {
return pushResult;
}
@Override
public ReturnT<String> kill(int jobId) {
// kill handlerThread, and create new one
JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
if (jobThread != null) {
XxlJobExecutor.removeJobThread(jobId, "scheduling center kill job.");
return ReturnT.SUCCESS;
}
return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread already killed.");
}
@Override
public ReturnT<LogResult> log(LogParam logParam) {
// log filename: logPath/yyyy-MM-dd/9999.log
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId());
LogResult logResult = XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum());
return new ReturnT<LogResult>(logResult);
}
}

Loading…
Cancel
Save