XXL-JOB真的要涼了?出現(xiàn)了一個王炸級別的分布式任務(wù)調(diào)度與計算框架?
關(guān)于PowerJob
PowerJob(原OhMyScheduler)是全新一代分布式任務(wù)調(diào)度與計算框架,其主要功能特性如下:
- 使用簡單:提供前端Web界面,允許開發(fā)者可視化地完成調(diào)度任務(wù)的管理(增、刪、改、查)、任務(wù)運行狀態(tài)監(jiān)控和運行日志查看等功能。
- 定時策略完善:支持 CRON 表達式、固定頻率、固定延遲和API四種定時調(diào)度策略。
- 執(zhí)行模式豐富:支持單機、廣播、Map、MapReduce 四種執(zhí)行模式,其中 Map/MapReduce 處理器能使開發(fā)者寥寥數(shù)行代碼便獲得集群分布式計算的能力。
- 工作流支持:支持在線配置任務(wù)依賴關(guān)系(DAG),以可視化的方式對任務(wù)進行編排,同時還支持上下游任務(wù)間的數(shù)據(jù)傳遞,以及多種節(jié)點類型(判斷節(jié)點 & 嵌套工作流節(jié)點)。
- 執(zhí)行器支持廣泛:支持 Spring Bean、內(nèi)置/外置 Java 類,另外可以通過引入官方提供的依賴包,一鍵集成 Shell、Python、HTTP、SQL 等處理器,應(yīng)用范圍廣。
- 運維便捷:支持在線日志功能,執(zhí)行器產(chǎn)生的日志可以在前端控制臺頁面實時顯示,降低 debug 成本,極大地提高開發(fā)效率。
- 依賴精簡:最小僅依賴關(guān)系型數(shù)據(jù)庫(MySQL/PostgreSQL/Oracle/MS SQLServer...)
- 高可用 & 高性能:調(diào)度服務(wù)器經(jīng)過精心設(shè)計,一改其他調(diào)度框架基于數(shù)據(jù)庫鎖的策略,實現(xiàn)了無鎖化調(diào)度。部署多個調(diào)度服務(wù)器可以同時實現(xiàn)高可用和性能的提升(支持無限的水平擴展)。
- 故障轉(zhuǎn)移與恢復(fù):任務(wù)執(zhí)行失敗后,可根據(jù)配置的重試策略完成重試,只要執(zhí)行器集群有足夠的計算節(jié)點,任務(wù)就能順利完成。
適用場景
- 有定時執(zhí)行需求的業(yè)務(wù)場景:如每天凌晨全量同步數(shù)據(jù)、生成業(yè)務(wù)報表、未支付訂單超時取消等。
- 有需要全部機器一同執(zhí)行的業(yè)務(wù)場景:如使用廣播執(zhí)行模式清理集群日志。
- 有需要分布式處理的業(yè)務(wù)場景:比如需要更新一大批數(shù)據(jù),單機執(zhí)行耗時非常長,可以使用Map/MapReduce 處理器完成任務(wù)的分發(fā),調(diào)動整個集群加速計算。
- 有需要延遲執(zhí)行某些任務(wù)的業(yè)務(wù)場景:比如訂單過期處理等。
同類產(chǎn)品對比
基本概念
分組概念
- appName:應(yīng)用名稱,建議與用戶實際接入 PowerJob 的應(yīng)用名稱保持一致,用于業(yè)務(wù)分組與隔離。一個 appName 等于一個業(yè)務(wù)集群,也就是實際的一個 Java 項目。
核心概念
- 任務(wù)(Job):描述了需要被 PowerJob 調(diào)度的任務(wù)信息,包括任務(wù)名稱、調(diào)度時間、處理器信息等。
- 任務(wù)實例( JobInstance,簡稱 Instance):任務(wù)(Job)被調(diào)度執(zhí)行后會生成任務(wù)實例(Instance),任務(wù)實例記錄了任務(wù)的運行時信息(任務(wù)與任務(wù)實例的關(guān)系類似于類與對象的關(guān)系)。
- 作業(yè)(Task):任務(wù)實例的執(zhí)行單元,一個 JobInstance 存在至少一個 Task,具體規(guī)則如下:
- 單機任務(wù)(STANDALONE):一個 JobInstance 對應(yīng)一個 Task
- 廣播任務(wù)(BROADCAST):一個 JobInstance 對應(yīng) N 個 Task,N為集群機器數(shù)量,即每一臺機器都會生成一個 Task
- Map/MapReduce任務(wù):一個 JobInstance 對應(yīng)若干個 Task,由開發(fā)者手動 map 產(chǎn)生
- 工作流(Workflow):由 DAG(有向無環(huán)圖)描述的一組任務(wù)(Job),用于任務(wù)編排。
- 工作流實例(WorkflowInstance):工作流被調(diào)度執(zhí)行后會生成工作流實例,記錄了工作流的運行時信息。
擴展概念
- JVM 容器:以 Maven 工程項目的維度組織一堆 Java 文件(開發(fā)者開發(fā)的眾多 Java 處理器),可以通過前端網(wǎng)頁動態(tài)發(fā)布并被執(zhí)行器加載,具有極強的擴展能力和靈活性。
- OpenAPI:允許開發(fā)者通過接口來完成手工的操作,讓系統(tǒng)整體變得更加靈活。開發(fā)者可以基于 API 便捷地擴展 PowerJob 原有的功能。
定時任務(wù)類型
- API:該任務(wù)只會由 powerjob-client 中提供的 OpenAPI 接口觸發(fā),server 不會主動調(diào)度。
- CRON:該任務(wù)的調(diào)度時間由 CRON 表達式指定。
- 固定頻率:秒級任務(wù),每隔多少毫秒運行一次,功能與 java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate 相同。
- 固定延遲:秒級任務(wù),延遲多少毫秒運行一次,功能與 java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay 相同。
- 工作流:該任務(wù)只會由其所屬的工作流調(diào)度執(zhí)行,server 不會主動調(diào)度該任務(wù)。如果該任務(wù)不屬于任何一個工作流,該任務(wù)就不會被調(diào)度。
備注:固定延遲和固定頻率任務(wù)統(tǒng)稱秒級任務(wù),這兩種任務(wù)無法被停止,只有任務(wù)被關(guān)閉或刪除時才能真正停止任務(wù)。
搭建PowerJob環(huán)境
本地啟動
初始化項目
git clone https://github.com/PowerJob/PowerJob.git
導(dǎo)入 IDE,源碼結(jié)構(gòu)如下,我們需要啟動調(diào)度服務(wù)器(powerjob-server),同時在 samples 工程中編寫自己的處理器代碼
啟動調(diào)度服務(wù)器
- 創(chuàng)建數(shù)據(jù)庫(僅需要創(chuàng)建數(shù)據(jù)庫):找到你的 DB,運行 SQL CREATE DATABASE IF NOT EXISTS `powerjob-daily` DEFAULT CHARSET utf8mb4,搞定~
- 修改配置文件:配置文件的說明官方文檔寫的非常詳細,此處不再贅述。需要修改的地方為數(shù)據(jù)庫配置spring.datasource.core.jdbc-url、spring.datasource.core.username和spring.datasource.core.password,當(dāng)然,有 mongoDB 的同學(xué)也可以修改spring.data.mongodb.uri以獲取完全版體驗。
powerjob-server 日常環(huán)境配置文件:application-daily.properties
oms.env=DAILY
logging.cnotallow=classpath:logback-dev.xml
####### 外部數(shù)據(jù)庫配置(需要用戶更改為自己的數(shù)據(jù)庫配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimeznotallow=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置,非核心依賴,通過配置 oms.mongodb.enable=false 來關(guān)閉 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily
####### 郵件配置(不需要郵件報警可以刪除以下配置來避免報錯) #######
spring.mail.host=smtp.163.com
spring.mail.username=zqq@163.com
spring.mail.password=GOFZPNARMVKCGONV
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 資源清理配置 #######
oms.instanceinfo.retentinotallow=1
oms.container.retention.local=1
oms.container.retention.remote=-1
####### 緩存配置 #######
oms.instance.metadata.cache.size=1024
- 啟動應(yīng)用:完成配置文件的修改后,可以直接通過啟動類 tech.powerjob.server.PowerJobServerApplication 啟動調(diào)度服務(wù)器(注意:需要使用 daily 配置文件啟動,可自行百度搜索“SpringBoot 指定配置文件啟動”),觀察啟動日志,查看是否啟動成功~啟動成功后,訪問 http://127.0.0.1:7700/ ,如果能順利出現(xiàn) Web 界面,則說明調(diào)度服務(wù)器啟動成功!
- 注冊應(yīng)用:點擊主頁應(yīng)用注冊按鈕,填入 powerjob-agent-test 和控制臺密碼(用于進入控制臺),注冊示例應(yīng)用(當(dāng)然你也可以注冊其他的 appName,只是別忘記在示例程序中同步修改~)
圖片
docker-compose啟動
環(huán)境要求
本地需要安裝docker和docker-compose
下載代碼
git clone --depth=1 https://github.com/PowerJob/PowerJob.git
運行
cd PowerJob
docker-compose up
docker-compose up -d
剛開始啟動時,powerjob-worker-samples會啟動失敗,等powerjob-server啟動成功后,powerjob-worker-samples才會啟動成功。這大概需要幾分鐘。
運行成功后,瀏覽器訪問 http://127.0.0.1:7700/
應(yīng)用名稱:powerjob-worker-samples
密碼:powerjob123
停止
docker-compose down
Stopping powerjob-worker-samples ... done
Stopping powerjob-server ... done
Stopping powerjob-mysql ... done
Removing powerjob-worker-samples ... done
Removing powerjob-server ... done
Removing powerjob-mysql ... done
cd PowerJob
rm -rf powerjob-data
SpringBoot集成PowerJob
添加相關(guān)maven依賴
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>${latest.powerjob.version}</version>
</dependency>
配置文件配置
powerjob:
worker:
# akka 工作端口,可選,默認 27777
akka-port: 27777
# 接入應(yīng)用名稱,用于分組隔離,推薦填寫 本 Java 項目名稱
app-name: ${spring.application.name}
# 調(diào)度服務(wù)器地址,IP:Port 或 域名,多值逗號分隔
server-address: 81.70.117.188:7700
# 持久化方式,可選,默認 disk
store-strategy: disk
# 任務(wù)返回結(jié)果信息的最大長度,超過這個長度的信息會被截斷,默認值 8192
max-result-length: 8192
# 單個任務(wù)追加的工作流上下文最大長度,超過這個長度的會被直接丟棄,默認值 8192
max-appended-wf-context-length: 8192
處理器(Processor)開發(fā)
處理器概述
基本概念
PowerJob 支持 Python、Shell、HTTP、SQL 等眾多通用任務(wù)的處理,開發(fā)者只需要引入依賴,在控制臺配置好相關(guān)參數(shù)即可,關(guān)于這部分詳見 官方處理器 ,此處不再贅述。本章將重點闡述 Java 處理器開發(fā)方法與使用技巧。
- Java 處理器可根據(jù)代碼所處位置劃分為內(nèi)置 Java 處理器和外置 Java 處理器,前者直接集成在宿主應(yīng)用(也就是接入本系統(tǒng)的業(yè)務(wù)應(yīng)用)中,一般用來處理業(yè)務(wù)需求;后者可以在一個獨立的輕量級的 Java 工程中開發(fā),通過 JVM 容器技術(shù)(詳見容器章節(jié))被 worker 集群熱加載,提供 Java 的“腳本能力”,一般用于處理靈活多變的需求。
- Java 處理器可根據(jù)對象創(chuàng)建者劃分為 SpringBean 處理器和普通 Java 對象處理器,前者由 Spring IOC 容器完成處理器的創(chuàng)建和初始化,后者則由 PowerJob 維護其生命周期。如果宿主應(yīng)用支持 Spring,強烈建議使用 SpringBean 處理器,開發(fā)者僅需要將 Processor 注冊進 Spring IOC 容器(一個 @Component 注解或一句 bean 配置)即可享受 Spring 帶來的便捷之處。
- Java處理器可根據(jù)功能劃分為單機處理器、廣播處理器、Map 處理器和 MapReduce 處理器。
單機處理器(BasicProcessor)對應(yīng)了單機任務(wù),即某個任務(wù)的某次運行只會有某一臺機器的某一個線程參與運算。
廣播處理器(BroadcastProcessor)對應(yīng)了廣播任務(wù),即某個任務(wù)的某次運行會調(diào)動集群內(nèi)所有機器參與運算。
Map處理器(MapProcessor)對應(yīng)了Map任務(wù),即某個任務(wù)在運行過程中,允許產(chǎn)生子任務(wù)并分發(fā)到其他機器進行運算。
MapReduce 處理器(MapReduceProcessor)對應(yīng)了 MapReduce 任務(wù),在 Map 任務(wù)的基礎(chǔ)上,增加了所有任務(wù)結(jié)束后的匯總統(tǒng)計。
入?yún)?TaskContext
TaskContext 包含了本次任務(wù)的上下文信息,具體信息如下
屬性列表(紅色標注的為常用屬性) | |
屬性名稱 | 意義/用法 |
jobId | 任務(wù) ID,開發(fā)者一般無需關(guān)心此參數(shù) |
instanceId | 任務(wù)實例 ID,全局唯一,開發(fā)者一般無需關(guān)心此參數(shù) |
subInstanceId | 子任務(wù)實例 ID,秒級任務(wù)使用,開發(fā)者一般無需關(guān)心此參數(shù) |
taskId | 采用鏈式命名法的 ID,在某個任務(wù)實例內(nèi)唯一,開發(fā)者一般無需關(guān)心此參數(shù) |
taskName | task 名稱,Map/MapReduce 任務(wù)的子任務(wù)的值為開發(fā)者指定,否則為系統(tǒng)默認值,開發(fā)者一般無需關(guān)心此參數(shù) |
jobParams | 任務(wù)參數(shù) 對于非工作流中的任務(wù)其值等同于控制臺錄入的任務(wù)參數(shù);如果該任務(wù)為工作流中的任務(wù)且有配置節(jié)點參數(shù)信息,那么接收到的是節(jié)點配置的參數(shù)信息 |
instanceParams | 任務(wù)實例參數(shù) 對于非工作流中的任務(wù) 其值 等同于 OpenAPI 傳遞的實例參數(shù),非 OpenAPI 觸發(fā)的任務(wù)則一定為空。如果該任務(wù)為工作流中的任務(wù)那么這里實際接收到的是工作流上下文信息,建議使用 getWorkflowContext 方法獲取上下文信息 |
maxRetryTimes | Task 的最大重試次數(shù) |
currentRetryTimes | Task 的當(dāng)前重試次數(shù),和 maxRetryTimes 聯(lián)合起來可以判斷當(dāng)前是否為該 Task 的最后一次運行機會 |
subTask | 子 Task,Map/MapReduce 處理器專屬,開發(fā)者調(diào)用map方法時傳遞的子任務(wù)列表中的某一個 |
omsLogger | 在線日志,用法同 Slf4J,記錄的日志可以直接通過控制臺查看,非常便捷和強大!不過使用過程中需要注意頻率,濫用在線日志會對 Server 造成巨大的壓力 |
userContext | 用戶在 PowerJobWorkerConfig 中設(shè)置的自定義上下文 |
workflowContext | 工作流上下文,更多信息見下方說明 |
工作流上下文( WorkflowContext )
該屬性是 v4.0.0 版本的重大變更之一,移除了原來的參數(shù)傳遞機制,提供了 API 讓開發(fā)者可以更加靈活便捷地在工作流中實現(xiàn)信息的傳遞。
屬性列表 | |
屬性名稱 | 意義/用法 |
wfInstanceId | 工作流實例 ID |
data | 工作流上下文數(shù)據(jù),鍵值對 |
appendedContextData | 當(dāng)前任務(wù)向工作流上下文中追加的數(shù)據(jù)。在任務(wù)執(zhí)行完成后 ProcessorTracker 會將其上報給 TaskTracker,TaskTracker 在當(dāng)前任務(wù)執(zhí)行完成后會將這個信息上報給 server ,追加到當(dāng)前的工作流上下文中,供下游任務(wù)消費 |
上游任務(wù)通過 WorkflowContext#appendData2WfContext(String key,Object value) 方法向工作流上下文中追加數(shù)據(jù),下游任務(wù)便可以通過 WorkflowContext#fetchWorkflowContext() 方法獲取到相應(yīng)的數(shù)據(jù)進行消費。注意,當(dāng)追加的上下文信息的 key 已經(jīng)存在于當(dāng)前的上下文中時,新的 value 會覆蓋之前的值。另外,每次任務(wù)實例追加的上下文數(shù)據(jù)大小也會受到 worker 的配置項 powerjob.worker.max-appended-wf-context-length 的限制,超過這個長度的會被直接丟棄。
返回值 ProcessResult
方法的返回值為 ProcessResult,代表了本次 Task 執(zhí)行的結(jié)果,包含 success 和 msg 兩個屬性,分別用于傳遞 Task 是否執(zhí)行成功和 Task 需要返回的信息。
處理器開發(fā)示例
單機處理器:BasicProcessor
單機執(zhí)行的策略下,server 會在所有可用 worker 中選取健康度最佳的機器進行執(zhí)行。單機執(zhí)行任務(wù)需要實現(xiàn)接口 BasicProcessor,代碼示例如下:
/**
* @Author iron.guo
* @Date 2023/1/7
* @Description
*/
@Component
@Slf4j
public class StandaloneProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("處理器啟動成功,context 是 {}.", context);
log.info("單機處理器正在處理");
log.info(context.getJobParams());
omsLogger.info("處理器執(zhí)行結(jié)束");
boolean success = true;
return new ProcessResult(success, context + ": " + success);
}
}
執(zhí)行結(jié)果
圖片
廣播處理器:BroadcastProcessor
廣播執(zhí)行的策略下,所有機器都會被調(diào)度執(zhí)行該任務(wù)。為了便于資源的準備和釋放,廣播處理器在BasicProcessor 的基礎(chǔ)上額外增加了 preProcess 和 postProcess 方法,分別在整個集群開始之前/結(jié)束之后選一臺機器執(zhí)行相關(guān)方法。代碼示例如下:
@Slf4j
@Component
public class BroadcastProcessorDemo implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("預(yù)執(zhí)行,會在所有 worker 執(zhí)行 process 方法前調(diào)用");
log.info("預(yù)執(zhí)行,會在所有 worker 執(zhí)行 process 方法前調(diào)用");
// 預(yù)執(zhí)行,會在所有 worker 執(zhí)行 process 方法前調(diào)用
return new ProcessResult(true, "init success");
}
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
// 撰寫整個worker集群都會執(zhí)行的代碼邏輯
omsLogger.info("撰寫整個worker集群都會執(zhí)行的代碼邏輯");
log.info("撰寫整個worker集群都會執(zhí)行的代碼邏輯");
return new ProcessResult(true, "release resource success");
}
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
// taskResults 存儲了所有worker執(zhí)行的結(jié)果(包括preProcess)
// 收尾,會在所有 worker 執(zhí)行完畢 process 方法后調(diào)用,該結(jié)果將作為最終的執(zhí)行結(jié)果
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("收尾,會在所有 worker 執(zhí)行完畢 process 方法后調(diào)用,該結(jié)果將作為最終的執(zhí)行結(jié)果");
log.info("收尾,會在所有 worker 執(zhí)行完畢 process 方法后調(diào)用,該結(jié)果將作為最終的執(zhí)行結(jié)果");
return new ProcessResult(true, "process success");
}
}
執(zhí)行結(jié)果
圖片
并行處理器:MapReduceProcessor
MapReduce 是最復(fù)雜也是最強大的一種執(zhí)行器,它允許開發(fā)者完成任務(wù)的拆分,將子任務(wù)派發(fā)到集群中其他Worker 執(zhí)行,是執(zhí)行大批量處理任務(wù)的不二之選!實現(xiàn) MapReduce 處理器需要繼承 MapReduceProcessor類,具體用法如下示例代碼所示:
@Slf4j
@Component
public class MapReduceProcessorDemo implements MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
final OmsLogger omsLogger = context.getOmsLogger();
// 判斷是否為根任務(wù)
if (isRootTask()) {
// 構(gòu)造子任務(wù)
List<SubTask> subTaskList = Lists.newLinkedList();
SubTask subTask=new SubTask();
subTask.setSiteId(1L);
subTask.setName("iron.guo");
subTaskList.add(subTask);
/*
* 子任務(wù)的構(gòu)造由開發(fā)者自己定義
* eg. 現(xiàn)在需要從文件中讀取100W個ID,并處理數(shù)據(jù)庫中這些ID對應(yīng)的數(shù)據(jù),那么步驟如下:
* 1. 根任務(wù)(RootTask)讀取文件,流式拉取100W個ID,并按1000個一批的大小組裝成子任務(wù)進行派發(fā)
* 2. 非根任務(wù)獲取子任務(wù),完成業(yè)務(wù)邏輯的處理
*/
// 調(diào)用 map 方法,派發(fā)子任務(wù)(map 可能會失敗并拋出異常,做好業(yè)務(wù)操作)
map(subTaskList, "DATA_PROCESS_TASK");
omsLogger.info("執(zhí)行根任務(wù)-派發(fā)子任務(wù)");
return new ProcessResult(true, "ROOT_PROCESS_SUCCESS");
}
// 非子任務(wù),可根據(jù) subTask 的類型 或 TaskName 來判斷分支
if (context.getSubTask() instanceof SubTask) {
omsLogger.info("執(zhí)行子任務(wù)開始");
omsLogger.info("Get from SubTask : name is {} and id is {}",((SubTask) context.getSubTask()).getName(),((SubTask) context.getSubTask()).getSiteId());
// 執(zhí)行子任務(wù),注:子任務(wù)人可以 map 產(chǎn)生新的子任務(wù),可以構(gòu)建任意級的 MapReduce 處理器
return new ProcessResult(true, "PROCESS_SUB_TASK_SUCCESS");
}
return new ProcessResult(false, "UNKNOWN_BUG");
}
@Override
public ProcessResult reduce(TaskContext taskContext, List<TaskResult> taskResults) {
// 所有 Task 執(zhí)行結(jié)束后,reduce 將會被執(zhí)行
// taskResults 保存了所有子任務(wù)的執(zhí)行結(jié)果
// 用法舉例,統(tǒng)計執(zhí)行結(jié)果
AtomicLong successCnt = new AtomicLong(0);
taskResults.forEach(tr -> {
if (tr.isSuccess()) {
successCnt.incrementAndGet();
}
});
// 該結(jié)果將作為任務(wù)最終的執(zhí)行結(jié)果
return new ProcessResult(true, "success task num:" + successCnt.get());
}
// 自定義的子任務(wù)
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
private static class SubTask {
private Long siteId;
private String name;
}
}
執(zhí)行結(jié)果
圖片
注:Map 處理器相當(dāng)于 MapReduce 處理器的閹割版本(閹割了 reduce 方法),此處不再單獨舉例。
工作流
圖片
點擊右上角按鈕 新建工作流,即可錄入新的工作流,具體界面和說明如下所示。
- 工作流名稱:名稱,無實際業(yè)務(wù)用途,請盡量精簡字段
- 工作流描述:描述,無實際業(yè)務(wù)用途,請盡量精簡字段
- 定時信息:該工作流的觸發(fā)方式的觸發(fā)方式,包含時間表達式類型選擇框和時間表達式輸入框
CRON -> 填寫 CRON 表達式(在線生成網(wǎng)站)
API -> 不需要填寫任何參數(shù),表明該任務(wù)由 OpenAPI 觸發(fā)
- 生命周期:定時策略生效的時間段
- 最大實例:該工作流同時執(zhí)行的數(shù)量
- 任務(wù)依賴關(guān)系:提供編輯界面可視化操作,繪制 DAG(有向無環(huán)圖),配置工作流內(nèi)各個任務(wù)的依賴關(guān)系
DAG 操作指南
編輯依賴關(guān)系
v4.0.0 以后支持節(jié)點的自由拖拉拽,不用再點點點了,哈哈哈 ~
- 添加節(jié)點:點擊 DAG 編輯框左上方的 “導(dǎo)入任務(wù)”,導(dǎo)入當(dāng)前存在的任務(wù)(需要提前在 任務(wù)管理界面 錄入任務(wù)),生成 DAG 的節(jié)點
- 連接節(jié)點:點擊起始節(jié)點的任意一個錨點摁住不放,拖動鼠標連接到另一個節(jié)點的任意一個錨點即可
- 刪除節(jié)點:選中需要刪除的節(jié)點,按退格鍵( 注意:windows 下使用退格鍵 [Backspace],macOS 下使用刪除鍵 [delete] )
- 刪除邊:選中需要刪除的邊,按退格鍵( 注意:windows 下使用退格鍵 [Backspace],macOS 下使用刪除鍵 [delete] )
導(dǎo)入任務(wù)節(jié)點
任務(wù)為之前創(chuàng)建的任務(wù),可用工作流形式串聯(lián)起來執(zhí)行。
圖片
編輯節(jié)點信息
點擊需要編輯的節(jié)點,在右側(cè)會彈出一個編輯框,如下圖所示
圖片
- 任務(wù)名稱:當(dāng)前節(jié)點引用的任務(wù)名稱,點擊可編輯(支持輸入名稱進行模糊搜索)
- 節(jié)點名稱:節(jié)點的名稱,無實際業(yè)務(wù)用途,在能明確表示節(jié)點背后的業(yè)務(wù)邏輯的情況下請盡量精簡字段
- 節(jié)點參數(shù):節(jié)點的參數(shù)配置,當(dāng)這個信息不為空的時候使用這個參數(shù)覆蓋當(dāng)前節(jié)點所引用的任務(wù)所配置的參數(shù)信息
- 是否啟用:未啟用的節(jié)點將會直接跳過
- 失敗跳過:當(dāng)這個節(jié)點執(zhí)行失敗的時候不會打斷整個工作流的執(zhí)行
特殊節(jié)點說明
判斷節(jié)點
圖片
判斷節(jié)點 不允許失敗跳過以及禁用,節(jié)點參數(shù)中存儲的是 Groovy 代碼(執(zhí)行 Groovy 代碼時會將當(dāng)前工作流上下文作為 context 變量注入到代碼執(zhí)行的上下文中),其執(zhí)行結(jié)果僅能返回 "true" 或者 "false",同時判斷節(jié)點僅有且必須有兩條“輸出”路徑。會根據(jù)該代碼的執(zhí)行結(jié)果決定下游需要執(zhí)行的節(jié)點。這里處理的原則是, 僅 cancel 那些只能通過被 disable 掉的邊可達的節(jié)點
舉個兩個栗子,灰色代表相應(yīng)的邊 或者 節(jié)點被 disable 或 cancel,菱形代表判斷節(jié)點,假定執(zhí)行結(jié)果為 true
圖片
case 3 以及 case 4 中的節(jié)點 3 都會被 cancel ,因為它只能通過節(jié)點 1 -> 節(jié)點 3 的邊可達(該邊的屬性為 false),但對于節(jié)點 5 而言,在 case 4 中因為判斷節(jié)點 2 的執(zhí)行結(jié)果為 true ,那么其可以通過節(jié)點 2 -> 節(jié)點 5 的邊可達,所以不會被 disable 。
備注:如果需要根據(jù)上游節(jié)點的執(zhí)行結(jié)果決定下游節(jié)點,可以將上游節(jié)點的執(zhí)行結(jié)果注入上下文中,再在判斷節(jié)點中做相應(yīng)的判斷。
工作流嵌套節(jié)點
圖片
該節(jié)點代表對某個工作流的引用,節(jié)點的 jobId 屬性存儲的是工作流 id,其他屬性和普通的任務(wù)節(jié)點一致。不允許出現(xiàn)循環(huán)引用以及多級嵌套的情況,即嵌套節(jié)點中指向的工作流一定是一個不含嵌套節(jié)點的工作流。
執(zhí)行到該節(jié)點時,如果該節(jié)點處于啟用狀態(tài),那么將啟動該節(jié)點所引用工作流的一個新實例,待該實例執(zhí)行完成后再同步更新該節(jié)點的狀態(tài)。
注意,創(chuàng)建子工作流時,會透傳當(dāng)前的上下文作為工作流的實例參數(shù),在子工作流執(zhí)行完成時會合并子工作流的上下文至父工作流的上下文中。
重試子工作流不會聯(lián)動重試父工作流,但失敗的子工作流會隨著父工作流的重試而原地重試(不會生成新的實例)