Nodejs線程池的設(shè)計與實現(xiàn)
本文轉(zhuǎn)載自微信公眾號「編程雜技」,作者theanarkh。轉(zhuǎn)載本文請聯(lián)系編程雜技公眾號。
前言:之前的版本不方便開放,重新設(shè)計了一版nodejs的線程池庫,本文介紹該庫的一些設(shè)計和實現(xiàn)。
nodejs雖然提供了線程的能力,但是很多時候,往往不能直接使用線程或者無限制地創(chuàng)建線程,比如我們有一個功能是cpu密集型的,如果一個請求就開一個線程,這很明顯不是最好的實踐,這時候,我們需要使用池化的技術(shù),本文介紹在nodejs線程模塊的基礎(chǔ)上,如何設(shè)計和實現(xiàn)一個線程池庫(https://github.com/theanarkh/nodejs-threadpool或npm i nodejs-threadpool )。下面是線程池的總體架構(gòu)。
設(shè)計一個線程池,在真正寫代碼之前,有很多設(shè)計需要考慮,大概如下:
1任務(wù)隊列的設(shè)計,一個隊列,多個線程互斥訪問,或者每個線程一個隊列,不需要互斥訪問。
2 線程退出的設(shè)計,可以由主線程檢測空閑線程,然后使子線程退出。或者子線程退出,通知主線程。空閑不一定是沒有任務(wù)就退出,可以設(shè)計空閑時間達(dá)到閾值后退出,因為創(chuàng)建線程是有時間開銷的。
3 任務(wù)數(shù)的設(shè)計,每個線程可以有個任務(wù)數(shù),還可以增加一個總?cè)蝿?wù)數(shù),即全部線程任務(wù)數(shù)加起來
4 選擇線程的設(shè)計,選擇任務(wù)數(shù)最少的線程。
5 線程類型的設(shè)計,可以區(qū)分核心線程和預(yù)備線程,任務(wù)少的時候,核心線程處理就行。任務(wù)多也創(chuàng)建預(yù)備線程幫忙處理。
6 線程池類型的設(shè)計,cpu密集型的,線程數(shù)等于核數(shù),否則自定義線程數(shù)就行。
7 支持任務(wù)的取消和超時機(jī)制,防止一個任務(wù)時間過長或者死循環(huán)。
本文介紹的線程池具體設(shè)計思想如下(參考java):
1 主線程維護(hù)一個隊列,子線程的任務(wù)由子線程負(fù)責(zé)分發(fā),不需要互斥訪問,子線程也不需要維護(hù)自己的隊列。
2 線程退出的設(shè)計,主線程負(fù)責(zé)檢查子線程空閑時間是否達(dá)到閾值,是則使子線程退出。
3 任務(wù)數(shù)的設(shè)計,主線程負(fù)責(zé)管理任務(wù)個數(shù)并應(yīng)有相應(yīng)的策略。
4 選擇線程的設(shè)計,選擇任務(wù)數(shù)最少的線程。
5 線程類型的設(shè)計,區(qū)分核心線程和預(yù)備線程,任務(wù)少的時候,核心線程處理就行。任務(wù)多也創(chuàng)建預(yù)備線程幫忙處理。
6 線程池類型的設(shè)計,cpu密集型的,線程數(shù)等于核數(shù),否則自定義線程數(shù)就行。
7 支持任務(wù)的取消和超時機(jī)制,超時或者取消的時候,主線程判斷任務(wù)是待執(zhí)行還是正在執(zhí)行,如果是待執(zhí)行則從任務(wù)隊列中刪除,如果是正在執(zhí)行則殺死對應(yīng)的子線程。下面我們看一下具體的設(shè)計。
1 主線程和子線程通信的數(shù)據(jù)結(jié)構(gòu)
- // 任務(wù)類,一個任務(wù)對應(yīng)一個id
 - class Work {
 - constructor({workId, filename, options}) {
 - // 任務(wù)id
 - this.workId = workId;
 - // 任務(wù)邏輯,字符串或者js文件路徑
 - this.filename = filename;
 - // 任務(wù)返回的結(jié)果
 - this.data = null;
 - // 任務(wù)返回的錯誤
 - this.error = null;
 - // 執(zhí)行任務(wù)時傳入的參數(shù),用戶定義
 - this.options = options;
 - }
 - }
 
主線程給子線程分派一個任務(wù)的時候,就給子線程發(fā)送一個Work對象。在nodejs中線程間通信需要經(jīng)過序列化和反序列化,所以通信的數(shù)據(jù)結(jié)構(gòu)包括的信息不能過多。
2 子線程處理任務(wù)邏輯
- const { parentPort } = require('worker_threads');
 - const vm = require('vm');
 - const { isFunction, isJSFile } = require('./utils');
 - // 監(jiān)聽主線程提交過來的任務(wù)
 - parentPort.on('message', async (work) => {
 - try {
 - const { filename, options } = work;
 - let aFunction;
 - if (isJSFile(filename)) {
 - aFunction = require(filename);
 - } else {
 - aFunction = vm.runInThisContext(`(${filename})`);
 - }
 - if (!isFunction(aFunction)) {
 - throw new Error('work type error: js file or string');
 - }
 - work.data = await aFunction(options);
 - parentPort.postMessage({event: 'done', work});
 - } catch (error) {
 - work.error = error.toString();
 - parentPort.postMessage({event: 'error', work});
 - }
 - });
 - process.on('uncaughtException', (...rest) => {
 - console.error(...rest);
 - });
 - process.on('unhandledRejection', (...rest) => {
 - console.error(...rest);
 - });
 
子線程的邏輯比較簡單,就是監(jiān)聽主線程分派過來的任務(wù),然后執(zhí)行任務(wù),執(zhí)行完之后通知主線程。任務(wù)支持js文件和字符串代碼的形式。需要返回一個Promise或者async函數(shù)。用于用于通知主線程任務(wù)已經(jīng)完成。
3 線程池和業(yè)務(wù)的通信
- // 提供給用戶側(cè)的接口
 - class UserWork extends EventEmitter {
 - constructor({ workId }) {
 - super();
 - // 任務(wù)id
 - this.workId = workId;
 - // 支持超時取消任務(wù)
 - this.timer = null;
 - // 任務(wù)狀態(tài)
 - this.state = WORK_STATE.PENDDING;
 - }
 - // 超時后取消任務(wù)
 - setTimeout(timeout) {
 - this.timer = setTimeout(() => {
 - this.timer && this.cancel() && this.emit('timeout');
 - }, ~~timeout);
 - }
 - // 取消之前設(shè)置的定時器
 - clearTimeout() {
 - clearTimeout(this.timer);
 - this.timer = null;
 - }
 - // 直接取消任務(wù),如果執(zhí)行完了就不能取消了,this.terminate是動態(tài)設(shè)置的
 - cancel() {
 - if (this.state === WORK_STATE.END || this.state === WORK_STATE.CANCELED) {
 - return false;
 - } else {
 - this.terminate();
 - return true;
 - }
 - }
 - // 修改任務(wù)狀態(tài)
 - setState(state) {
 - this.state = state;
 - }
 - }
 
業(yè)務(wù)提交一個任務(wù)給線程池的時候,線程池會返回一個UserWork類,業(yè)務(wù)側(cè)通過UserWork類和線程池通信。
4 管理子線程的數(shù)據(jù)結(jié)構(gòu)
- // 管理子線程的數(shù)據(jù)結(jié)構(gòu)
 - class Thread {
 - constructor({ worker }) {
 - // nodejs的Worker對象,nodejs的worker_threads模塊的Worker
 - this.worker = worker;
 - // 線程狀態(tài)
 - this.state = THREAD_STATE.IDLE;
 - // 上次工作的時間
 - this.lastWorkTime = Date.now();
 - }
 - // 修改線程狀態(tài)
 - setState(state) {
 - this.state = state;
 - }
 - // 修改線程最后工作時間
 - setLastWorkTime(time) {
 - this.lastWorkTime = time;
 - }
 - }
 
線程池中維護(hù)了多個子線程,Thread類用于管理子線程的信息。
5 線程池 線程池的實現(xiàn)是核心,我們分為幾個部分講。
5.1 支持的配置
- constructor(options = {}) {
 - this.options = options;
 - // 子線程隊列
 - this.workerQueue = [];
 - // 核心線程數(shù)
 - this.coreThreads = ~~options.coreThreads || config.CORE_THREADS;
 - // 線程池最大線程數(shù),如果不支持動態(tài)擴(kuò)容則最大線程數(shù)等于核心線程數(shù)
 - this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;
 - // 超過任務(wù)隊列長度時的處理策略
 - this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD;
 - // 是否預(yù)創(chuàng)建子線程
 - this.preCreate = options.preCreate === true;
 - // 線程最大空閑時間,達(dá)到后自動退出
 - this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME;
 - // 是否預(yù)創(chuàng)建線程池
 - this.preCreate && this.preCreateThreads();
 - // 保存線程池中任務(wù)對應(yīng)的UserWork
 - this.workPool = {};
 - // 線程池中當(dāng)前可用的任務(wù)id,每次有新任務(wù)時自增1
 - this.workId = 0;
 - // 線程池中的任務(wù)隊列
 - this.queue = [];
 - // 線程池總?cè)蝿?wù)數(shù)
 - this.totalWork = 0;
 - // 支持的最大任務(wù)數(shù)
 - this.maxWork = ~~options.maxWork || config.MAX_WORK;
 - // 處理任務(wù)的超時時間,全局配置
 - this.timeout = ~~options.timeout;
 - this.pollIdle();
 - }
 
上面的代碼列出了線程池所支持的能力。
5.2 創(chuàng)建線程
- newThread() {
 - const worker = new Worker(workerPath);
 - const thread = new Thread({worker});
 - this.workerQueue.push(thread);
 - const threadId = worker.threadId;
 - worker.on('exit', () => {
 - // 找到該線程對應(yīng)的數(shù)據(jù)結(jié)構(gòu),然后刪除該線程的數(shù)據(jù)結(jié)構(gòu)
 - const position = this.workerQueue.findIndex(({worker}) => {
 - return worker.threadId === threadId;
 - });
 - const exitedThread = this.workerQueue.splice(position, 1);
 - // 退出時狀態(tài)是BUSY說明還在處理任務(wù)(非正常退出)
 - this.totalWork -= exitedThread.state === THREAD_STATE.BUSY ? 1 : 0;
 - });
 - // 和子線程通信
 - worker.on('message', (result) => {
 - const {
 - work,
 - event,
 - } = result;
 - const { data, error, workId } = work;
 - // 通過workId拿到對應(yīng)的userWork
 - const userWork = this.workPool[workId];
 - // 不存在說明任務(wù)被取消了
 - if (!userWork) {
 - return;
 - }
 - // 修改線程池數(shù)據(jù)結(jié)構(gòu)
 - this.endWork(userWork);
 - // 修改線程數(shù)據(jù)結(jié)構(gòu)
 - thread.setLastWorkTime(Date.now());
 - // 還有任務(wù)則通知子線程處理,否則修改子線程狀態(tài)為空閑
 - if (this.queue.length) {
 - // 從任務(wù)隊列拿到一個任務(wù)交給子線程
 - this.submitWorkToThread(thread, this.queue.shift());
 - } else {
 - thread.setState(THREAD_STATE.IDLE);
 - }
 - switch(event) {
 - case 'done':
 - // 通知用戶,任務(wù)完成
 - userWork.emit('done', data);
 - break;
 - case 'error':
 - // 通知用戶,任務(wù)出錯
 - if (EventEmitter.listenerCount(userWork, 'error')) {
 - userWork.emit('error', error);
 - }
 - break;
 - default: break;
 - }
 - });
 - worker.on('error', (...rest) => {
 - console.error(...rest);
 - });
 - return thread;
 - }
 
創(chuàng)建線程,并保持線程對應(yīng)的數(shù)據(jù)結(jié)構(gòu)、退出、通信管理、任務(wù)分派。子線程執(zhí)行完任務(wù)后,會通知線程池,主線程通知用戶。
5.3 選擇線程
- selectThead() {
 - // 找出空閑的線程,把任務(wù)交給他
 - for (let i = 0; i < this.workerQueue.length; i++) {
 - if (this.workerQueue[i].state === THREAD_STATE.IDLE) {
 - return this.workerQueue[i];
 - }
 - }
 - // 沒有空閑的則隨機(jī)選擇一個
 - return this.workerQueue[~~(Math.random() * this.workerQueue.length)];
 - }
 
當(dāng)用戶給線程池提交一個任務(wù)時,線程池會選擇一個空閑的線程處理該任務(wù)。如果沒有可用線程則任務(wù)插入待處理隊列等待處理。
5.4 提交任務(wù)
- // 給線程池提交一個任務(wù)
 - submit(filename, options = {}) {
 - return new Promise(async (resolve, reject) => {
 - let thread;
 - // 沒有線程則創(chuàng)建一個
 - if (this.workerQueue.length) {
 - thread = this.selectThead();
 - // 該線程還有任務(wù)需要處理
 - if (thread.state === THREAD_STATE.BUSY) {
 - // 子線程個數(shù)還沒有達(dá)到核心線程數(shù),則新建線程處理
 - if (this.workerQueue.length < this.coreThreads) {
 - thread = this.newThread();
 - } else if (this.totalWork + 1 > this.maxWork){
 - // 總?cè)蝿?wù)數(shù)已達(dá)到閾值,還沒有達(dá)到線程數(shù)閾值,則創(chuàng)建
 - if(this.workerQueue.length < this.maxThreads) {
 - thread = this.newThread();
 - } else {
 - // 處理溢出的任務(wù)
 - switch(this.discardPolicy) {
 - case DISCARD_POLICY.ABORT:
 - return reject(new Error('queue overflow'));
 - case DISCARD_POLICY.CALLER_RUN:
 - const workId = this.generateWorkId();
 - const userWork = new UserWork({workId});
 - userWork.setState(WORK_STATE.RUNNING);
 - userWork.terminate = () => {
 - userWork.setState(WORK_STATE.CANCELED);
 - };
 - this.timeout && userWork.setTimeout(this.timeout);
 - resolve(userWork);
 - try {
 - let aFunction;
 - if (isJSFile(filename)) {
 - aFunction = require(filename);
 - } else {
 - aFunction = vm.runInThisContext(`(${filename})`);
 - }
 - if (!isFunction(aFunction)) {
 - throw new Error('work type error: js file or string');
 - }
 - const result = await aFunction(options);
 - // 延遲通知,讓用戶有機(jī)會取消或者注冊事件
 - setImmediate(() => {
 - if (userWork.state !== WORK_STATE.CANCELED) {
 - userWork.setState(WORK_STATE.END);
 - userWork.emit('done', result);
 - }
 - });
 - } catch (error) {
 - setImmediate(() => {
 - if (userWork.state !== WORK_STATE.CANCELED) {
 - userWork.setState(WORK_STATE.END);
 - userWork.emit('error', error.toString());
 - }
 - });
 - }
 - return;
 - case DISCARD_POLICY.OLDEST_DISCARD:
 - const work = this.queue.shift();
 - // maxWork為1時,work會為空
 - if (work && this.workPool[work.workId]) {
 - this.cancelWork(this.workPool[work.workId]);
 - } else {
 - return reject(new Error('no work can be discarded'));
 - }
 - break;
 - case DISCARD_POLICY.DISCARD:
 - return reject(new Error('discard'));
 - case DISCARD_POLICY.NOT_DISCARD:
 - break;
 - default:
 - break;
 - }
 - }
 - }
 - }
 - } else {
 - thread = this.newThread();
 - }
 - // 生成一個任務(wù)id
 - const workId = this.generateWorkId();
 - // 新建一個UserWork
 - const userWork = new UserWork({workId});
 - this.timeout && userWork.setTimeout(this.timeout);
 - // 新建一個work
 - const work = new Work({ workId, filename, options });
 - // 修改線程池數(shù)據(jù)結(jié)構(gòu),把UserWork和Work關(guān)聯(lián)起來
 - this.addWork(userWork);
 - // 選中的線程正在處理任務(wù),則先緩存到任務(wù)隊列
 - if (thread.state === THREAD_STATE.BUSY) {
 - this.queue.push(work);
 - userWork.terminate = () => {
 - this.cancelWork(userWork);
 - this.queue = this.queue.filter((node) => {
 - return node.workId !== work.workId;
 - });
 - }
 - } else {
 - this.submitWorkToThread(thread, work);
 - }
 - resolve(userWork);
 - })
 - }
 - submitWorkToThread(thread, work) {
 - const userWork = this.workPool[work.workId];
 - userWork.setState(WORK_STATE.RUNNING);
 - // 否則交給線程處理,并修改狀態(tài)和記錄該線程當(dāng)前處理的任務(wù)id
 - thread.setState(THREAD_STATE.BUSY);
 - thread.worker.postMessage(work);
 - userWork.terminate = () => {
 - this.cancelWork(userWork);
 - thread.setState(THREAD_STATE.DEAD);
 - thread.worker.terminate();
 - }
 - }
 - addWork(userWork) {
 - userWork.setState(WORK_STATE.PENDDING);
 - this.workPool[userWork.workId] = userWork;
 - this.totalWork++;
 - }
 - endWork(userWork) {
 - delete this.workPool[userWork.workId];
 - this.totalWork--;
 - userWork.setState(WORK_STATE.END);
 - userWork.clearTimeout();
 - }
 - cancelWork(userWork) {
 - delete this.workPool[userWork.workId];
 - this.totalWork--;
 - userWork.setState(WORK_STATE.CANCELED);
 - userWork.emit('cancel');
 - }
 
提交任務(wù)是線程池暴露給用戶側(cè)的接口,主要處理的邏輯包括,根據(jù)當(dāng)前的策略判斷是否需要新建線程、選擇線程處理任務(wù)、排隊任務(wù)等,如果任務(wù)數(shù)達(dá)到閾值,則根據(jù)丟棄策略處理該任務(wù)。
5.5 空閑處理
- pollIdle() {
 - setTimeout(() => {
 - for (let i = 0; i < this.workerQueue.length; i++) {
 - const node = this.workerQueue[i];
 - if (node.state === THREAD_STATE.IDLE && Date.now() - node.lastWorkTime > this.maxIdleTime) {
 - node.worker.terminate();
 - }
 - }
 - this.pollIdle();
 - }, 1000);
 - }
 
當(dāng)子線程空閑時間達(dá)到閾值后,主線程會殺死子線程,避免浪費系統(tǒng)資源??偨Y(jié),這就是線程池具體的設(shè)計和實現(xiàn),另外創(chuàng)建線程失敗會導(dǎo)致主線程掛掉,所以使用線程的時候,最后新開一個子進(jìn)程來管理該線程池。
















 
 
 









 
 
 
 