偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

提升 Node.js UDP 的性能?

開發(fā) 前端
recvmmsg 通過減少系統(tǒng)調(diào)用提升性能,但是不一定在什么場景下都有可觀的效果,比如在 QPS 高時可能比較有意義,另外因為每次都需要提前分配內(nèi)容,所以數(shù)據(jù)量小時可能會導(dǎo)致每次分配過多無用的內(nèi)存,然后又被釋放。

網(wǎng)絡(luò) IO 性能一直是熱門的討論話題,不管是操作系統(tǒng)還是應(yīng)用軟件也在不斷優(yōu)化網(wǎng)絡(luò) IO 的性能。我們知道系統(tǒng)調(diào)用是相對耗時的,所以性能優(yōu)化的一種方式就是減少系統(tǒng)調(diào)用。本文介紹最近嘗試在 Node.js 中通過引入 recvmmsg 提升 UDP 接收數(shù)據(jù)性能的相關(guān)內(nèi)容,相比之前每調(diào)一次系統(tǒng)調(diào)用讀取一個數(shù)據(jù)包,recvmmsg 可以一次讀取多個包,從而減少系統(tǒng)調(diào)用提升性能。

UDP 的實(shí)現(xiàn)

下面是在 Node.js 中使用 UDP 的例子。

const dgram = require('dgram')

const socket = dgram.createSocket({type: 'udp4', msgCount: 10});
socket.bind(9999, function() {
    socket.on('message',function() {
      
    });
})

其底層實(shí)現(xiàn)如下。

  • 初始化 udp handle。
r = uv_udp_init(env->event_loop(), &handle_);
  • 注冊讀事件。
int UDPWrap::RecvStart() {
  int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
  return err;
}

uv_udp_recv_start 用于注冊讀事件,當(dāng)有數(shù)據(jù)到來時執(zhí)行 OnAlloc 分配內(nèi)存,然后執(zhí)行 OnRecv 消費(fèi)數(shù)據(jù)。

  • 分配保存數(shù)據(jù)的內(nèi)存。 OnAlloc 實(shí)現(xiàn)如下:
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
  return env()->allocate_managed_buffer(suggested_size);
}

uv_buf_t Environment::allocate_managed_buffer(const size_t suggested_size) {
  std::unique_ptr<BackingStore> bs = ArrayBuffer::NewBackingStore(
      isolate(),
      suggested_size,
      BackingStoreInitializationMode::kUninitialized);
  uv_buf_t buf = uv_buf_init(static_cast<char*>(bs->Data()), bs->ByteLength());
  released_allocated_buffers_.emplace(buf.base, std::move(bs));
  return buf;
}

Node.js 通過 V8 的 BackingStore 申請內(nèi)存,然后把這塊內(nèi)存?zhèn)鹘o Libuv 讀取數(shù)據(jù),最后記錄內(nèi)存首地址和 BackingStore 的映射關(guān)系,后續(xù)會用到。

  • 消費(fèi)數(shù)據(jù) Libuv 讀取數(shù)據(jù)后會調(diào) OnRecv 通知調(diào)用者。
void UDPWrap::OnRecv(ssize_t nread,
                     const uv_buf_t& buf_,
                     const sockaddr* addr,
                     unsigned int flags) {
  Environment* env = this->env();
  Isolate* isolate = env->isolate();
  // 通過 buf 找到對應(yīng)的 BackingStore,然后回調(diào) JS
  std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
  Local<Value> argv[] = {
      Integer::New(isolate, static_cast<int32_t>(nread)),
      object(),
      Undefined(isolate),
      Undefined(isolate)
  };

  if (nread < 0) {
    MakeCallback(env->onmessage_string(), arraysize(argv), argv);
    return;
  } else if (nread == 0) {
    bs = ArrayBuffer::NewBackingStore(isolate, 0);
  } else if (static_cast<size_t>(nread) != bs->ByteLength()) { // 讀取的數(shù)據(jù)和預(yù)分配的不一致,需要創(chuàng)建新的 BackingStore 并把數(shù)據(jù)復(fù)制過去
    std::unique_ptr<BackingStore> old_bs = std::move(bs);
    bs = ArrayBuffer::NewBackingStore(
        isolate, nread, BackingStoreInitializationMode::kUninitialized);
    memcpy(bs->Data(), old_bs->Data(), nread);
  }
  // ...
  // ...
}

可以看到 UDP 的實(shí)現(xiàn)不算復(fù)雜。接著看如何在這個基礎(chǔ)上引入 recvmmsg 能力。

引入 recvmmsg

Libuv 本身支持 recvmmsg,所以我們不需要關(guān)系平臺兼容性問題(目前支持多個平臺,但并不支持在所有平臺中使用)。

  • 設(shè)置使用 recvmmsg。
env->event_loop(), &handle_, AF_UNSPEC | UV_UDP_RECVMMSG);

通過 UV_UDP_RECVMMSG 標(biāo)記可以使得 Libuv 使用 recvmmsg 接收 UDP 數(shù)據(jù)。

  • 分配內(nèi)存。 分配內(nèi)存這里有點(diǎn)棘手,使用 UV_UDP_RECVMMSG 時需要分配一大塊內(nèi)容,足以保存多個數(shù)據(jù)包的數(shù)據(jù),而之前的實(shí)現(xiàn)中,每次是通過 BackingStore 分配一塊獨(dú)立的內(nèi)容,用完后就釋放的。所以這里不能使用 BackingStore 了,直接使用原生的內(nèi)存分配。
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
  if (using_recvmmsg()) {
    // msg_count_ 表示每次讀取多少個數(shù)據(jù)包
    suggested_size *= msg_count_;
    // 在當(dāng)前對象記錄正在使用的內(nèi)存,后面分析
    mmsg_buf_ = uv_buf_init(reinterpret_cast<char*>(malloc(suggested_size)),
                            suggested_size);
    return mmsg_buf_;
  }
  return env()->allocate_managed_buffer(suggested_size);
}

通過 malloc 分配接收多個數(shù)據(jù)包所需要的一大片內(nèi)存,每個數(shù)據(jù)包對應(yīng)其中的一個分片。

  • 消費(fèi)數(shù)據(jù) 消費(fèi)數(shù)據(jù)和之前的邏輯也有些不同,之前是消費(fèi)完數(shù)據(jù)后就可以釋放對應(yīng)的內(nèi)存了,但是引入 recvmmsg 后,每次是分配一大塊內(nèi)存的,每個數(shù)據(jù)包對應(yīng)其中的一個分片,所以消費(fèi)完一個數(shù)據(jù)包還不能釋放整個內(nèi)存,需要全部數(shù)據(jù)包才能釋放這個內(nèi)容。Libuv 的實(shí)現(xiàn)如下:
// buf->len 對應(yīng) OnAlloc 分片的一大片內(nèi)存,除以 UV__UDP_DGRAM_MAXSIZ 表示一次最多接收幾個數(shù)據(jù)包
  chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
  if (chunks > ARRAY_SIZE(iov))
    chunks = ARRAY_SIZE(iov);
  for (k = 0; k < chunks; ++k) {
    iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
    iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
    // ...
  }

  do
    nread = recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
  while (nread == -1 && errno == EINTR);

  if (nread < 1) {
    if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
      handle->recv_cb(handle, 0, buf, NULL, 0);
    else
      handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
  } else {
    /* pass each chunk to the application */
    for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
      flags = UV_UDP_MMSG_CHUNK;
      if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
      handle->recv_cb(handle,
                      msgs[k].msg_len,
                      &chunk_buf,
                      msgs[k].msg_hdr.msg_name,
                      flags);
    }

    /* one last callback so the original buffer is freed */
    if (handle->recv_cb != NULL)
      handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
  }

另外之前是通過 BackingStore 分配內(nèi)存來和 uv_buf_t 關(guān)聯(lián)來實(shí)現(xiàn)的,我們這里則是通過 malloc 直接分配的內(nèi)容,所以接收數(shù)據(jù)時就不能通過 uv_buf_t 來查看對應(yīng)的 BackingStore 了。

void UDPWrap::OnRecv(ssize_t nread,
                     const uv_buf_t& buf_,
                     const sockaddr* addr,
                     unsigned int flags) {
  Environment* env = this->env();
  Isolate* isolate = env->isolate();
  std::unique_ptr<BackingStore> bs;
  // 處理完本輪所有數(shù)據(jù)包,釋放內(nèi)存
  auto cleanup = OnScopeLeave([&]() {
    if (using_recvmmsg() && (nread <= 0 || (flags & UV_UDP_MMSG_FREE))) {
      release_buf();
    }
  });
  // 歷史邏輯
  if (!using_recvmmsg()) {
    bs = env->release_managed_buffer(buf_);
  }
  Local<Value> argv[] = {
      Integer::New(isolate, static_cast<int32_t>(nread)),
      object(),
      Undefined(isolate),
      Undefined(isolate)
  };

  if (nread < 0) {
    MakeCallback(env->onmessage_string(), arraysize(argv), argv);
    return;
  } else if (nread == 0) {
    bs = ArrayBuffer::NewBackingStore(isolate, 0);
  } else if (using_recvmmsg()) {
    // 創(chuàng)建一個 BackingStore 并把數(shù)據(jù)包的數(shù)據(jù)復(fù)制過去
    bs = ArrayBuffer::NewBackingStore(
        isolate, nread, BackingStoreInitializationMode::kUninitialized);
    memcpy(bs->Data(), buf_.base, nread);
  } else if (static_cast<size_t>(nread) != bs->ByteLength()) {
    std::unique_ptr<BackingStore> old_bs = std::move(bs);
    bs = ArrayBuffer::NewBackingStore(
        isolate, nread, BackingStoreInitializationMode::kUninitialized);
    memcpy(bs->Data(), old_bs->Data(), nread);
  }
  // ...
}

上面的改動包括兩方面,一是兼容之前的收到數(shù)據(jù)時創(chuàng)建 BackingStore 的邏輯,這樣可以很大程度地復(fù)用之前的邏輯,二是判斷是否需要釋放內(nèi)存。

  • 避免內(nèi)存泄漏 從上面的處理中可以看到,當(dāng)本輪的所有數(shù)據(jù)包處理完后,我們就會正常釋放申請的大片內(nèi)存,但是存在一個問題是假設(shè)分配了 1MB * 10 大小的內(nèi)存,然后通過 recvmmsg 從操作系統(tǒng)獲取了 10 個數(shù)據(jù)包,但是處理第一個數(shù)據(jù)包時用戶調(diào)了 close 關(guān)閉了 socket,這樣會導(dǎo)致 Libuv 不會回調(diào) Node.js 了,從而導(dǎo)致之前申請的大片內(nèi)存沒有釋放,造成內(nèi)存泄漏。所以在申請內(nèi)存時需要記錄這個內(nèi)存,然后 socket 關(guān)閉時釋放(獲取處理完所有的數(shù)據(jù)包后釋放)。
UDPWrap::~UDPWrap() {
  // Libuv does not release the memory of memory which allocated
  // by handle->alloc_cb when we call close in handle->read_cb,
  // so we should release the memory here if necessary.
  // 開啟了 recvmmsg 并且沒有釋放申請的內(nèi)存則釋放
  if (uv_udp_using_recvmmsg(reinterpret_cast<uv_udp_t*>(&handle_)) {
    release_buf();
  }
}

void release_buf() {
  if (mmsg_buf_.base != nullptr) {
    free(mmsg_buf_.base);
    mmsg_buf_ = uv_buf_init(nullptr, 0);
  }
}

總結(jié)

recvmmsg 通過減少系統(tǒng)調(diào)用提升性能,但是不一定在什么場景下都有可觀的效果,比如在 QPS 高時可能比較有意義,另外因為每次都需要提前分配內(nèi)容,所以數(shù)據(jù)量小時可能會導(dǎo)致每次分配過多無用的內(nèi)存,然后又被釋放。該特性目前還沒有在 Node.js 中進(jìn)行性能測試。

  1. PR:https://github.com/nodejs/node/pull/59126
責(zé)任編輯:武曉燕 來源: 編程雜技
相關(guān)推薦

2015-12-14 10:39:14

2019-07-09 14:50:15

Node.js前端工具

2013-11-01 09:34:56

Node.js技術(shù)

2015-03-10 10:59:18

Node.js開發(fā)指南基礎(chǔ)介紹

2020-12-28 08:48:44

JS工具fastify

2020-12-14 15:40:59

Nodefastifyjs

2022-05-25 10:04:43

Go編程

2013-10-23 17:17:31

Node.jsdoT

2022-06-07 08:07:05

GoNode.js

2020-05-29 15:33:28

Node.js框架JavaScript

2012-02-03 09:25:39

Node.js

2021-12-25 22:29:57

Node.js 微任務(wù)處理事件循環(huán)

2015-11-04 09:18:41

Node.js應(yīng)用性能

2020-12-14 08:55:00

Node.js服務(wù)性框架

2011-09-09 14:23:13

Node.js

2011-09-02 14:47:48

Node

2011-11-01 10:30:36

Node.js

2011-09-08 13:46:14

node.js

2012-10-24 14:56:30

IBMdw

2011-11-10 08:55:00

Node.js
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號