Node.js中的異步Generator函數(shù)和Websockets
異步 generator 函數(shù)是 ES2018 中新增的特性。Node.js 從 v10 版本增加了對(duì)異步 generator 函數(shù)的支持。異步 generator 函數(shù)看似一個(gè)相當(dāng)小眾特性特性,但是卻為 node.js websocket 框架提供了一個(gè)靈巧的使用機(jī)會(huì)。
在這篇文章中,我將說(shuō)明 Node.js websocket 框架將如何使用異步 generator 函數(shù)。
HTTP 框架分類
首先,想一下 Express 或 Hapi 之類的 HTTP 服務(wù)器框架。一般來(lái)說(shuō),大多數(shù) HTTP 服務(wù)器框架都屬于以下三種之一:
1. 顯式響應(yīng)。 在 Express 中發(fā)送一個(gè) HTTP 響應(yīng),你必須調(diào)用 res.end(),res.json() 或者 res 對(duì)象上的一些其他方法。換句話說(shuō),你必須顯式調(diào)用一個(gè)方法來(lái)發(fā)送一個(gè)響應(yīng)。
2. 使用 return 隱式響應(yīng)。 另一方面,Hapi 在 v17 中明確地刪除了 reply() 函數(shù),也就是說(shuō) Hapi 沒(méi)有等同于 res 的方式。如果需要發(fā)送一個(gè)響應(yīng)。你只需在請(qǐng)求的處理方法中 return 一個(gè)返回值。之后 Hapi 就會(huì)將 return 的值封裝進(jìn)一個(gè) HTTP 響應(yīng)中。
3. 在適當(dāng)?shù)奈恢眯薷捻憫?yīng)。 Koa 使用了一種混合了以上兩種實(shí)現(xiàn)的獨(dú)特處理方式。你將以修改 ctx 對(duì)象的方式,替代調(diào)用 res 對(duì)象的方法來(lái)構(gòu)建響應(yīng)。
換句話說(shuō),一些 HTTP 框架要求你顯式調(diào)用方法來(lái)發(fā)送 HTTP 響應(yīng),另一些框架會(huì)提供給你一個(gè)可更改的 HTTP 響應(yīng)對(duì)象,還有一些框架僅需要處理函數(shù)中 return 一個(gè)值。
Websockets 和 HTTP 的區(qū)別在于,Websockets 服務(wù)器可以在任何時(shí)間向 socket 推送消息,不管是不是基于某條消息的響應(yīng)。也就是說(shuō),初級(jí)的 websocket 框架,例如 ws, 看起來(lái)很像 “顯式響應(yīng)” 模式:你需要顯式調(diào)用一個(gè)方法用于發(fā)送一條消息。
然而,是否可以在保持允許消息多發(fā)這個(gè)優(yōu)點(diǎn)的同時(shí),使 websockets 可以實(shí)現(xiàn)隱式響應(yīng)?這就是異步 generator 產(chǎn)生的原因。
從服務(wù)器上讀取大塊數(shù)據(jù)
假設(shè)你有一個(gè)一次讀取一堆文檔的 Mongoose 指針,并且你希望用 websocket 在每一個(gè)文檔讀出時(shí)盡快將它發(fā)送出去。這種方式有助于在任何時(shí)刻都使服務(wù)器的內(nèi)存使用量保持在最?。嚎蛻舳丝梢垣@取所有的數(shù)據(jù),而服務(wù)器卻不用為此在內(nèi)存中一次保存所有的數(shù)據(jù)。舉個(gè)例子,這是使用 async/await 方式讀取一個(gè)指針的實(shí)現(xiàn):
- const User = mongoose.model('User', mongoose.Schema({ name: String }));
- const cursor = Model.find().cursor();
- for await (const doc of cursor) {
- console.log(doc.name); // Print user names 1 by 1.
- }
使 generator 函數(shù)變得有趣的地方在于,在一個(gè)函數(shù)中 yield 方法可以被調(diào)用多次,并且在上次停止的地方繼續(xù)運(yùn)行,除了這點(diǎn)以外,yield 方法和 return 方法類似。
- const User = mongoose.model('User', mongoose.Schema({ name: String }));
- async function* streamUsers() {
- const cursor = Model.find().cursor();
- for await (const doc of cursor) {
- // Yielding each doc behaves like multiple implicit responses, if you have
- // a framework that supports it.
- yield doc;
- }
- }
以下是如何使用 Node.js 編寫(xiě)一個(gè) Websocket 服務(wù)器:
- const WebSocket = require('ws');
- const server = new WebSocket.Server({
- port: 8080
- });
- server.on('connection', function(socket) {
- socket.on('message', function(msg) {
- // Handle message
- });
- });
至此,接下來(lái)要做的是為 websocket 服務(wù)器添加 streamUsers() 方法。假設(shè)收到的每條消息都是有效的 JSON,并且都有屬性 action 和 id。當(dāng) action === 'streamUsers'時(shí),streamUsers() 就會(huì)被執(zhí)行,并且基于 socket 向外發(fā)送每個(gè)被 Mongoose cursor 查詢出來(lái)的用戶。
- const WebSocket = require('ws');
- const server = new WebSocket.Server({
- port: 8080
- });
- server.on('connection', function(socket) {
- socket.on('message', function(msg) {
- msg = JSON.parse(msg);
- if (msg.action === 'streamUsers') {
- void async function() {
- // Send 1 message per user, as opposed to loading all users and then
- // sending them all in 1 message.
- for await (const doc of streamUsers()) {
- socket.send(JSON.stringify({ id: msg.id, doc }));
- }
- }().catch(err => socket.send(JSON.stringify({ id: msg.id, error: err.message })));
- }
- });
- });
以下是如何通過(guò) websocket 客戶端調(diào)用 streamUsers() 方法:
- const client = new WebSocket('ws://localhost:8080');
- // Will print each user doc 1 at a time.
- client.on('message', msg => console.log(msg));
- await new Promise(resolve => client.once('open', resolve));
- client.send(JSON.stringify({ action: 'streamUsers', id: 1 }));
后續(xù)
異步 generator 函數(shù)提供了一種創(chuàng)建更高級(jí)的,如同一些 HTTP 框架(例如 Hapi 和 Fastify)那樣,基于隱式響應(yīng)的 websocket 框架的機(jī)會(huì)。而隱式響應(yīng)的主要優(yōu)勢(shì)就在于,你在業(yè)務(wù)邏輯中不需要關(guān)注框架是通過(guò) websocket,HTTP 輪詢或是其他某種方式來(lái)發(fā)送結(jié)果??蚣茏杂墒?Javascript 編程更輕便并且更容易測(cè)試。
通過(guò)將所有產(chǎn)生的值存放在一個(gè)數(shù)組中,或者讓客戶端發(fā)起多次請(qǐng)求對(duì)一個(gè)指針進(jìn)行迭代,streamUsers() 方法就可以很容易的在一個(gè) HTTP 框架,或者是一個(gè)使用輪詢的 HTTP 框架中重用。沒(méi)有異步 generator 函數(shù),所有這些都是不能實(shí)現(xiàn)的。