From 46779d7e6bfde30bf4e6fb4f1106be3d13802a0d Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Mon, 18 Feb 2019 19:12:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=99=A8=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E8=90=BD=E7=9B=98=E6=96=B9=E6=A1=88=E5=A4=8D?= =?UTF-8?q?=E7=94=A8RPC=E5=BA=8F=E5=88=97=E5=8C=96=E6=96=B9=E6=A1=88?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E7=A7=BB=E9=99=A4Jackson=E4=BE=9D=E8=B5=96?= =?UTF-8?q?=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 3 +- pom.xml | 2 - .../resolver/WebExceptionResolver.java | 2 +- .../com/xxl/job/admin/core/util/I18nUtil.java | 1 - .../xxl/job/admin}/core/util/JacksonUtil.java | 2 +- xxl-job-core/pom.xml | 7 -- .../xxl/job/core/executor/XxlJobExecutor.java | 7 +- .../core/thread/TriggerCallbackThread.java | 68 ++++++++++-------- .../java/com/xxl/job/core/util/FileUtil.java | 70 +++++++++++++++++-- 9 files changed, 112 insertions(+), 50 deletions(-) rename {xxl-job-core/src/main/java/com/xxl/job => xxl-job-admin/src/main/java/com/xxl/job/admin}/core/util/JacksonUtil.java (98%) diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 1e62d97b..3cb4aba2 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -1419,9 +1419,10 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 5、精简项目,取消第三方依赖,如 commons-collections4 ; - 6、底层Log调优,应用正常终止取消异常栈信息打印; - 7、交互优化,尽量避免新开页面窗口;仅WebIDE支持新开页,并提供窗口快速关闭按钮; -- 8、[测试中]底层通讯方案优化:升级较新版本xxl-rpc,由"JETTY"方案调整为"NETTY_HTTP"方案,执行器内嵌netty-http-server提供服务,调度中心复用容器端口提供服务; +- 8、底层通讯方案优化:升级较新版本xxl-rpc,由"JETTY"方案调整为"NETTY_HTTP"方案,执行器内嵌netty-http-server提供服务,调度中心复用容器端口提供服务; - 9、任务暂停、删除优化,避免quartz delete不完整导致任务脏数据; - 10、任务回调、心跳注册成功日志优化,非核心常规日志调整为debug级别,降低冗余日志输出; +- 11、执行器回调日志落盘方案复用RPC序列化方案,并移除Jackson依赖; - [迭代中]注册中心优化,实时性注册发现:心跳注册间隔10s,refresh失败则首次注册并立即更新注册信息,心跳类似;30s过期销毁; - [迭代中]脚本任务,支持数据参数,新版本仅支持单参数不支持需要兼容; - [迭代中]提供执行器Docker镜像; diff --git a/pom.xml b/pom.xml index d6e0525c..245b1881 100644 --- a/pom.xml +++ b/pom.xml @@ -41,8 +41,6 @@ 2.5.5 2.3.0 - 2.9.8 - diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java index f520fda0..e2e0f27a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/resolver/WebExceptionResolver.java @@ -1,7 +1,7 @@ package com.xxl.job.admin.controller.resolver; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.util.JacksonUtil; +import com.xxl.job.admin.core.util.JacksonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/I18nUtil.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/I18nUtil.java index 408b97db..ec291c8b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/I18nUtil.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/I18nUtil.java @@ -1,7 +1,6 @@ package com.xxl.job.admin.core.util; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.core.util.JacksonUtil; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/JacksonUtil.java similarity index 98% rename from xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java rename to xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/JacksonUtil.java index e24d865d..4cf5a189 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/JacksonUtil.java @@ -1,4 +1,4 @@ -package com.xxl.job.core.util; +package com.xxl.job.admin.core.util; import com.fasterxml.jackson.core.JsonGenerationException; import com.fasterxml.jackson.core.JsonParseException; diff --git a/xxl-job-core/pom.xml b/xxl-job-core/pom.xml index f5b88975..61027242 100644 --- a/xxl-job-core/pom.xml +++ b/xxl-job-core/pom.xml @@ -36,13 +36,6 @@ ${commons-exec.version} - - - com.fasterxml.jackson.core - jackson-databind - ${jackson.version} - - org.springframework diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index b38eff72..0fc1c14c 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -107,7 +107,9 @@ public class XxlJobExecutor { // ---------------------- admin-client (rpc invoker) ---------------------- private static List adminBizList; + private static Serializer serializer; private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { + serializer = Serializer.SerializeEnum.HESSIAN.getSerializer(); if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { if (address!=null && address.trim().length()>0) { @@ -116,7 +118,7 @@ public class XxlJobExecutor { AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean( NetEnum.NETTY_HTTP, - Serializer.SerializeEnum.HESSIAN.getSerializer(), + serializer, CallType.SYNC, LoadBalance.ROUND, AdminBiz.class, @@ -139,6 +141,9 @@ public class XxlJobExecutor { public static List getAdminBizList(){ return adminBizList; } + public static Serializer getSerializer() { + return serializer; + } // ---------------------- executor-server (rpc provider) ---------------------- diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 69b41191..ad5497ad 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -8,7 +8,6 @@ import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.util.FileUtil; -import com.xxl.job.core.util.JacksonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -190,46 +189,53 @@ public class TriggerCallbackThread { // ---------------------- fail-callback file ---------------------- - private static String failCallbackFileName = XxlJobFileAppender.getLogPath().concat(File.separator).concat("xxl-job-callback").concat(".log"); + private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator); + private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log"); private void appendFailCallbackFile(List callbackParamList){ + // valid + if (callbackParamList==null || callbackParamList.size()==0) { + return; + } + // append file - String content = JacksonUtil.writeValueAsString(callbackParamList); - FileUtil.appendFileLine(failCallbackFileName, content); + byte[] callbackParamList_bytes = XxlJobExecutor.getSerializer().serialize(callbackParamList); + + File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()))); + if (callbackLogFile.exists()) { + for (int i = 0; i < 100; i++) { + callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) )); + if (!callbackLogFile.exists()) { + break; + } + } + } + FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes); } private void retryFailCallbackFile(){ - // load and clear file - List fileLines = FileUtil.loadFileLines(failCallbackFileName); - FileUtil.deleteFile(failCallbackFileName); - - // parse - List failCallbackParamList = new ArrayList<>(); - if (fileLines!=null && fileLines.size()>0) { - for (String line: fileLines) { - List failCallbackParamListTmp = JacksonUtil.readValue(line, List.class, HandleCallbackParam.class); - if (failCallbackParamListTmp!=null && failCallbackParamListTmp.size()>0) { - failCallbackParamList.addAll(failCallbackParamListTmp); - } - } + // valid + File callbackLogPath = new File(failCallbackFilePath); + if (!callbackLogPath.exists()) { + return; + } + if (callbackLogPath.isFile()) { + callbackLogPath.delete(); + } + if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) { + return; } - // retry callback, 100 lines per page - if (failCallbackParamList!=null && failCallbackParamList.size()>0) { - int pagesize = 100; - List pageData = new ArrayList<>(); - for (int i = 0; i < failCallbackParamList.size(); i++) { - pageData.add(failCallbackParamList.get(i)); - if (i>0 && i%pagesize == 0) { - doCallback(pageData); - pageData.clear(); - } - } - if (pageData.size() > 0) { - doCallback(pageData); - } + // load and clear file, retry + for (File callbaclLogFile: callbackLogPath.listFiles()) { + byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile); + List callbackParamList = (List) XxlJobExecutor.getSerializer().deserialize(callbackParamList_bytes, HandleCallbackParam.class); + + callbaclLogFile.delete(); + doCallback(callbackParamList); } + } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java index 6a4abee0..0c9c5a12 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java @@ -3,9 +3,10 @@ package com.xxl.job.core.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; -import java.util.ArrayList; -import java.util.List; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; /** * file tool @@ -15,6 +16,7 @@ import java.util.List; public class FileUtil { private static Logger logger = LoggerFactory.getLogger(FileUtil.class); + /** * delete recursively * @@ -36,6 +38,7 @@ public class FileUtil { return false; } + public static void deleteFile(String fileName) { // file File file = new File(fileName); @@ -44,7 +47,64 @@ public class FileUtil { } } - public static void appendFileLine(String fileName, String content) { + + public static void writeFileContent(File file, byte[] data) { + + // file + if (!file.exists()) { + try { + file.createNewFile(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + return; + } + } + + // append file content + FileOutputStream fos = null; + try { + fos = new FileOutputStream(file); + fos.write(data); + fos.flush(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + if (fos != null) { + try { + fos.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + + } + + public static byte[] readFileContent(File file) { + Long filelength = file.length(); + byte[] filecontent = new byte[filelength.intValue()]; + + FileInputStream in = null; + try { + in = new FileInputStream(file); + in.read(filecontent); + in.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } + return filecontent; + } + + + /*public static void appendFileLine(String fileName, String content) { // file File file = new File(fileName); @@ -119,6 +179,6 @@ public class FileUtil { } return result; - } + }*/ }