項目終于用上了xxl-job,好起來了!
本篇文章主要記錄項目中遇到的 xxl-job 的實(shí)戰(zhàn),希望能通過這篇文章告訴讀者們什么是 xxl-job 以及怎么使用 xxl-job 并分享一個實(shí)戰(zhàn)案例。
那么下面先說明什么是 xxl-job 以及為什么要使用它。
xxl-job 是什么?
XXL-JOB 是一個分布式任務(wù)調(diào)度平臺,其核心設(shè)計目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡單、輕量級、易擴(kuò)展。
設(shè)計思想 是將調(diào)度行為抽象形成 調(diào)度中心 平臺,平臺本身不承擔(dān)業(yè)務(wù)邏輯,而是負(fù)責(zé)發(fā)起 調(diào)度請求 后,由 執(zhí)行器 接收調(diào)度請求并執(zhí)行 任務(wù),這里的 任務(wù) 抽象為 分散的 JobHandler。通過這種方式即可實(shí)現(xiàn) 調(diào)度 與 任務(wù) 相互解耦,從而提高系統(tǒng)整體的穩(wěn)定性和拓展性。
為了更好理解,這里放一張官網(wǎng)的架構(gòu)圖:
圖片
任務(wù)調(diào)度是什么?
在開發(fā)項目時大家是否也遇到過類似的場景問題:
- 系統(tǒng)需要定時在每天0點(diǎn)進(jìn)行數(shù)據(jù)備份。
- 系統(tǒng)需要在活動開始前幾小時預(yù)熱執(zhí)行一些前置業(yè)務(wù)。
- 系統(tǒng)需要定時對 MQ 消息表的發(fā)送裝填,對發(fā)送失敗的 MQ 消息進(jìn)行補(bǔ)償重新發(fā)送。
這些場景問題都可以通過 任務(wù)調(diào)度 來解決,任務(wù)調(diào)度指的是系統(tǒng)在約定的指定時間自動去執(zhí)行指定的任務(wù)的過程。
單體系統(tǒng) 中有許多實(shí)現(xiàn) 任務(wù)調(diào)度 的方式,如多線程方式、Timer 類、Spring Tasks 等等。這里比較常用的是 Spring Tasks(通過 @EnableScheduling + @Scheduled 的注解可以自定義定時任務(wù),有興趣的可以去了解一下)
為什么需要分布式任務(wù)調(diào)度平臺?
分布式下,每個服務(wù)都可以搭建為集群,這樣的好處是可以將任務(wù)切片分給每一個服務(wù)從而實(shí)現(xiàn)并行執(zhí)行,提高任務(wù)調(diào)度的處理效率。那么為什么 分布式系統(tǒng) 不能使用 單體系統(tǒng) 的任務(wù)調(diào)度實(shí)現(xiàn)方式呢。
在集群服務(wù)下,如果還是使用每臺機(jī)器按照單體系統(tǒng)的任務(wù)調(diào)度實(shí)現(xiàn)方式實(shí)現(xiàn)的話,會出現(xiàn)下面這四個問題:
- 怎么做到對任務(wù)的控制(如何避免任務(wù)重復(fù)執(zhí)行)。
- 如果某臺機(jī)器宕機(jī)了,會不會存在任務(wù)丟失。
- 如果要增加服務(wù)實(shí)例,怎么做到彈性擴(kuò)容。
- 如何做到對任務(wù)調(diào)度的執(zhí)行情況統(tǒng)一監(jiān)測。
通過上面的問題可以了解到分布式系統(tǒng)下需要一個滿足高可用、容錯管理、負(fù)載均衡等功能的任務(wù)調(diào)度平臺來實(shí)現(xiàn)任務(wù)調(diào)度。分布式系統(tǒng)下,也有許多可以實(shí)現(xiàn)任務(wù)調(diào)度的第三方的分布式任務(wù)調(diào)度系統(tǒng),如 xxl-job、Quartz、elastic-job 等等常用的分布式任務(wù)調(diào)度系統(tǒng)。
如何使用 xxl-job
作為開源軟件的 xxl-job,可以在 github 或 gitee上查看和下載 xxl-job 的源碼。
下面將介紹我使用 xxl-job 的流程(如果有操作不當(dāng)?shù)模梢圆榭垂俜降闹形奈臋n:https://www.xuxueli.com/xxl-job)
dokcer 下安裝 xxl-job:
1、docker 下拉取 xxl-job 的鏡像(這里使用 2.3.1 版本)
docker pull xuxueli/xxl-job-admin:2.3.1
2、創(chuàng)建映射容器的文件目錄
mkdir -p -m 777 /mydata/xxl-job/data/applogs
3、在 /mydata/xxl-job 的目錄下創(chuàng)建 application.properties 文件
由于 application.properties 的代碼過長,這里就不展示了,需要的可以去 gitee 上獲取,具體路徑如圖:
圖片
這里需要注意數(shù)據(jù)庫位置的填寫:
圖片
如果還需要更改端口的可以更改這里:
圖片
這里還需要注意告警郵箱和訪問口令(后續(xù)Spring Boot配置用到):
圖片
4、將 tables_xxl-job.sql 文件導(dǎo)入上面步驟3指定的數(shù)據(jù)庫(自己填寫的那個數(shù)據(jù)庫)
同樣由于文件代碼過長,這里展示 gitee 上獲取的路徑圖:
圖片
5、執(zhí)行 docker 命令
注意這里的 -p 8088:8088 是因?yàn)槲腋牧饲懊?application.porperties 文件的端口號為 8088,所以這里我執(zhí)行的 docker 命令為 -p 8088:8088,如果沒有更改的這里一定要改為 -p 8080:8080。
docker run -p 8088:8088 \
-d --name=xxl-job-admin --restart=always \
-v /mydata/xxl-job/application.properties:/application.properties \
-v /mydata/xxl-job/data/applogs:/data/applogs \
-e PARAMS='--spring.config.locatinotallow=/application.properties' xuxueli/xxl-job-admin:2.3.1
執(zhí)行后通過 docker ps 查看是否成功運(yùn)行,如果失敗可以通過 docker logs xxl-job-admin 查看具體錯誤日志。
6、通過 http://192.168.101.25:8088/xxl-job-admin/ 訪問(這里ip和端口是自己的)
圖片
賬號:admin 密碼:123456
到這里就算是完成了 xxl-job 在 docker 的搭建。
Spring Boot 項目集成 xxl-job
xxl-job 由 調(diào)度中心 和 執(zhí)行器 組成,上面已經(jīng)完成了在 docker 上部署調(diào)度中心了,接下來介紹怎么配置部署執(zhí)行器項目。
1、在 Spring Boot 項目中導(dǎo)入 maven 依賴
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>
這里需要注意版本號與 xxl-job 版本需要一致,這里我配置的都是 2.3.1 版本。
2、在 Spring Boot 項目中配置 application.yml 文件
xxl:
job:
admin:
addresses: http://192.168.101.25:8088/xxl-job-admin
executor:
appname: media-process-service
address:
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken: default_token
- 這里的 xxl.job.admin.addresses 用于指定調(diào)度中心的地址。
- 這里的 xxl.job.accessToken 用于指定訪問口令(也就是前面搭建 xxl-job 中步驟3指定的)。
- 這里的 xxl.job.executor.appname 用于指定執(zhí)行器的名稱(需要與后續(xù)配置執(zhí)行器的名稱一致)。關(guān)注工眾號:碼猿技術(shù)專欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊!
- 這里的 xxl.job.executor.port 用于指定執(zhí)行器的端口(執(zhí)行器實(shí)際上是一個內(nèi)嵌的 Server,默認(rèn)端口為9999,配置多個同一服務(wù)實(shí)例時需要指定不同的執(zhí)行器端口,否則會端口沖突)。
- 其他屬性只需要照著配置即可(想要了解屬性的具體含義可以查看中文文檔中的2.4配置部署執(zhí)行器項目章節(jié))。
3、編寫配置類
/**
* XXL-JOB配置類
*
* @author 公眾號:碼猿技術(shù)專欄
*/
@Slf4j
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
4、調(diào)度中心中新增執(zhí)行器
圖片
執(zhí)行器的配置屬性:
- AppName: 每個執(zhí)行器集群的唯一標(biāo)示 AppName,執(zhí)行器會周期性以 AppName 為對象進(jìn)行自動注冊??赏ㄟ^該配置自動發(fā)現(xiàn)注冊成功的執(zhí)行器,供任務(wù)調(diào)度時使用。
- 名稱: 執(zhí)行器的名稱(可以使用中文更好地體現(xiàn)該執(zhí)行器是用來干嘛的)。
- 注冊方式:調(diào)度中心獲取執(zhí)行器地址的方式(一般為了方便可以選用自動注冊即可)。
- 自動注冊:執(zhí)行器自動進(jìn)行執(zhí)行器注冊,調(diào)度中心通過底層注冊表可以動態(tài)發(fā)現(xiàn)執(zhí)行器機(jī)器地址。
- 手動錄入:人工手動錄入執(zhí)行器的地址信息,多地址逗號分隔,供調(diào)度中心使用。
- 機(jī)器地址:"注冊方式"為"手動錄入"時有效,支持人工維護(hù)執(zhí)行器的地址信息。
5、配置自定義任務(wù)
配置自定義任務(wù)有許多種模式,如 Bean模式(基于方法)、Bean模式(基于類)、GLUE模式等等。這里介紹通過 Bean模式(基于方法) 是如何自定義任務(wù)的(對于其余的模式可以參考官方文檔)。
Bean模式(基于方法)也就是每個任務(wù)對應(yīng)一個方法,通過添加 @XxLJob(value="自定義JobHandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法") 注解即可完成定義。
/**
* 任務(wù)處理類
*
* @author 公眾號:碼猿技術(shù)專欄
*/
@Component
public class TestJob {
/**
* 測試任務(wù)
*/
@XxlJob("testHandler")
public void testHandler() {
XxlJobHelper.handleSuccess("本次測試任務(wù)調(diào)度成功");
}
}
- 通過注解也可以指定 初始化方法和銷毀方法,如果不填寫可以直接寫一個 自定義的JobHandler名稱 用于后面在調(diào)度中心中配置任務(wù)時對應(yīng)任務(wù)的 JobHandler 屬性值。
- 可以通過 XxlJobHelper.log 來打印日志,通過調(diào)度中心可以查看執(zhí)行日志的情況。
- 可以通過 XxlJobHelper.handleFail 或 XxlJobHelper.handleSuccess 手動設(shè)置任務(wù)調(diào)度的結(jié)果(不設(shè)置時默認(rèn)結(jié)果為成功狀態(tài),除非任務(wù)執(zhí)行時出現(xiàn)異常)。
6、調(diào)度中心中新增任務(wù)
圖片
這里主要注意 Cron 表達(dá)式的時間配置以及 JobHandler 的值需要與自定義任務(wù)方法的注解上的 value 屬性值一致即可。
關(guān)于高級配置這里放一張中文文檔的詳細(xì)說明(也可以直接去看文檔):
圖片
需要搭建集群或過期策略等高級玩法時可以進(jìn)行配置。
到這里就完成了 SpringBoot 集成 xxl-job 實(shí)現(xiàn)分布式任務(wù)調(diào)度的全過程了,接下來會通過一個實(shí)戰(zhàn)案例來具體看看 xxl-job 的用處。
xxl-job 實(shí)戰(zhàn)
下面通過一個最近自己在跟著做的學(xué)習(xí)項目中使用到 xxl-job 的場景案例來具體了解一下如何利用 xxl-job 來實(shí)現(xiàn)任務(wù)調(diào)度。
實(shí)戰(zhàn)背景
當(dāng)前項目需要對上傳到分布式文件系統(tǒng) minio 中的視頻文件進(jìn)行統(tǒng)一格式的視頻轉(zhuǎn)碼操作,由于本身視頻轉(zhuǎn)碼操作會帶了很大的時間消耗以及 CPU 的開銷,所以考慮集群服務(wù)下使用 xxl-job 的方式以任務(wù)調(diào)度的方式定時處理視頻轉(zhuǎn)碼操作。
這樣可以帶來兩個好處:① 以任務(wù)調(diào)度的方式,可以使得視頻轉(zhuǎn)碼操作不會阻塞主線程,避免影響主要業(yè)務(wù)的吞吐量;關(guān)注工眾號:碼猿技術(shù)專欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊?、?以集群服務(wù)分片接收任務(wù)的方式,可以將任務(wù)均分給每個機(jī)器使得任務(wù)調(diào)度可以并行執(zhí)行,提高總?cè)蝿?wù)處理時間以及降低單臺機(jī)器 CPU 的開銷;
xxl-job 執(zhí)行流程圖
xxl-job實(shí)戰(zhàn).png
怎么將任務(wù)均分給每臺服務(wù)器?
由于任務(wù)執(zhí)行時間過長,需要搭建集群服務(wù)來做到并行任務(wù)調(diào)度,從而減小 CPU 的開銷,那么怎么均分任務(wù)呢?
利用 xxl-job 在集群部署時,配置路由策略中選擇 分片廣播 的方式,可以使一次任務(wù)調(diào)度會廣播觸發(fā)集群中所有的執(zhí)行器執(zhí)行一次任務(wù),并且可以向系統(tǒng)傳遞分片參數(shù)。
圖片
利用這一特性可以根據(jù) 當(dāng)前執(zhí)行器的分片序號和分片總數(shù) 來獲取對應(yīng)的任務(wù)記錄。
先來看看 Bean 模式下怎么獲取分片序號和分片總數(shù):
// 分片序號(當(dāng)前執(zhí)行器序號)
int shardIndex = XxlJobHelper.getShardIndex();
// 分片總數(shù)(執(zhí)行器總數(shù))
int shardTotal = XxlJobHelper.getShardTotal();
有了這兩個屬性,當(dāng)執(zhí)行器掃描數(shù)據(jù)庫獲取記錄時,可以根據(jù) 取模 的方式獲取屬于當(dāng)前執(zhí)行器的任務(wù),可以這樣編寫 sql 獲取任務(wù)記錄:
select * from media_process m
where m.id % #{shareTotal} = #{shareIndex}
and (m.status = '1' or m.status = '3')
and m.fail_count < 3
limit #{count}
掃描任務(wù)表,根據(jù)任務(wù) id 對分片總數(shù) 取模 來實(shí)現(xiàn)對所有分片的均分任務(wù),通過判斷是否是當(dāng)前分片序號,并且當(dāng)前任務(wù)狀態(tài)為 1(未處理)或 3(處理失?。┎⑶耶?dāng)前任務(wù)失敗次數(shù)小于3次時可以取得當(dāng)前任務(wù)。每次掃描只取出 count 個任務(wù)數(shù)(批量處理)。
圖片
因此通過 xxl-job 的分片廣播 + 取模 的方式即可實(shí)現(xiàn)對集群服務(wù)均分任務(wù)的操作。
怎么確保任務(wù)不會被重復(fù)消費(fèi)?
由于視頻轉(zhuǎn)碼本身處理時間就會比較長,所以更不允許服務(wù)重復(fù)執(zhí)行,雖然上面通過分片廣播+取模的方式提高了任務(wù)不會被重復(fù)執(zhí)行的機(jī)率,但是依舊存在如下情況:
如下圖,有三臺集群機(jī)器和六個任務(wù),剛開始分配好了每臺機(jī)器兩個任務(wù),執(zhí)行器0正準(zhǔn)備執(zhí)行任務(wù)3時,剛好執(zhí)行器2宕機(jī)了,此時執(zhí)行器1剛好執(zhí)行一次任務(wù),因?yàn)榉制倲?shù)減小,導(dǎo)致執(zhí)行器1重新分配到需要執(zhí)行的任務(wù)正好也是任務(wù)3,那么此時就會出現(xiàn)執(zhí)行器0和執(zhí)行器1都在執(zhí)行任務(wù)3的情況。
圖片
那么這種情況就需要實(shí)現(xiàn)冪等性了,冪等性有很多種實(shí)現(xiàn)方法,有興趣了解的可以參考:接口冪等性的實(shí)現(xiàn)方案
這里使用樂觀鎖的方式實(shí)現(xiàn)冪等性,具體 sql 如下:
update media_process m
set m.status = '2'
where (m.status = '1' or m.status = '3')
and m.fail_count < 3
and m.id = #{id}
這里只需要依靠任務(wù)的狀態(tài)即可實(shí)現(xiàn)(未處理1;處理中2;處理失敗3;處理成功4),可以看到這里類似于 CAS 的方式通過比較和設(shè)置的方式只有在狀態(tài)為未處理或處理失敗時才能設(shè)置為處理中。這樣在并發(fā)場景下,即使多個執(zhí)行器同時處理該任務(wù),也只有一個任務(wù)可以設(shè)置成功進(jìn)入處理任務(wù)階段。
為了真正達(dá)到冪等性,還需要設(shè)置一下 xxl-job 的調(diào)度過期策略和阻塞處理策略來保證真正的冪等性。分別設(shè)置為 忽略(調(diào)度過期后,忽略過期的任務(wù),從當(dāng)前時間開始重新計算下次觸發(fā)時間) 和 丟棄后續(xù)調(diào)度(調(diào)度請求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),本次請求將會被丟棄并標(biāo)記為失敗)。
圖片
編寫完成該功能所需的所有任務(wù)
1、分片視頻轉(zhuǎn)碼處理
代碼(這里的代碼只展示部分核心步驟代碼):
/**
* 視頻轉(zhuǎn)碼處理任務(wù)
*/
@XxlJob("videoTranscodingHandler")
public void videoTranscodingHandler() throws InterruptedException {
// 1. 分片獲取當(dāng)前執(zhí)行器需要執(zhí)行的所有任務(wù)
List<MediaProcess> mediaProcessList = mediaProcessService.getMediaProcessList(shardIndex, shardTotal, count);
// 通過JUC工具類阻塞直到所有任務(wù)執(zhí)行完
CountDownLatch countDownLatch = new CountDownLatch(mediaProcessList.size());
// 遍歷所有任務(wù)
mediaProcessList.forEach(mediaProcess -> {
// 以多線程的方式執(zhí)行所有任務(wù)
executor.execute(() -> {
try {
// 2. 嘗試搶占任務(wù)(通過樂觀鎖實(shí)現(xiàn))
boolean res = mediaProcessService.startTask(id);
if (!res) {
XxlJobHelper.log("任務(wù)搶占失敗,任務(wù)id{}", id);
return;
}
// 3. 從minio中下載視頻到本地
File file = mediaFileService.downloadFileFromMinIO(bucket, objectName);
// 下載失敗
if (file == null) {
XxlJobHelper.log("下載視頻出錯,任務(wù)id:{},bucket:{},objectName:{}", id, bucket, objectName);
// 出現(xiàn)異常重置任務(wù)狀態(tài)為處理失敗等待下一次處理
mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "下載視頻到本地失敗");
return;
}
// 4. 視頻轉(zhuǎn)碼
String result = videoUtil.generateMp4();
if (!result.equals("success")) {
XxlJobHelper.log("視頻轉(zhuǎn)碼失敗,原因:{},bucket:{},objectName:{},", result, bucket, objectName);
// 出現(xiàn)異常重置任務(wù)狀態(tài)為處理失敗等待下一次處理
mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "視頻轉(zhuǎn)碼失敗");
return;
}
// 5. 上傳轉(zhuǎn)碼后的文件
boolean b1 = mediaFileService.addMediaFilesToMinIO(new_File.getAbsolutePath(), "video/mp4", bucket, objectNameMp4);
if (!b1) {
XxlJobHelper.log("上傳 mp4 到 minio 失敗,任務(wù)id:{}", id);
// 出現(xiàn)異常重置任務(wù)狀態(tài)為處理失敗等待下一次處理
mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "上傳 mp4 文件到 minio 失敗");
return;
}
// 6. 更新任務(wù)狀態(tài)為成功
mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.SUCCESS.getValue(), fileId, url, "創(chuàng)建臨時文件異常");
} finally {
countDownLatch.countDown();
}
});
});
// 阻塞直到所有方法執(zhí)行完成(30min后不再等待)
countDownLatch.await(30, TimeUnit.MINUTES);
}
核心任務(wù) - 分片獲取任務(wù)后執(zhí)行視頻轉(zhuǎn)碼任務(wù),步驟如下:
- 通過 分片廣播拿到的參數(shù)以取模的方式 獲取當(dāng)前執(zhí)行器所屬的任務(wù)記錄集合
- 遍歷集合,以 多線程的方式 并發(fā)地執(zhí)行任務(wù)
- 每次執(zhí)行任務(wù)前需要先通過 數(shù)據(jù)庫樂觀鎖的方式 搶占當(dāng)前任務(wù),搶占到才能執(zhí)行;關(guān)注工眾號:碼猿技術(shù)專欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊!
- 執(zhí)行任務(wù)過程分為 分布式文件系統(tǒng)下載需要轉(zhuǎn)碼的視頻文件 -> 視頻轉(zhuǎn)碼 -> 上傳轉(zhuǎn)碼后的視頻 -> 更新任務(wù)狀態(tài)(處理成功)
- 使用JUC工具類 CountDownLatch 實(shí)現(xiàn)所有任務(wù)執(zhí)行完后才退出方法
- 中間使用 xxl-job 的日志記錄錯誤信息和執(zhí)行結(jié)果
2、清理任務(wù)表中轉(zhuǎn)碼成功的任務(wù)的記錄并將其插入任務(wù)歷史表
由于任務(wù)表處理完任務(wù)后只是更新任務(wù)狀態(tài),這樣隨著任務(wù)增多會導(dǎo)致檢索起來時間消耗過大,所以使用任務(wù)調(diào)度的方式定期掃描任務(wù)表,將任務(wù)狀態(tài)為處理成功的任務(wù)刪除并重新插入任務(wù)歷史表中留存(由于代碼過于簡單,這里就不做展示了)。
主要實(shí)現(xiàn)兩個功能:① 清理任務(wù)表中已成功處理的任務(wù);② 將處理成功的任務(wù)記錄插入歷史表中;
3、視頻補(bǔ)償機(jī)制
由于使用樂觀鎖會將任務(wù)狀態(tài)更新為處理中,如果此時執(zhí)行任務(wù)的執(zhí)行器(服務(wù))宕機(jī)了,會導(dǎo)致該任務(wù)記錄一直存在,因?yàn)闃酚^鎖的原因別的執(zhí)行器也無法獲取,這個時候同樣需要使用任務(wù)調(diào)度的方式,定期掃描任務(wù)表,判斷任務(wù)是否處于處理中狀態(tài)并且任務(wù)創(chuàng)建時間遠(yuǎn)大于30分鐘,則說明任務(wù)超時了,則是使用任務(wù)調(diào)度的方式重新更新任務(wù)的狀態(tài)為未處理,等待下一次視頻轉(zhuǎn)碼任務(wù)的調(diào)度處理。此外視頻補(bǔ)償機(jī)制任務(wù)調(diào)度還需要檢查是否存在任務(wù)最大次數(shù)已經(jīng)大于3次的,如果存在則交付給人工處理(由于代碼過于簡單,這里就不做展示了)。
主要實(shí)現(xiàn)兩個功能:① 處理任務(wù)超時情況下的任務(wù),做出補(bǔ)償;② 處理失敗次數(shù)大于3次的任務(wù),做出補(bǔ)償;
測試并查看日志
準(zhǔn)備好的任務(wù)表記錄:
圖片
啟動三臺媒資服務(wù)器,并開啟任務(wù):
圖片
可以單獨(dú)查看每個任務(wù)的日志:
圖片
通過日志中的執(zhí)行日志查看具體日志信息:
圖片
可以看到直接為了測試改錯的路徑導(dǎo)致下載視頻出錯:
圖片
查看數(shù)據(jù)庫表的變化:
圖片
到這里可以看到核心的視頻轉(zhuǎn)碼任務(wù)執(zhí)行成功,并且邏輯正確,能夠起到分布式任務(wù)調(diào)度的作用。
總結(jié)
這就是本次 xxl-job 實(shí)戰(zhàn)的全部內(nèi)容了,寫這篇文章主要是為了記錄一下項目中是如何使用 xxl-job 的,并且提供一種分片廣播均分任務(wù)的思路以及冪等性問題如何處理,具體使用 xxl-job 還需根據(jù)自己項目的需求,遇到問題可以參考官網(wǎng)。