执行器、调度中心,支持自定义注册IP地址;解决机器多网卡时错误网卡注册的情况;

master
xueli.xue 8 years ago
parent c86d7d1c8e
commit 65cfe57b6b
  1. 1
      README.md
  2. 10
      xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java
  3. 3
      xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java
  4. 5
      xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java
  5. 19
      xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java
  6. 4
      xxl-job-admin/src/main/resources/applicationcontext-xxl-job.xml
  7. 15
      xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java
  8. 2
      xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml

@ -702,6 +702,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
#### 6.10 版本 V1.5.2 特性 #### 6.10 版本 V1.5.2 特性
- 1、IP工具类优化,IP静态缓存; - 1、IP工具类优化,IP静态缓存;
- 2、执行器、调度中心,支持自定义注册IP地址;解决机器多网卡时错误网卡注册的情况;
#### 规划中 #### 规划中
- 1、集群执行器选择规则自定义:单点=选择第一个,随机=随机选择一个; - 1、集群执行器选择规则自定义:单点=选择第一个,随机=随机选择一个;

@ -9,25 +9,15 @@ import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.xxl.job.core.util.IpUtil;
/** /**
* Created by xuxueli on 2016-5-22 11:15:42 * Created by xuxueli on 2016-5-22 11:15:42
*/ */
public class XxlJobLogCallbackServer { public class XxlJobLogCallbackServer {
private static final Logger logger = LoggerFactory.getLogger(XxlJobLogCallbackServer.class); private static final Logger logger = LoggerFactory.getLogger(XxlJobLogCallbackServer.class);
private static String trigger_log_address;
public static String getTrigger_log_address() {
return trigger_log_address;
}
Server server = null; Server server = null;
public void start(int callBackPort) throws Exception { public void start(int callBackPort) throws Exception {
// init address
trigger_log_address = IpUtil.getIpPort(callBackPort);
final int port = Integer.valueOf(callBackPort); final int port = Integer.valueOf(callBackPort);
new Thread(new Runnable() { new Thread(new Runnable() {
@Override @Override

@ -1,6 +1,5 @@
package com.xxl.job.admin.core.jobbean; package com.xxl.job.admin.core.jobbean;
import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer;
import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.model.XxlJobLog;
@ -51,7 +50,7 @@ public class RemoteHttpJobBean extends QuartzJobBean {
if (adminAddressList!=null) { if (adminAddressList!=null) {
adminAddressSet.addAll(adminAddressList); adminAddressSet.addAll(adminAddressList);
} }
adminAddressSet.add(XxlJobLogCallbackServer.getTrigger_log_address()); adminAddressSet.add(DynamicSchedulerUtil.getCallbackAddress());
// trigger request // trigger request
RequestModel requestModel = new RequestModel(); RequestModel requestModel = new RequestModel();

@ -1,6 +1,5 @@
package com.xxl.job.admin.core.thread; package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer;
import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.admin.core.model.XxlJobRegistry;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil; import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
import com.xxl.job.core.registry.RegistHelper; import com.xxl.job.core.registry.RegistHelper;
@ -29,9 +28,9 @@ public class JobRegistryHelper {
while (true) { while (true) {
try { try {
// registry admin // registry admin
int ret = DynamicSchedulerUtil.xxlJobRegistryDao.registryUpdate(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name(), XxlJobLogCallbackServer.getTrigger_log_address()); int ret = DynamicSchedulerUtil.xxlJobRegistryDao.registryUpdate(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name(), DynamicSchedulerUtil.getCallbackAddress());
if (ret < 1) { if (ret < 1) {
DynamicSchedulerUtil.xxlJobRegistryDao.registrySave(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name(), XxlJobLogCallbackServer.getTrigger_log_address()); DynamicSchedulerUtil.xxlJobRegistryDao.registrySave(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name(), DynamicSchedulerUtil.getCallbackAddress());
} }
// fresh registry map // fresh registry map

@ -8,6 +8,7 @@ import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogDao; import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.admin.dao.IXxlJobRegistryDao; import com.xxl.job.admin.dao.IXxlJobRegistryDao;
import com.xxl.job.core.util.IpUtil;
import org.quartz.*; import org.quartz.*;
import org.quartz.Trigger.TriggerState; import org.quartz.Trigger.TriggerState;
import org.quartz.impl.matchers.GroupMatcher; import org.quartz.impl.matchers.GroupMatcher;
@ -35,11 +36,20 @@ public final class DynamicSchedulerUtil implements ApplicationContextAware, Init
DynamicSchedulerUtil.scheduler = scheduler; DynamicSchedulerUtil.scheduler = scheduler;
} }
// trigger callback port // trigger callback address
private String callBackIp;
private int callBackPort = 8888; private int callBackPort = 8888;
private static String callbackAddress;
public void setCallBackIp(String callBackIp) {
this.callBackIp = callBackIp;
}
public void setCallBackPort(int callBackPort) { public void setCallBackPort(int callBackPort) {
this.callBackPort = callBackPort; this.callBackPort = callBackPort;
} }
public static String getCallbackAddress(){
return callbackAddress;
}
// init // init
XxlJobLogCallbackServer xxlJobLogCallbackServer = null; XxlJobLogCallbackServer xxlJobLogCallbackServer = null;
@ -52,6 +62,13 @@ public final class DynamicSchedulerUtil implements ApplicationContextAware, Init
e.printStackTrace(); e.printStackTrace();
} }
// init callbackAddress
if (callBackIp!=null && callBackIp.trim().length()>0) {
callbackAddress = callBackIp.trim().concat(":").concat(String.valueOf(callBackPort));
} else {
callbackAddress = IpUtil.getIpPort(callBackPort);;
}
// init JobRegistryHelper // init JobRegistryHelper
JobRegistryHelper.discover("g", "k"); JobRegistryHelper.discover("g", "k");
} }

@ -4,6 +4,7 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<!-- quartz-调度器 -->
<bean id="quartzScheduler" lazy-init="false" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <bean id="quartzScheduler" lazy-init="false" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="dataSource" ref="dataSource" /> <property name="dataSource" ref="dataSource" />
<property name="autoStartup" value="true" /> <!--自动启动 --> <property name="autoStartup" value="true" /> <!--自动启动 -->
@ -16,6 +17,9 @@
<bean id="dynamicSchedulerUtil" class="com.xxl.job.admin.core.util.DynamicSchedulerUtil" init-method="init" destroy-method="destroy" > <bean id="dynamicSchedulerUtil" class="com.xxl.job.admin.core.util.DynamicSchedulerUtil" init-method="init" destroy-method="destroy" >
<!-- (轻易不要变更“调度器名称”, 任务创建时会绑定该“调度器名称”) --> <!-- (轻易不要变更“调度器名称”, 任务创建时会绑定该“调度器名称”) -->
<property name="scheduler" ref="quartzScheduler"/> <property name="scheduler" ref="quartzScheduler"/>
<!-- 调度中心回调IP[选填],为空则自动获取 -->
<property name="callBackIp" value=""/>
<!-- 调度中心回调端口号 -->
<property name="callBackPort" value="8888"/> <property name="callBackPort" value="8888"/>
</bean> </bean>

@ -26,9 +26,14 @@ import java.util.concurrent.TimeUnit;
public class XxlJobExecutor implements ApplicationContextAware { public class XxlJobExecutor implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
private String ip;
private int port = 9999; private int port = 9999;
private String appName; private String appName;
private RegistHelper registHelper; private RegistHelper registHelper;
public void setIp(String ip) {
this.ip = ip;
}
public void setPort(int port) { public void setPort(int port) {
this.port = port; this.port = port;
} }
@ -94,7 +99,15 @@ public class XxlJobExecutor implements ApplicationContextAware {
public void run() { public void run() {
while (true) { while (true) {
try { try {
String address = IpUtil.getIpPort(port);
// generate addredd = ip:port
String address = null;
if (ip != null && ip.trim().length()>0) {
address = ip.trim().concat(":").concat(String.valueOf(port));
} else {
address = IpUtil.getIpPort(port);
}
registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address); registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address);
TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT); TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT);
} catch (Exception e) { } catch (Exception e) {

@ -14,6 +14,8 @@
<!-- 配置02、执行器 --> <!-- 配置02、执行器 -->
<bean id="xxlJobExecutor" class="com.xxl.job.core.executor.jetty.XxlJobExecutor" init-method="start" destroy-method="destroy" > <bean id="xxlJobExecutor" class="com.xxl.job.core.executor.jetty.XxlJobExecutor" init-method="start" destroy-method="destroy" >
<!-- 执行器IP[选填],为空则自动获取 -->
<property name="ip" value="" />
<!-- 执行器端口号 --> <!-- 执行器端口号 -->
<property name="port" value="9999" /> <property name="port" value="9999" />
<property name="appName" value="xxl-job-executor-example" /> <property name="appName" value="xxl-job-executor-example" />

Loading…
Cancel
Save