首頁/ 汽車/ 正文

Spring Boot 定時任務動態管理通用解決方案

Spring Boot 定時任務動態管理通用解決方案

一、功能說明

SpringBoot的定時任務的加強工具,實現對SpringBoot原生的定時任務進行動態管理,完全相容原生@Scheduled註解,無需對原本的定時任務進行修改

推薦下自己做的 Spring Boot 的實戰專案:

https://github。com/YunaiV/ruoyi-vue-pro

二、快速使用

具體的功能已經封裝成SpringBoot-starter即插即用

com。github。guoyixing spring-boot-starter-super-scheduled 0。3。1

使用方法和原始碼:

碼雲:https://gitee。com/qiaodaimadewangcai/super-scheduled

github:https://github。com/guoyixing/super-scheduled

推薦下自己做的 Spring Cloud 的實戰專案:

https://github。com/YunaiV/onemall

三、實現原理

1、動態管理實現

(1) 配置管理介紹

@Component(“superScheduledConfig”)public class SuperScheduledConfig { /** * 執行定時任務的執行緒池 */ private ThreadPoolTaskScheduler taskScheduler; /** * 定時任務名稱與定時任務回撥鉤子 的關聯關係容器 */ private Map nameToScheduledFuture = new ConcurrentHashMap<>(); /** * 定時任務名稱與定時任務需要執行的邏輯 的關聯關係容器 */ private Map nameToRunnable = new ConcurrentHashMap<>(); /** * 定時任務名稱與定時任務的源資訊 的關聯關係容器 */ private Map nameToScheduledSource = new ConcurrentHashMap<>(); /* 普通的get/sets省略 */}

(2) 使用後處理器攔截SpringBoot原本的定時任務

實現ApplicationContextAware介面拿到SpringBoot的上下文

實現BeanPostProcessor介面,將這個類標記為後處理器,後處理器會在每個bean例項化之後執行

使用@DependsOn註解強制依賴SuperScheduledConfig類,讓SpringBoot例項化SuperScheduledPostProcessor類之前先例項化SuperScheduledConfig類

主要實現邏輯在postProcessAfterInitialization()方法中

Spring Boot 定時任務動態管理通用解決方案

@DependsOn({“superScheduledConfig”})@Component@Orderpublic class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware { protected final Log logger = LogFactory。getLog(getClass()); private ApplicationContext applicationContext; /** * 例項化bean之前的操作 * @param bean bean例項 * @param beanName bean的Name */ @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } /** * 例項化bean之後的操作 * @param bean bean例項 * @param beanName bean的Name */ @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { //1。獲取配置管理器 SuperScheduledConfig superScheduledConfig = applicationContext。getBean(SuperScheduledConfig。class); //2。獲取當前例項化完成的bean的所有方法 Method[] methods = bean。getClass()。getDeclaredMethods(); //迴圈處理對每個方法逐一處理 if (methods。length > 0) { for (Method method : methods) { //3。嘗試在該方法上獲取@Scheduled註解(SpringBoot的定時任務註解) Scheduled annotation = method。getAnnotation(Scheduled。class); //如果無法獲取到@Scheduled註解,就跳過這個方法 if (annotation == null) { continue; } //4。建立定時任務的源屬性 //建立定時任務的源屬性(用來記錄定時任務的配置,初始化的時候記錄的是註解上原本的屬性) ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean); //對註解上獲取到源屬性中的屬性進行檢測 if (!scheduledSource。check()) { throw new SuperScheduledException(“在” + beanName + “Bean中” + method。getName() + “方法的註解引數錯誤”); } //生成定時任務的名稱(id),使用beanName+“。”+方法名 String name = beanName + “。” + method。getName(); //將以key-value的形式,將源資料存入配置管理器中,key:定時任務的名稱 value:源資料 superScheduledConfig。addScheduledSource(name, scheduledSource); try { //5。將原本SpringBoot的定時任務取消掉 clearOriginalScheduled(annotation); } catch (Exception e) { throw new SuperScheduledException(“在關閉原始方法” + beanName + method。getName() + “時出現錯誤”); } } } //最後bean保持原有返回 return bean; } /** * 修改註解原先的屬性 * @param annotation 註解例項物件 * @throws Exception */ private void clearOriginalScheduled(Scheduled annotation) throws Exception { changeAnnotationValue(annotation, “cron”, Scheduled。CRON_DISABLED); changeAnnotationValue(annotation, “fixedDelay”, -1L); changeAnnotationValue(annotation, “fixedDelayString”, “”); changeAnnotationValue(annotation, “fixedRate”, -1L); changeAnnotationValue(annotation, “fixedRateString”, “”); changeAnnotationValue(annotation, “initialDelay”, -1L); changeAnnotationValue(annotation, “initialDelayString”, “”); } /** * 獲取SpringBoot的上下文 * @param applicationContext SpringBoot的上下文 */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this。applicationContext = applicationContext; }}

(3) 使用ApplicationRunner初始化自定義的定時任務執行器

實現ApplicationContextAware介面拿到SpringBoot的上下文

使用@DependsOn註解強制依賴threadPoolTaskScheduler類

實現ApplicationRunner介面,在所有bean初始化結束之後,執行自定義邏輯

主要實現邏輯在run()方法中

Spring Boot 定時任務動態管理通用解決方案

@DependsOn(“threadPoolTaskScheduler”)@Componentpublic class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware { protected final Log logger = LogFactory。getLog(getClass()); private DateTimeFormatter df = DateTimeFormatter。ofPattern(“yyyy-MM-dd HH:mm:ss”); private ApplicationContext applicationContext; /** * 定時任務配置管理器 */ @Autowired private SuperScheduledConfig superScheduledConfig; /** * 定時任務執行執行緒 */ @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; @Override public void run(ApplicationArguments args) { //1。定時任務配置管理器中快取 定時任務執行執行緒 superScheduledConfig。setTaskScheduler(threadPoolTaskScheduler); //2。獲取所有定時任務源資料 Map nameToScheduledSource = superScheduledConfig。getNameToScheduledSource(); //逐一處理定時任務 for (String name : nameToScheduledSource。keySet()) { //3。獲取定時任務源資料 ScheduledSource scheduledSource = nameToScheduledSource。get(name); //4。獲取所有增強類 String[] baseStrengthenBeanNames = applicationContext。getBeanNamesForType(BaseStrengthen。class); //5。建立執行控制器 SuperScheduledRunnable runnable = new SuperScheduledRunnable(); //配置執行控制器 runnable。setMethod(scheduledSource。getMethod()); runnable。setBean(scheduledSource。getBean()); //6。逐一處理增強類(增強器實現原理後面具體分析) List points = new ArrayList<>(baseStrengthenBeanNames。length); for (String baseStrengthenBeanName : baseStrengthenBeanNames) { //7。將增強器代理成point Object baseStrengthenBean = applicationContext。getBean(baseStrengthenBeanName); //建立代理 Point proxy = ProxyUtils。getInstance(Point。class, new RunnableBaseInterceptor(baseStrengthenBean, runnable)); proxy。setSuperScheduledName(name); //8。所有的points連成起來 points。add(proxy); } //將point形成呼叫鏈 runnable。setChain(new Chain(points)); //將執行邏輯封裝並快取到定時任務配置管理器中 superScheduledConfig。addRunnable(name, runnable::invoke); try { //8。啟動定時任務 ScheduledFuture<?> schedule = ScheduledFutureFactory。create(threadPoolTaskScheduler , scheduledSource, runnable::invoke); //將執行緒回撥鉤子存到任務配置管理器中 superScheduledConfig。addScheduledFuture(name, schedule); logger。info(df。format(LocalDateTime。now()) + “任務” + name + “已經啟動。。。”); } catch (Exception e) { throw new SuperScheduledException(“任務” + name + “啟動失敗,錯誤資訊:” + e。getLocalizedMessage()); } } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this。applicationContext = applicationContext; }}

(4) 進行動態管理

@Componentpublic class SuperScheduledManager { protected final Log logger = LogFactory。getLog(getClass()); private DateTimeFormatter df = DateTimeFormatter。ofPattern(“yyyy-MM-dd HH:mm:ss”); @Autowired private SuperScheduledConfig superScheduledConfig; /** * 修改Scheduled的執行週期 * * @param name scheduled的名稱 * @param cron cron表示式 */ public void setScheduledCron(String name, String cron) { //終止原先的任務 cancelScheduled(name); //建立新的任務 ScheduledSource scheduledSource = superScheduledConfig。getScheduledSource(name); scheduledSource。clear(); scheduledSource。setCron(cron); addScheduled(name, scheduledSource); } /** * 修改Scheduled的fixedDelay * * @param name scheduled的名稱 * @param fixedDelay 上一次執行完畢時間點之後多長時間再執行 */ public void setScheduledFixedDelay(String name, Long fixedDelay) { //終止原先的任務 cancelScheduled(name); //建立新的任務 ScheduledSource scheduledSource = superScheduledConfig。getScheduledSource(name); scheduledSource。clear(); scheduledSource。setFixedDelay(fixedDelay); addScheduled(name, scheduledSource); } /** * 修改Scheduled的fixedRate * * @param name scheduled的名稱 * @param fixedRate 上一次開始執行之後多長時間再執行 */ public void setScheduledFixedRate(String name, Long fixedRate) { //終止原先的任務 cancelScheduled(name); //建立新的任務 ScheduledSource scheduledSource = superScheduledConfig。getScheduledSource(name); scheduledSource。clear(); scheduledSource。setFixedRate(fixedRate); addScheduled(name, scheduledSource); } /** * 查詢所有啟動的Scheduled */ public List getRunScheduledName() { Set names = superScheduledConfig。getNameToScheduledFuture()。keySet(); return new ArrayList<>(names); } /** * 查詢所有的Scheduled */ public List getAllSuperScheduledName() { Set names = superScheduledConfig。getNameToRunnable()。keySet(); return new ArrayList<>(names); } /** * 終止Scheduled * * @param name scheduled的名稱 */ public void cancelScheduled(String name) { ScheduledFuture scheduledFuture = superScheduledConfig。getScheduledFuture(name); scheduledFuture。cancel(true); superScheduledConfig。removeScheduledFuture(name); logger。info(df。format(LocalDateTime。now()) + “任務” + name + “已經終止。。。”); } /** * 啟動Scheduled * * @param name scheduled的名稱 * @param scheduledSource 定時任務的源資訊 */ public void addScheduled(String name, ScheduledSource scheduledSource) { if (getRunScheduledName()。contains(name)) { throw new SuperScheduledException(“定時任務” + name + “已經被啟動過了”); } if (!scheduledSource。check()) { throw new SuperScheduledException(“定時任務” + name + “源資料內容錯誤”); } scheduledSource。refreshType(); Runnable runnable = superScheduledConfig。getRunnable(name); ThreadPoolTaskScheduler taskScheduler = superScheduledConfig。getTaskScheduler(); ScheduledFuture<?> schedule = ScheduledFutureFactory。create(taskScheduler, scheduledSource, runnable); logger。info(df。format(LocalDateTime。now()) + “任務” + name + “已經啟動。。。”); superScheduledConfig。addScheduledSource(name, scheduledSource); superScheduledConfig。addScheduledFuture(name, schedule); } /** * 以cron型別啟動Scheduled * * @param name scheduled的名稱 * @param cron cron表示式 */ public void addCronScheduled(String name, String cron) { ScheduledSource scheduledSource = new ScheduledSource(); scheduledSource。setCron(cron); addScheduled(name, scheduledSource); } /** * 以fixedDelay型別啟動Scheduled * * @param name scheduled的名稱 * @param fixedDelay 上一次執行完畢時間點之後多長時間再執行 * @param initialDelay 第一次執行的延遲時間 */ public void addFixedDelayScheduled(String name, Long fixedDelay, Long。。。 initialDelay) { ScheduledSource scheduledSource = new ScheduledSource(); scheduledSource。setFixedDelay(fixedDelay); if (initialDelay != null && initialDelay。length == 1) { scheduledSource。setInitialDelay(initialDelay[0]); } else if (initialDelay != null && initialDelay。length > 1) { throw new SuperScheduledException(“第一次執行的延遲時間只能傳入一個引數”); } addScheduled(name, scheduledSource); } /** * 以fixedRate型別啟動Scheduled * * @param name scheduled的名稱 * @param fixedRate 上一次開始執行之後多長時間再執行 * @param initialDelay 第一次執行的延遲時間 */ public void addFixedRateScheduled(String name, Long fixedRate, Long。。。 initialDelay) { ScheduledSource scheduledSource = new ScheduledSource(); scheduledSource。setFixedRate(fixedRate); if (initialDelay != null && initialDelay。length == 1) { scheduledSource。setInitialDelay(initialDelay[0]); } else if (initialDelay != null && initialDelay。length > 1) { throw new SuperScheduledException(“第一次執行的延遲時間只能傳入一個引數”); } addScheduled(name, scheduledSource); } /** * 手動執行一次任務 * * @param name scheduled的名稱 */ public void runScheduled(String name) { Runnable runnable = superScheduledConfig。getRunnable(name); runnable。run(); }}

2、增強介面實現

增強器實現的整體思路與SpringAop的思路一致,實現沒有Aop複雜

(1) 增強介面

@Order(Ordered。HIGHEST_PRECEDENCE)public interface BaseStrengthen { /** * 前置強化方法 * * @param bean bean例項(或者是被代理的bean) * @param method 執行的方法物件 * @param args 方法引數 */ void before(Object bean, Method method, Object[] args); /** * 後置強化方法 * 出現異常不會執行 * 如果未出現異常,在afterFinally方法之後執行 * * @param bean bean例項(或者是被代理的bean) * @param method 執行的方法物件 * @param args 方法引數 */ void after(Object bean, Method method, Object[] args); /** * 異常強化方法 * * @param bean bean例項(或者是被代理的bean) * @param method 執行的方法物件 * @param args 方法引數 */ void exception(Object bean, Method method, Object[] args); /** * Finally強化方法,出現異常也會執行 * * @param bean bean例項(或者是被代理的bean) * @param method 執行的方法物件 * @param args 方法引數 */ void afterFinally(Object bean, Method method, Object[] args);}

(2) 代理抽象類

public abstract class Point { /** * 定時任務名 */ private String superScheduledName; /** * 抽象的執行方法,使用代理實現 * @param runnable 定時任務執行器 */ public abstract Object invoke(SuperScheduledRunnable runnable); /* 普通的get/sets省略 */}

(3) 呼叫鏈類

public class Chain { private List list; private int index = -1; /** * 索引自增1 */ public int incIndex() { return ++index; } /** * 索引還原 */ public void resetIndex() { this。index = -1; }}

(4) cglib動態代理實現

使用cglib代理增強器,將增強器全部代理成呼叫鏈節點Point

public class RunnableBaseInterceptor implements MethodInterceptor { /** * 定時任務執行器 */ private SuperScheduledRunnable runnable; /** * 定時任務增強類 */ private BaseStrengthen strengthen; @Override public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable { Object result; //如果執行的是invoke()方法 if (“invoke”。equals(method。getName())) { //前置強化方法 strengthen。before(obj, method, args); try { //呼叫執行器中的invoke()方法 result = runnable。invoke(); } catch (Exception e) { //異常強化方法 strengthen。exception(obj, method, args); throw new SuperScheduledException(strengthen。getClass() + “中強化執行時發生錯誤”, e); } finally { //Finally強化方法,出現異常也會執行 strengthen。afterFinally(obj, method, args); } //後置強化方法 strengthen。after(obj, method, args); } else { //直接執行方法 result = methodProxy。invokeSuper(obj, args); } return result; } public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) { this。runnable = runnable; if (BaseStrengthen。class。isAssignableFrom(object。getClass())) { this。strengthen = (BaseStrengthen) object; } else { throw new SuperScheduledException(object。getClass() + “物件不是BaseStrengthen型別”); } } public RunnableBaseInterceptor() { }}

(5) 定時任務執行器實現

public class SuperScheduledRunnable { /** * 原始的方法 */ private Method method; /** * 方法所在的bean */ private Object bean; /** * 增強器的呼叫鏈 */ private Chain chain; public Object invoke() { Object result; //索引自增1 if (chain。incIndex() == chain。getList()。size()) { //呼叫鏈中的增強方法已經全部執行結束 try { //呼叫鏈索引初始化 chain。resetIndex(); //增強器全部執行完畢,執行原本的方法 result = method。invoke(bean); } catch (IllegalAccessException | InvocationTargetException e) { throw new SuperScheduledException(e。getLocalizedMessage()); } } else { //獲取被代理後的方法增強器 Point point = chain。getList()。get(chain。getIndex()); //執行增強器代理 //增強器代理中,會回撥方法執行器,形成呼叫鏈,逐一執行呼叫鏈中的增強器 result = point。invoke(this); } return result; } /* 普通的get/sets省略 */}

(6) 增強器代理邏輯

com。gyx。superscheduled。core。SuperScheduledApplicationRunner類中的程式碼片段

//建立執行控制器SuperScheduledRunnable runnable = new SuperScheduledRunnable();runnable。setMethod(scheduledSource。getMethod());runnable。setBean(scheduledSource。getBean());//用來存放 增強器的代理物件List points = new ArrayList<>(baseStrengthenBeanNames。length);//迴圈所有的增強器的beanNamefor (String baseStrengthenBeanName : baseStrengthenBeanNames) { //獲取增強器的bean物件 Object baseStrengthenBean = applicationContext。getBean(baseStrengthenBeanName); //將增強器代理成Point節點 Point proxy = ProxyUtils。getInstance(Point。class, new RunnableBaseInterceptor(baseStrengthenBean, runnable)); proxy。setSuperScheduledName(name); //增強器的代理物件快取到list中 points。add(proxy);}//將增強器代理例項的集合生成呼叫鏈//執行控制器中設定呼叫鏈runnable。setChain(new Chain(points));

原文連結:https://mp。weixin。qq。com/s/7sQtqro639qw0Ihyd_G8FQ

相關文章

頂部