結(jié)合異步迭代器實(shí)現(xiàn) Node.js 流式數(shù)據(jù)復(fù)制
實(shí)現(xiàn)可讀流到可寫(xiě)流數(shù)據(jù)復(fù)制,就是不斷的讀取->寫(xiě)入這個(gè)過(guò)程,那么你首先想到的是不是下面這樣呢?代碼看似很簡(jiǎn)單,結(jié)果卻是很糟糕的,沒(méi)有任何的數(shù)據(jù)積壓處理。如果讀取的文件很大了,造成的后果就是緩沖區(qū)數(shù)據(jù)溢出,程序會(huì)占用過(guò)多的系統(tǒng)內(nèi)存,拖垮服務(wù)器上的其它應(yīng)用,如果不明白的回顧下這篇文章 Node.js Stream 背壓 — 消費(fèi)端數(shù)據(jù)積壓來(lái)不及處理會(huì)怎么樣?。
- // 糟糕的示例,沒(méi)有數(shù)據(jù)積壓處理
- readable.on('data', data => {
- writable.write(data)
- });
類(lèi)似以上的需求,推薦你用 pipe() 方法以流的形式完成數(shù)據(jù)的復(fù)制。
作為學(xué)習(xí),結(jié)合異步迭代器以一種簡(jiǎn)單的方式實(shí)現(xiàn)一個(gè)類(lèi)似于 pipe 一樣的方法完成數(shù)據(jù)源到目標(biāo)源的數(shù)據(jù)復(fù)制。
數(shù)據(jù)寫(xiě)入方法實(shí)現(xiàn)
_write 方法目的是控制可寫(xiě)流的數(shù)據(jù)寫(xiě)入,它返回一個(gè) Promise 對(duì)象,如果可寫(xiě)流的 dest.write() 方法返回 true,表示內(nèi)部緩沖區(qū)未滿(mǎn),繼續(xù)寫(xiě)入。
當(dāng) dest.write() 方法返回 false 表示向流中寫(xiě)入數(shù)據(jù)超過(guò)了它所能處理的最大能力限制,此時(shí)暫停向流中寫(xiě)入數(shù)據(jù),直到 drain 事件觸發(fā),表示緩沖區(qū)中的數(shù)據(jù)已排空了可以繼續(xù)寫(xiě)入,再將 Promise 對(duì)象變?yōu)榻鉀Q。
- function _write(dest, chunk) {
- return new Promise(resolve => {
- if (dest.write(chunk)) {
- return resolve(null);
- }
- dest.once('drain', resolve);
- })
- }
結(jié)合異步迭代器實(shí)現(xiàn)
異步迭代器使從可讀流對(duì)象讀取數(shù)據(jù)變得更簡(jiǎn)單,異步的讀取數(shù)據(jù)并調(diào)用我們封裝的 _write(chunk) 方法寫(xiě)入數(shù)據(jù),如果緩沖區(qū)空間已滿(mǎn),這里 await _write(dest, chunk) 也會(huì)等待,當(dāng)緩沖區(qū)有空間可以繼續(xù)寫(xiě)入了,再次進(jìn)行讀取 -> 寫(xiě)入。
- function myCopy(src, dest) {
- return new Promise(async (resolve, reject) => {
- dest.on('error', reject);
- try {
- for await (const chunk of src) {
- await _write(dest, chunk);
- }
- resolve();
- } catch (err) {
- reject(err);
- }
- });
- }
使用如下所示:
- const readable = fs.createReadStream('text.txt');
- const writable = fs.createWriteStream('dest-text.txt');
- await myCopy(readable, writable);
本文轉(zhuǎn)載自微信公眾號(hào)「Nodejs技術(shù)棧」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系Nodejs技術(shù)棧公眾號(hào)。

























