前端代碼經(jīng)常要處理各種異步邏輯。
有的是串行的:
const promise1 = new Promise(function(resolve) {
// 異步邏輯 1...
resolve();
});
const promise2 = new Promise(function(resolve) {
// 異步邏輯 2...
resolve();
});
promise1.then(() => promise2);
await promise1;
await promise2;
有的是并行的:
await Promise.all([promise1, promise2]);
await Promise.race([promise1, promise2]);
并行的異步邏輯有時(shí)還要做并發(fā)控制。
并發(fā)控制是常見(jiàn)的需求,也是面試常考的面試題。
一般我們會(huì)用 p-limit 來(lái)做:
import pLimit from 'p-limit';
const limit = pLimit(2);
const input = [
limit(() => fetchSomething('foo')),
limit(() => fetchSomething('bar')),
limit(() => doSomething())
];
const result = await Promise.all(input);
console.log(result);
比如上面這段邏輯,就是幾個(gè)異步邏輯并行執(zhí)行,并且最大并發(fā)是 2。
那如何實(shí)現(xiàn)這樣的并發(fā)控制呢?
我們自己來(lái)寫(xiě)一個(gè):
首先,要傳入并發(fā)數(shù)量,返回一個(gè)添加并發(fā)任務(wù)的函數(shù),我們把它叫做 generator:
const pLimit = (concurrency) => {
const generator = (fn, ...args) =>
new Promise((resolve) => {
//...
});
return generator;
}
這里添加的并發(fā)任務(wù)要進(jìn)行排隊(duì),所以我們準(zhǔn)備一個(gè) queue,并記錄當(dāng)前在進(jìn)行中的異步任務(wù)。
const queue = [];
let activeCount = 0;
const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});
添加的異步任務(wù)就入隊(duì),也就是 enqueue。
enqueue 做的事情就是把一個(gè)異步任務(wù)添加到 queue 中,并且只要沒(méi)達(dá)到并發(fā)上限就再執(zhí)行一批任務(wù):
const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};
具體運(yùn)行的邏輯是這樣的:
const run = async (fn, resolve, ...args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
計(jì)數(shù),運(yùn)行這個(gè)函數(shù),改變最后返回的那個(gè) promise 的狀態(tài),然后執(zhí)行完之后進(jìn)行下一步處理:
下一步處理自然就是把活躍任務(wù)數(shù)量減一,然后再跑一個(gè)任務(wù):
const next = () => {
activeCount--;
if (queue.length > 0) {
queue.shift()();
}
};
這樣就保證了并發(fā)的數(shù)量限制。
現(xiàn)在的全部代碼如下,只有 40 行代碼:
const pLimit = (concurrency) => {
const queue = [];
let activeCount = 0;
const next = () => {
activeCount--;
if (queue.length > 0) {
queue.shift()();
}
};
const run = async (fn, resolve, ...args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};
const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});
return generator;
};
這就已經(jīng)實(shí)現(xiàn)了并發(fā)控制。
不信我們跑跑看:
準(zhǔn)備這樣一段測(cè)試代碼:
const limit = pLimit(2);
function asyncFun(value, delay) {
return new Promise((resolve) => {
console.log('start ' + value);
setTimeout(() => resolve(value), delay);
});
}
(async function () {
const arr = [
limit(() => asyncFun('aaa', 2000)),
limit(() => asyncFun('bbb', 3000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000))
];
const result = await Promise.all(arr);
console.log(result);
})();
沒(méi)啥好說(shuō)的,就是 setTimeout + promise,設(shè)置不同的 delay 時(shí)間。
并發(fā)數(shù)量為 2。
我們?cè)囅拢?/p>

先并發(fā)執(zhí)行前兩個(gè)任務(wù),2s 的時(shí)候一個(gè)任務(wù)執(zhí)行完,又執(zhí)行了一個(gè)任務(wù),然后再過(guò)一秒,都執(zhí)行完了,有同時(shí)執(zhí)行了兩個(gè)任務(wù)。
經(jīng)過(guò)測(cè)試,我們已經(jīng)實(shí)現(xiàn)了并發(fā)控制!
回顧一下我們實(shí)現(xiàn)的過(guò)程,其實(shí)就是一個(gè)隊(duì)列來(lái)保存任務(wù),開(kāi)始的時(shí)候一次性執(zhí)行最大并發(fā)數(shù)的任務(wù),然后每執(zhí)行完一個(gè)啟動(dòng)一個(gè)新的。
還是比較簡(jiǎn)單的。
上面的 40 行代碼是最簡(jiǎn)化的版本,其實(shí)還有一些可以完善的地方,我們繼續(xù)完善一下。
首先,我們要把并發(fā)數(shù)暴露出去,還要讓開(kāi)發(fā)者可以手動(dòng)清理任務(wù)隊(duì)列。
我們這樣寫(xiě):
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount
},
pendingCount: {
get: () => queue.length
},
clearQueue: {
value: () => {
queue.length = 0;
}
}
});
用 Object.defineProperties 只定義 get 函數(shù),這樣 activeCount、pendingCount 就是只能讀不能改的。
同時(shí)還提供了一個(gè)清空任務(wù)隊(duì)列的函數(shù)。
然后傳入的參數(shù)也加個(gè)校驗(yàn)邏輯:
if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
不是整數(shù)或者小于 0 就報(bào)錯(cuò),當(dāng)然,Infinity 也是可以的。
最后,其實(shí)還有一個(gè)特別需要完善的點(diǎn),就是這里:
const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};
應(yīng)該改成這樣:
const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
})();
};
因?yàn)?activeCount-- 的邏輯是在執(zhí)行完任務(wù)之后才執(zhí)行的,萬(wàn)一任務(wù)還沒(méi)執(zhí)行完,這時(shí)候 activeCount 就是不準(zhǔn)的。
所以為了保證并發(fā)數(shù)量能控制準(zhǔn)確,要等全部的微任務(wù)執(zhí)行完再拿 activeCount。
怎么在全部的微任務(wù)執(zhí)行完再執(zhí)行邏輯呢?
加一個(gè)新的微任務(wù)不就行了?
所以有這樣的 await Promise.resolve(); 的邏輯。
這樣,就是一個(gè)完善的并發(fā)控制邏輯了,p-limit 也是這么實(shí)現(xiàn)的。
感興趣的同學(xué)可以自己試一下:
const pLimit = (concurrency) => {
if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
const queue = [];
let activeCount = 0;
const next = () => {
activeCount--;
if (queue.length > 0) {
queue.shift()();
}
};
const run = async (fn, resolve, ...args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
})();
};
const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount
},
pendingCount: {
get: () => queue.length
},
clearQueue: {
value: () => {
queue.length = 0;
}
}
});
return generator;
};
const limit = pLimit(2);
function asyncFun(value, delay) {
return new Promise((resolve) => {
console.log('start ' + value);
setTimeout(() => resolve(value), delay);
});
}
(async function () {
const arr = [
limit(() => asyncFun('aaa', 2000)),
limit(() => asyncFun('bbb', 3000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000))
];
const result = await Promise.all(arr);
console.log(result);
})();
總結(jié)
js 代碼經(jīng)常要處理異步邏輯的串行、并行,還可能要做并發(fā)控制,這也是面試??嫉狞c(diǎn)。
實(shí)現(xiàn)并發(fā)控制的核心就是通過(guò)一個(gè)隊(duì)列保存所有的任務(wù),然后最開(kāi)始批量執(zhí)行一批任務(wù)到最大并發(fā)數(shù),然后每執(zhí)行完一個(gè)任務(wù)就再執(zhí)行一個(gè)新的。
其中要注意的是為了保證獲取的任務(wù)數(shù)量是準(zhǔn)確的,要在所有微任務(wù)執(zhí)行完之后再獲取數(shù)量。
實(shí)現(xiàn)并發(fā)控制只要 40 多行代碼,其實(shí)這就是 p-limit 的源碼了,大家感興趣也可以自己實(shí)現(xiàn)一下。