提升 Node.js UDP 的性能?
網(wǎng)絡(luò) IO 性能一直是熱門的討論話題,不管是操作系統(tǒng)還是應(yīng)用軟件也在不斷優(yōu)化網(wǎng)絡(luò) IO 的性能。我們知道系統(tǒng)調(diào)用是相對耗時的,所以性能優(yōu)化的一種方式就是減少系統(tǒng)調(diào)用。本文介紹最近嘗試在 Node.js 中通過引入 recvmmsg 提升 UDP 接收數(shù)據(jù)性能的相關(guān)內(nèi)容,相比之前每調(diào)一次系統(tǒng)調(diào)用讀取一個數(shù)據(jù)包,recvmmsg 可以一次讀取多個包,從而減少系統(tǒng)調(diào)用提升性能。
UDP 的實(shí)現(xiàn)
下面是在 Node.js 中使用 UDP 的例子。
const dgram = require('dgram')
const socket = dgram.createSocket({type: 'udp4', msgCount: 10});
socket.bind(9999, function() {
    socket.on('message',function() {
      
    });
})其底層實(shí)現(xiàn)如下。
- 初始化 udp handle。
 
r = uv_udp_init(env->event_loop(), &handle_);- 注冊讀事件。
 
int UDPWrap::RecvStart() {
  int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
  return err;
}uv_udp_recv_start 用于注冊讀事件,當(dāng)有數(shù)據(jù)到來時執(zhí)行 OnAlloc 分配內(nèi)存,然后執(zhí)行 OnRecv 消費(fèi)數(shù)據(jù)。
- 分配保存數(shù)據(jù)的內(nèi)存。 OnAlloc 實(shí)現(xiàn)如下:
 
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
  return env()->allocate_managed_buffer(suggested_size);
}
uv_buf_t Environment::allocate_managed_buffer(const size_t suggested_size) {
  std::unique_ptr<BackingStore> bs = ArrayBuffer::NewBackingStore(
      isolate(),
      suggested_size,
      BackingStoreInitializationMode::kUninitialized);
  uv_buf_t buf = uv_buf_init(static_cast<char*>(bs->Data()), bs->ByteLength());
  released_allocated_buffers_.emplace(buf.base, std::move(bs));
  return buf;
}Node.js 通過 V8 的 BackingStore 申請內(nèi)存,然后把這塊內(nèi)存?zhèn)鹘o Libuv 讀取數(shù)據(jù),最后記錄內(nèi)存首地址和 BackingStore 的映射關(guān)系,后續(xù)會用到。
- 消費(fèi)數(shù)據(jù) Libuv 讀取數(shù)據(jù)后會調(diào) OnRecv 通知調(diào)用者。
 
void UDPWrap::OnRecv(ssize_t nread,
                     const uv_buf_t& buf_,
                     const sockaddr* addr,
                     unsigned int flags) {
  Environment* env = this->env();
  Isolate* isolate = env->isolate();
  // 通過 buf 找到對應(yīng)的 BackingStore,然后回調(diào) JS
  std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
  Local<Value> argv[] = {
      Integer::New(isolate, static_cast<int32_t>(nread)),
      object(),
      Undefined(isolate),
      Undefined(isolate)
  };
  if (nread < 0) {
    MakeCallback(env->onmessage_string(), arraysize(argv), argv);
    return;
  } else if (nread == 0) {
    bs = ArrayBuffer::NewBackingStore(isolate, 0);
  } else if (static_cast<size_t>(nread) != bs->ByteLength()) { // 讀取的數(shù)據(jù)和預(yù)分配的不一致,需要創(chuàng)建新的 BackingStore 并把數(shù)據(jù)復(fù)制過去
    std::unique_ptr<BackingStore> old_bs = std::move(bs);
    bs = ArrayBuffer::NewBackingStore(
        isolate, nread, BackingStoreInitializationMode::kUninitialized);
    memcpy(bs->Data(), old_bs->Data(), nread);
  }
  // ...
  // ...
}可以看到 UDP 的實(shí)現(xiàn)不算復(fù)雜。接著看如何在這個基礎(chǔ)上引入 recvmmsg 能力。
引入 recvmmsg
Libuv 本身支持 recvmmsg,所以我們不需要關(guān)系平臺兼容性問題(目前支持多個平臺,但并不支持在所有平臺中使用)。
- 設(shè)置使用 recvmmsg。
 
env->event_loop(), &handle_, AF_UNSPEC | UV_UDP_RECVMMSG);通過 UV_UDP_RECVMMSG 標(biāo)記可以使得 Libuv 使用 recvmmsg 接收 UDP 數(shù)據(jù)。
- 分配內(nèi)存。 分配內(nèi)存這里有點(diǎn)棘手,使用 UV_UDP_RECVMMSG 時需要分配一大塊內(nèi)容,足以保存多個數(shù)據(jù)包的數(shù)據(jù),而之前的實(shí)現(xiàn)中,每次是通過 BackingStore 分配一塊獨(dú)立的內(nèi)容,用完后就釋放的。所以這里不能使用 BackingStore 了,直接使用原生的內(nèi)存分配。
 
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
  if (using_recvmmsg()) {
    // msg_count_ 表示每次讀取多少個數(shù)據(jù)包
    suggested_size *= msg_count_;
    // 在當(dāng)前對象記錄正在使用的內(nèi)存,后面分析
    mmsg_buf_ = uv_buf_init(reinterpret_cast<char*>(malloc(suggested_size)),
                            suggested_size);
    return mmsg_buf_;
  }
  return env()->allocate_managed_buffer(suggested_size);
}通過 malloc 分配接收多個數(shù)據(jù)包所需要的一大片內(nèi)存,每個數(shù)據(jù)包對應(yīng)其中的一個分片。
- 消費(fèi)數(shù)據(jù) 消費(fèi)數(shù)據(jù)和之前的邏輯也有些不同,之前是消費(fèi)完數(shù)據(jù)后就可以釋放對應(yīng)的內(nèi)存了,但是引入 recvmmsg 后,每次是分配一大塊內(nèi)存的,每個數(shù)據(jù)包對應(yīng)其中的一個分片,所以消費(fèi)完一個數(shù)據(jù)包還不能釋放整個內(nèi)存,需要全部數(shù)據(jù)包才能釋放這個內(nèi)容。Libuv 的實(shí)現(xiàn)如下:
 
// buf->len 對應(yīng) OnAlloc 分片的一大片內(nèi)存,除以 UV__UDP_DGRAM_MAXSIZ 表示一次最多接收幾個數(shù)據(jù)包
  chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
  if (chunks > ARRAY_SIZE(iov))
    chunks = ARRAY_SIZE(iov);
  for (k = 0; k < chunks; ++k) {
    iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
    iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
    // ...
  }
  do
    nread = recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
  while (nread == -1 && errno == EINTR);
  if (nread < 1) {
    if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
      handle->recv_cb(handle, 0, buf, NULL, 0);
    else
      handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
  } else {
    /* pass each chunk to the application */
    for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
      flags = UV_UDP_MMSG_CHUNK;
      if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;
      chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
      handle->recv_cb(handle,
                      msgs[k].msg_len,
                      &chunk_buf,
                      msgs[k].msg_hdr.msg_name,
                      flags);
    }
    /* one last callback so the original buffer is freed */
    if (handle->recv_cb != NULL)
      handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
  }另外之前是通過 BackingStore 分配內(nèi)存來和 uv_buf_t 關(guān)聯(lián)來實(shí)現(xiàn)的,我們這里則是通過 malloc 直接分配的內(nèi)容,所以接收數(shù)據(jù)時就不能通過 uv_buf_t 來查看對應(yīng)的 BackingStore 了。
void UDPWrap::OnRecv(ssize_t nread,
                     const uv_buf_t& buf_,
                     const sockaddr* addr,
                     unsigned int flags) {
  Environment* env = this->env();
  Isolate* isolate = env->isolate();
  std::unique_ptr<BackingStore> bs;
  // 處理完本輪所有數(shù)據(jù)包,釋放內(nèi)存
  auto cleanup = OnScopeLeave([&]() {
    if (using_recvmmsg() && (nread <= 0 || (flags & UV_UDP_MMSG_FREE))) {
      release_buf();
    }
  });
  // 歷史邏輯
  if (!using_recvmmsg()) {
    bs = env->release_managed_buffer(buf_);
  }
  Local<Value> argv[] = {
      Integer::New(isolate, static_cast<int32_t>(nread)),
      object(),
      Undefined(isolate),
      Undefined(isolate)
  };
  if (nread < 0) {
    MakeCallback(env->onmessage_string(), arraysize(argv), argv);
    return;
  } else if (nread == 0) {
    bs = ArrayBuffer::NewBackingStore(isolate, 0);
  } else if (using_recvmmsg()) {
    // 創(chuàng)建一個 BackingStore 并把數(shù)據(jù)包的數(shù)據(jù)復(fù)制過去
    bs = ArrayBuffer::NewBackingStore(
        isolate, nread, BackingStoreInitializationMode::kUninitialized);
    memcpy(bs->Data(), buf_.base, nread);
  } else if (static_cast<size_t>(nread) != bs->ByteLength()) {
    std::unique_ptr<BackingStore> old_bs = std::move(bs);
    bs = ArrayBuffer::NewBackingStore(
        isolate, nread, BackingStoreInitializationMode::kUninitialized);
    memcpy(bs->Data(), old_bs->Data(), nread);
  }
  // ...
}上面的改動包括兩方面,一是兼容之前的收到數(shù)據(jù)時創(chuàng)建 BackingStore 的邏輯,這樣可以很大程度地復(fù)用之前的邏輯,二是判斷是否需要釋放內(nèi)存。
- 避免內(nèi)存泄漏 從上面的處理中可以看到,當(dāng)本輪的所有數(shù)據(jù)包處理完后,我們就會正常釋放申請的大片內(nèi)存,但是存在一個問題是假設(shè)分配了 1MB * 10 大小的內(nèi)存,然后通過 recvmmsg 從操作系統(tǒng)獲取了 10 個數(shù)據(jù)包,但是處理第一個數(shù)據(jù)包時用戶調(diào)了 close 關(guān)閉了 socket,這樣會導(dǎo)致 Libuv 不會回調(diào) Node.js 了,從而導(dǎo)致之前申請的大片內(nèi)存沒有釋放,造成內(nèi)存泄漏。所以在申請內(nèi)存時需要記錄這個內(nèi)存,然后 socket 關(guān)閉時釋放(獲取處理完所有的數(shù)據(jù)包后釋放)。
 
UDPWrap::~UDPWrap() {
  // Libuv does not release the memory of memory which allocated
  // by handle->alloc_cb when we call close in handle->read_cb,
  // so we should release the memory here if necessary.
  // 開啟了 recvmmsg 并且沒有釋放申請的內(nèi)存則釋放
  if (uv_udp_using_recvmmsg(reinterpret_cast<uv_udp_t*>(&handle_)) {
    release_buf();
  }
}
void release_buf() {
  if (mmsg_buf_.base != nullptr) {
    free(mmsg_buf_.base);
    mmsg_buf_ = uv_buf_init(nullptr, 0);
  }
}總結(jié)
recvmmsg 通過減少系統(tǒng)調(diào)用提升性能,但是不一定在什么場景下都有可觀的效果,比如在 QPS 高時可能比較有意義,另外因為每次都需要提前分配內(nèi)容,所以數(shù)據(jù)量小時可能會導(dǎo)致每次分配過多無用的內(nèi)存,然后又被釋放。該特性目前還沒有在 Node.js 中進(jìn)行性能測試。
- PR:https://github.com/nodejs/node/pull/59126
 















 
 
 






 
 
 
 