parent
eb9a3f0e69
commit
d30a2fcf39
14 changed files with 301 additions and 233 deletions
@ -0,0 +1,99 @@ |
||||
package com.xxl.job.admin.core.handle; |
||||
|
||||
import com.xxl.job.admin.core.conf.XxlJobAdminConfig; |
||||
import com.xxl.job.admin.core.model.XxlJobInfo; |
||||
import com.xxl.job.admin.core.model.XxlJobLog; |
||||
import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; |
||||
import com.xxl.job.admin.core.trigger.TriggerTypeEnum; |
||||
import com.xxl.job.admin.core.util.I18nUtil; |
||||
import com.xxl.job.core.biz.model.ReturnT; |
||||
import com.xxl.job.core.handler.IJobHandler; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.text.MessageFormat; |
||||
|
||||
/** |
||||
* @author xuxueli 2020-10-30 20:43:10 |
||||
*/ |
||||
public class XxlJobPostHandleHelper { |
||||
private static Logger logger = LoggerFactory.getLogger(XxlJobPostHandleHelper.class); |
||||
|
||||
/** |
||||
* common fresh handle entrance (limit only once) |
||||
* |
||||
* @param xxlJobLog |
||||
* @return |
||||
*/ |
||||
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { |
||||
|
||||
// finish
|
||||
finishJob(xxlJobLog); |
||||
|
||||
// text最大64kb 避免长度过长
|
||||
if (xxlJobLog.getHandleMsg().length() > 15000) { |
||||
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) ); |
||||
} |
||||
|
||||
// fresh handle
|
||||
return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* do somethind to finish job |
||||
*/ |
||||
private static void finishJob(XxlJobLog xxlJobLog){ |
||||
|
||||
// 1、handle success, to trigger child job
|
||||
String triggerChildMsg = null; |
||||
if (IJobHandler.SUCCESS.getCode() == xxlJobLog.getHandleCode()) { |
||||
XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId()); |
||||
if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) { |
||||
triggerChildMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>"; |
||||
|
||||
String[] childJobIds = xxlJobInfo.getChildJobId().split(","); |
||||
for (int i = 0; i < childJobIds.length; i++) { |
||||
int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1; |
||||
if (childJobId > 0) { |
||||
|
||||
JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null); |
||||
ReturnT<String> triggerChildResult = ReturnT.SUCCESS; |
||||
|
||||
// add msg
|
||||
triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), |
||||
(i+1), |
||||
childJobIds.length, |
||||
childJobIds[i], |
||||
(triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")), |
||||
triggerChildResult.getMsg()); |
||||
} else { |
||||
triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), |
||||
(i+1), |
||||
childJobIds.length, |
||||
childJobIds[i]); |
||||
} |
||||
} |
||||
|
||||
} |
||||
} |
||||
|
||||
if (triggerChildMsg != null) { |
||||
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg ); |
||||
} |
||||
|
||||
// 2、fix_delay trigger next
|
||||
// on the way
|
||||
|
||||
} |
||||
|
||||
private static boolean isNumeric(String str){ |
||||
try { |
||||
int result = Integer.valueOf(str); |
||||
return true; |
||||
} catch (NumberFormatException e) { |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
} |
@ -0,0 +1,185 @@ |
||||
package com.xxl.job.admin.core.thread; |
||||
|
||||
import com.xxl.job.admin.core.conf.XxlJobAdminConfig; |
||||
import com.xxl.job.admin.core.handle.XxlJobPostHandleHelper; |
||||
import com.xxl.job.admin.core.model.XxlJobLog; |
||||
import com.xxl.job.admin.core.util.I18nUtil; |
||||
import com.xxl.job.core.biz.model.HandleCallbackParam; |
||||
import com.xxl.job.core.biz.model.ReturnT; |
||||
import com.xxl.job.core.handler.IJobHandler; |
||||
import com.xxl.job.core.util.DateUtil; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
import java.util.concurrent.*; |
||||
|
||||
/** |
||||
* job lose-monitor instance |
||||
* |
||||
* @author xuxueli 2015-9-1 18:05:56 |
||||
*/ |
||||
public class JobLogHelper { |
||||
private static Logger logger = LoggerFactory.getLogger(JobLogHelper.class); |
||||
|
||||
private static JobLogHelper instance = new JobLogHelper(); |
||||
public static JobLogHelper getInstance(){ |
||||
return instance; |
||||
} |
||||
|
||||
// ---------------------- monitor ----------------------
|
||||
|
||||
private ThreadPoolExecutor callbackThreadPool = null; |
||||
private Thread monitorThread; |
||||
private volatile boolean toStop = false; |
||||
public void start(){ |
||||
|
||||
// for callback
|
||||
callbackThreadPool = new ThreadPoolExecutor( |
||||
2, |
||||
20, |
||||
30L, |
||||
TimeUnit.SECONDS, |
||||
new LinkedBlockingQueue<Runnable>(3000), |
||||
new ThreadFactory() { |
||||
@Override |
||||
public Thread newThread(Runnable r) { |
||||
return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()); |
||||
} |
||||
}, |
||||
new RejectedExecutionHandler() { |
||||
@Override |
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { |
||||
r.run(); |
||||
logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now)."); |
||||
} |
||||
}); |
||||
|
||||
|
||||
// for monitor
|
||||
monitorThread = new Thread(new Runnable() { |
||||
|
||||
@Override |
||||
public void run() { |
||||
|
||||
// wait for JobTriggerPoolHelper-init
|
||||
try { |
||||
TimeUnit.MILLISECONDS.sleep(50); |
||||
} catch (InterruptedException e) { |
||||
if (!toStop) { |
||||
logger.error(e.getMessage(), e); |
||||
} |
||||
} |
||||
|
||||
// monitor
|
||||
while (!toStop) { |
||||
try { |
||||
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
|
||||
Date losedTime = DateUtil.addMinutes(new Date(), -10); |
||||
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); |
||||
|
||||
if (losedJobIds!=null && losedJobIds.size()>0) { |
||||
for (Long logId: losedJobIds) { |
||||
|
||||
XxlJobLog jobLog = new XxlJobLog(); |
||||
jobLog.setId(logId); |
||||
|
||||
jobLog.setHandleTime(new Date()); |
||||
jobLog.setHandleCode(ReturnT.FAIL_CODE); |
||||
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); |
||||
|
||||
XxlJobPostHandleHelper.updateHandleInfoAndFinish(jobLog); |
||||
} |
||||
|
||||
} |
||||
} catch (Exception e) { |
||||
if (!toStop) { |
||||
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); |
||||
} |
||||
} |
||||
|
||||
try { |
||||
TimeUnit.SECONDS.sleep(60); |
||||
} catch (Exception e) { |
||||
if (!toStop) { |
||||
logger.error(e.getMessage(), e); |
||||
} |
||||
} |
||||
|
||||
} |
||||
|
||||
logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); |
||||
|
||||
} |
||||
}); |
||||
monitorThread.setDaemon(true); |
||||
monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); |
||||
monitorThread.start(); |
||||
} |
||||
|
||||
public void toStop(){ |
||||
toStop = true; |
||||
|
||||
// stop registryOrRemoveThreadPool
|
||||
callbackThreadPool.shutdownNow(); |
||||
|
||||
// stop monitorThread (interrupt and wait)
|
||||
monitorThread.interrupt(); |
||||
try { |
||||
monitorThread.join(); |
||||
} catch (InterruptedException e) { |
||||
logger.error(e.getMessage(), e); |
||||
} |
||||
} |
||||
|
||||
|
||||
// ---------------------- helper ----------------------
|
||||
|
||||
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { |
||||
|
||||
callbackThreadPool.execute(new Runnable() { |
||||
@Override |
||||
public void run() { |
||||
for (HandleCallbackParam handleCallbackParam: callbackParamList) { |
||||
ReturnT<String> callbackResult = callback(handleCallbackParam); |
||||
logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", |
||||
(callbackResult.getCode()== IJobHandler.SUCCESS.getCode()?"success":"fail"), handleCallbackParam, callbackResult); |
||||
} |
||||
} |
||||
}); |
||||
|
||||
return ReturnT.SUCCESS; |
||||
} |
||||
|
||||
private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) { |
||||
// valid log item
|
||||
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); |
||||
if (log == null) { |
||||
return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found."); |
||||
} |
||||
if (log.getHandleCode() > 0) { |
||||
return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc
|
||||
} |
||||
|
||||
// handle msg
|
||||
StringBuffer handleMsg = new StringBuffer(); |
||||
if (log.getHandleMsg()!=null) { |
||||
handleMsg.append(log.getHandleMsg()).append("<br>"); |
||||
} |
||||
if (handleCallbackParam.getExecuteResult().getMsg() != null) { |
||||
handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); |
||||
} |
||||
|
||||
// success, save log
|
||||
log.setHandleTime(new Date()); |
||||
log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); |
||||
log.setHandleMsg(handleMsg.toString()); |
||||
XxlJobPostHandleHelper.updateHandleInfoAndFinish(log); |
||||
|
||||
return ReturnT.SUCCESS; |
||||
} |
||||
|
||||
|
||||
|
||||
} |
@ -1,104 +0,0 @@ |
||||
package com.xxl.job.admin.core.thread; |
||||
|
||||
import com.xxl.job.admin.core.conf.XxlJobAdminConfig; |
||||
import com.xxl.job.admin.core.model.XxlJobLog; |
||||
import com.xxl.job.admin.core.util.I18nUtil; |
||||
import com.xxl.job.core.biz.model.ReturnT; |
||||
import com.xxl.job.core.util.DateUtil; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
/** |
||||
* job lose-monitor instance |
||||
* |
||||
* @author xuxueli 2015-9-1 18:05:56 |
||||
*/ |
||||
public class JobLosedMonitorHelper { |
||||
private static Logger logger = LoggerFactory.getLogger(JobLosedMonitorHelper.class); |
||||
|
||||
private static JobLosedMonitorHelper instance = new JobLosedMonitorHelper(); |
||||
public static JobLosedMonitorHelper getInstance(){ |
||||
return instance; |
||||
} |
||||
|
||||
// ---------------------- monitor ----------------------
|
||||
|
||||
private Thread monitorThread; |
||||
private volatile boolean toStop = false; |
||||
public void start(){ |
||||
monitorThread = new Thread(new Runnable() { |
||||
|
||||
@Override |
||||
public void run() { |
||||
|
||||
// wait for JobTriggerPoolHelper-init
|
||||
try { |
||||
TimeUnit.MILLISECONDS.sleep(50); |
||||
} catch (InterruptedException e) { |
||||
if (!toStop) { |
||||
logger.error(e.getMessage(), e); |
||||
} |
||||
} |
||||
|
||||
// monitor
|
||||
while (!toStop) { |
||||
try { |
||||
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
|
||||
Date losedTime = DateUtil.addMinutes(new Date(), -10); |
||||
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); |
||||
|
||||
if (losedJobIds!=null && losedJobIds.size()>0) { |
||||
for (Long logId: losedJobIds) { |
||||
|
||||
XxlJobLog jobLog = new XxlJobLog(); |
||||
jobLog.setId(logId); |
||||
|
||||
jobLog.setHandleTime(new Date()); |
||||
jobLog.setHandleCode(ReturnT.FAIL_CODE); |
||||
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") ); |
||||
|
||||
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog); |
||||
} |
||||
|
||||
} |
||||
} catch (Exception e) { |
||||
if (!toStop) { |
||||
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); |
||||
} |
||||
} |
||||
|
||||
try { |
||||
TimeUnit.SECONDS.sleep(60); |
||||
} catch (Exception e) { |
||||
if (!toStop) { |
||||
logger.error(e.getMessage(), e); |
||||
} |
||||
} |
||||
|
||||
} |
||||
|
||||
logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop"); |
||||
|
||||
} |
||||
}); |
||||
monitorThread.setDaemon(true); |
||||
monitorThread.setName("xxl-job, admin JobLosedMonitorHelper"); |
||||
monitorThread.start(); |
||||
} |
||||
|
||||
public void toStop(){ |
||||
toStop = true; |
||||
// interrupt and wait
|
||||
monitorThread.interrupt(); |
||||
try { |
||||
monitorThread.join(); |
||||
} catch (InterruptedException e) { |
||||
logger.error(e.getMessage(), e); |
||||
} |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue