深入理解 Go 高性能網(wǎng)絡框架 nbio
前言
nbio 項目還包括建立在 nbio 基礎(chǔ)上的nbhttp,但這不在我們的討論范圍之內(nèi)。
與 evio 一樣,nbio 也采用經(jīng)典的 Reactor 模式。事實上,Go 中的許多異步網(wǎng)絡框架都是基于這種模式設(shè)計的。
我們先看看如何執(zhí)行 nbio 代碼。

(1) 服務器:
package main
import (
"fmt"
"github.com/lesismal/nbio"
)
func main() {
g := nbio.NewGopher(nbio.Config{
Network: "tcp",
Addrs: []string{":8888"},
MaxWriteBufferSize: 6 * 1024 * 1024,
})
g.OnData(func(c *nbio.Conn, data []byte) {
c.Write(append([]byte{}, data...))
})
err := g.Start()
if err != nil {
fmt.Printf("nbio.Start failed: %v\n", err)
return
}
defer g.Stop()
g.Wait()
}package main
import (
"fmt"
"github.com/lesismal/nbio"
)
func main() {
g := nbio.NewGopher(nbio.Config{
Network: "tcp",
Addrs: []string{":8888"},
MaxWriteBufferSize: 6 * 1024 * 1024,
})
g.OnData(func(c *nbio.Conn, data []byte) {
c.Write(append([]byte{}, data...))
})
err := g.Start()
if err != nil {
fmt.Printf("nbio.Start failed: %v\n", err)
return
}
defer g.Stop()
g.Wait()
}我們用nbio.NewGopher() 函數(shù)創(chuàng)建新的引擎實例,通過nbio.Config 結(jié)構(gòu)來配置引擎實例,包括:
- Network(網(wǎng)絡):使用的網(wǎng)絡類型,本例中為 "TCP"。
- Addrs(地址):服務器應該監(jiān)聽的地址和端口,這里是":8888"(監(jiān)聽本地計算機的 8888 端口)。
- MaxWriteBufferSize(最大寫緩沖區(qū)大?。簩懢彌_區(qū)的最大大小,此處設(shè)置為 6MB。
我們還可以進一步探索其他配置。然后,我們通過引擎實例g.OnData() 注冊數(shù)據(jù)接收回調(diào)函數(shù),該回調(diào)函數(shù)會在收到數(shù)據(jù)時調(diào)用。回調(diào)函數(shù)需要兩個參數(shù):連接對象c 和接收到的數(shù)據(jù)data。在回調(diào)函數(shù)中,通過c.Write() 方法將接收到的數(shù)據(jù)寫回客戶端。
(2) 客戶端:
package main
import (
"bytes"
"context"
"fmt"
"math/rand"
"time"
"github.com/lesismal/nbio"
"github.com/lesismal/nbio/logging"
)
func main() {
var (
ret []byte
buf = make([]byte, 1024*1024*4)
addr = "localhost:8888"
ctx, _ = context.WithTimeout(context.Background(), 60*time.Second)
)
logging.SetLevel(logging.LevelInfo)
rand.Read(buf)
g := nbio.NewGopher(nbio.Config{})
done := make(chan int)
g.OnData(func(c *nbio.Conn, data []byte) {
ret = append(ret, data...)
if len(ret) == len(buf) {
if bytes.Equal(buf, ret) {
close(done)
}
}
})
err := g.Start()
if err != nil {
fmt.Printf("Start failed: %v\n", err)
}
defer g.Stop()
c, err := nbio.Dial("tcp", addr)
if err != nil {
fmt.Printf("Dial failed: %v\n", err)
}
g.AddConn(c)
c.Write(buf)
select {
case <-ctx.Done():
logging.Error("timeout")
case <-done:
logging.Info("success")
}
}乍一看似乎有點繁瑣,實際上服務器和客戶端共享同一套結(jié)構(gòu)。
客戶端通過nbio.Dial 與服務器連接,連接成功后封裝到nbio.Conn 中。這里nbio.Conn 實現(xiàn)了標準庫中的net.Conn 接口,最后通過g.AddConn(c) 添加此連接,并向服務器寫入數(shù)據(jù)。服務器收到數(shù)據(jù)后,其處理邏輯是將數(shù)據(jù)原封不動發(fā)送回客戶端,客戶端收到數(shù)據(jù)后,會觸發(fā)OnData 回調(diào),該回調(diào)會檢查收到的數(shù)據(jù)長度是否與發(fā)送的數(shù)據(jù)長度一致,如果一致,則關(guān)閉連接。
下面深入探討幾個關(guān)鍵結(jié)構(gòu)。
type Engine struct {
//...
sync.WaitGroup
//...
mux sync.Mutex
wgConn sync.WaitGroup
network string
addrs []string
//...
connsStd map[*Conn]struct{}
connsUnix []*Conn
listeners []*poller
pollers []*poller
onOpen func(c *Conn)
onClose func(c *Conn, err error)
onRead func(c *Conn)
onData func(c *Conn, data []byte)
onReadBufferAlloc func(c *Conn) []byte
onReadBufferFree func(c *Conn, buffer []byte)
//...
}Engine 本質(zhì)上是核心管理器,負責管理所有監(jiān)聽器、輪詢器和工作輪詢器。
這兩種輪詢器有什么區(qū)別?
區(qū)別在于責任不同。
監(jiān)聽輪詢器只負責接受新連接。當一個新的客戶端conn 到達時,它會從pollers 中選擇一個工作輪詢器,并將conn 添加到相應的工作輪詢器中。隨后,工作輪詢器負責處理該連接的讀/寫事件。
因此當我們啟動程序時,如果只監(jiān)聽一個地址,程序中的輪詢次數(shù)等于 1(監(jiān)聽器輪詢器)+pollerNum。
通過上述字段,可以自定義配置和回調(diào)。例如,可以在新連接到達時設(shè)置onOpen 回調(diào)函數(shù),或在數(shù)據(jù)到達時設(shè)置onData 回調(diào)函數(shù)等。
type Conn struct {
mux sync.Mutex
p *poller
fd int
//...
writeBuffer []byte
//...
DataHandler func(c *Conn, data []byte)
}Conn 結(jié)構(gòu)代表網(wǎng)絡連接,每個Conn 只屬于一個輪詢器。當數(shù)據(jù)一次寫不完時,剩余數(shù)據(jù)會先存儲在writeBuffer 中,等待下一個可寫事件繼續(xù)寫入。
type poller struct {
g *Engine
epfd int
evtfd int
index int
shutdown bool
listener net.Listener
isListener bool
unixSockAddr string
ReadBuffer []byte
pollType string
}至于poller 結(jié)構(gòu),這是一個抽象概念,用于管理底層多路復用 I/O 操作(如 Linux 的 epoll、Darwin 的 kqueue 等)。
注意pollType,nbio 默認使用電平觸發(fā)(LT)模式的 epoll,但用戶也可以將其設(shè)置為邊緣觸發(fā)(ET)模式。
介紹完基本結(jié)構(gòu)后,我們來看看代碼流程。
當啟動服務器代碼時,調(diào)用Start:
func (g *Engine) Start() error {
//...
switch g.network {
// 第一部分: 初始化 listener
case "unix", "tcp", "tcp4", "tcp6":
for i := range g.addrs {
ln, err := newPoller(g, true, i)
if err != nil {
for j := 0; j < i; j++ {
g.listeners[j].stop()
}
return err
}
g.addrs[i] = ln.listener.Addr().String()
g.listeners = append(g.listeners, ln)
}
//...
// 第二部分: 初始化一定數(shù)量的輪詢器
for i := 0; i < g.pollerNum; i++ {
p, err := newPoller(g, false, i)
if err != nil {
for j := 0; j < len(g.listeners); j++ {
g.listeners[j].stop()
}
for j := 0; j < i; j++ {
g.pollers[j].stop()
}
return err
}
g.pollers[i] = p
}
//...
// 第三部分: 啟動所有工作輪詢器
for i := 0; i < g.pollerNum; i++ {
g.pollers[i].ReadBuffer = make([]byte, g.readBufferSize)
g.Add(1)
go g.pollers[i].start()
}
// 第四部分: 啟動所有監(jiān)聽器
for _, l := range g.listeners {
g.Add(1)
go l.start()
}
//... (忽略 UDP)
//...
}代碼比較容易理解,分為四個部分:
第一部分:初始化監(jiān)聽器
根據(jù)g.network 值(如 "unix"、"tcp"、"tcp4"、"tcp6"),為每個要監(jiān)聽的地址創(chuàng)建一個新的輪詢器。該輪詢器主要管理監(jiān)聽套接字上的事件。如果在創(chuàng)建過程中發(fā)生錯誤,則停止所有先前創(chuàng)建的監(jiān)聽器并返回錯誤信息。
第二部分:初始化一定數(shù)量的輪詢器
創(chuàng)建指定數(shù)量(pollerNum)的輪詢器,用于處理已連接套接字上的讀/寫事件。如果在創(chuàng)建過程中發(fā)生錯誤,將停止所有監(jiān)聽器和之前創(chuàng)建的工作輪詢器,然后返回錯誤信息。
第三部分:啟動所有工作輪詢器投票站
為每個輪詢器分配讀緩沖區(qū)并啟動。
第四部分:啟動所有監(jiān)聽器
啟動之前創(chuàng)建的所有監(jiān)聽器,并開始監(jiān)聽各自地址上的連接請求。
關(guān)于輪詢器的啟動:
func (p *poller) start() {
defer p.g.Done()
//...
if p.isListener {
p.acceptorLoop()
} else {
defer func() {
syscall.Close(p.epfd)
syscall.Close(p.evtfd)
}()
p.readWriteLoop()
}
}分為兩種情況。如果是監(jiān)聽輪詢器:
func (p *poller) acceptorLoop() {
// 如果不希望將當前 goroutine 調(diào)度到其他操作線程。
if p.g.lockListener {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}
p.shutdown = false
for !p.shutdown {
conn, err := p.listener.Accept()
if err == nil {
var c *Conn
c, err = NBConn(conn)
if err != nil {
conn.Close()
continue
}
// p.g.pollers[c.Hash()%len(p.g.pollers)].addConn(c)
} else {
var ne net.Error
if ok := errors.As(err, &ne); ok && ne.Timeout() {
logging.Error("NBIO[%v][%v_%v] Accept failed: temporary error, retrying...", p.g.Name, p.pollType, p.index)
time.Sleep(time.Second / 20)
} else {
if !p.shutdown {
logging.Error("NBIO[%v][%v_%v] Accept failed: %v, exit...", p.g.Name, p.pollType, p.index, err)
}
break
}
}
}
}監(jiān)聽輪詢器等待新連接的到來,并在接受后將其封裝到nbio.Conn 中,并將Conn 添加到相應的工作輪詢器中。
func (p *poller) addConn(c *Conn) {
c.p = p
if c.typ != ConnTypeUDPServer {
p.g.onOpen(c)
}
fd := c.fd
p.g.connsUnix[fd] = c
err := p.addRead(fd)
if err != nil {
p.g.connsUnix[fd] = nil
c.closeWithError(err)
logging.Error("[%v] add read event failed: %v", c.fd, err)
}
}這里一個有趣的設(shè)計是對conn 的管理。該結(jié)構(gòu)是個切片,直接使用conn 的fd 作為索引。這樣做的好處是:
- 在連接數(shù)較多的情況下,垃圾回收時的負擔要比使用 map 小。
- 可以防止序列號問題。
最后,通過調(diào)用addRead 將相應的conn fd 添加到 epoll 中。
func (p *poller) addRead(fd int) error {
switch p.g.epollMod {
case EPOLLET:
return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN | syscall.EPOLLET})
default:
return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.E
pollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN})
}
}這里不注冊寫事件是合理的,因為新連接上沒有數(shù)據(jù)要發(fā)送。這種方法避免了一些不必要的系統(tǒng)調(diào)用,從而提高了程序性能。
如果啟動的是工作輪詢器,它的工作就是等待新增conn 事件,并進行相應處理。
func (p *poller) readWriteLoop() {
//...
msec := -1
events := make([]syscall.EpollEvent, 1024)
//...
for !p.shutdown {
n, err := syscall.EpollWait(p.epfd, events, msec)
if err != nil && !errors.Is(err, syscall.EINTR) {
return
}
if n <= 0 {
msec = -1
continue
}
msec = 20
// 遍歷事件
for _, ev := range events[:n] {
fd := int(ev.Fd)
switch fd {
case p.evtfd:
default:
c := p.getConn(fd)
if c != nil {
if ev.Events&epollEventsError != 0 {
c.closeWithError(io.EOF)
continue
}
// 如果可寫,則刷新數(shù)據(jù)
if ev.Events&epollEventsWrite != 0 {
c.flush()
}
// 讀取事件
if ev.Events&epollEventsRead != 0 {
if p.g.onRead == nil {
for i := 0; i < p.g.maxConnReadTimesPerEventLoop; i++ {
buffer := p.g.borrow(c)
rc, n, err := c.ReadAndGetConn(buffer)
if n > 0 {
p.g.onData(rc, buffer[:n])
}
p.g.payback(c, buffer)
//...
if n < len(buffer) {
break
}
}
} else {
p.g.onRead(c)
}
}
} else {
syscall.Close(fd)
}
}
}
}
}這段代碼也很簡單,等待事件到來,遍歷事件列表,并相應處理每個事件。
func EpollWait(epfd int, events []EpollEvent, msec int) (n int, err error)在EpollWait 中,只有msec 是用戶可修改的。通常,我們設(shè)置msec = -1 使函數(shù)阻塞,直到至少有一個事件發(fā)生;否則,函數(shù)將無限期阻塞。當事件較少時,這種方法非常有用,能最大限度減少 CPU 占用。
如果想盡快響應事件,可以設(shè)置msec = 0,這樣EpollWait 就能立即返回,無需等待任何事件。在這種情況下,程序可能會更頻繁調(diào)用EpollWait,可以在事件發(fā)生后立即處理事件,從而提高 CPU 使用率。
如果程序可以容忍一定延遲,并且希望降低 CPU 占用率,可以將msec 設(shè)置為正數(shù)。這樣,EpollWait 就會在指定時間內(nèi)等待事件發(fā)生。如果在這段時間內(nèi)沒有事件發(fā)生,函數(shù)將返回,可以選擇稍后再次調(diào)用EpollWait。這種方法可以降低 CPU 占用率,但可能導致響應時間延長。
nbio 會根據(jù)事件計數(shù)調(diào)整msec 值。如果計數(shù)大于 0,則msec 設(shè)置為 20。
字節(jié)跳動的 netpoll 代碼與此類似;如果事件計數(shù)大于 0 ,則將msec 設(shè)置為 0;如果事件計數(shù)小于或等于 0,則將msec 設(shè)置為-1,然后調(diào)用Gosched() 以主動退出當前 goroutine。
var msec = -1
for {
n, err = syscall.EpollWait(epfd, events, msec)
if n <= 0 {
msec = -1
runtime.Gosched()
continue
}
msec = 0
...
}不過,nbio 中的自愿切換代碼已被注釋掉。根據(jù)作者的解釋,最初他參考了字節(jié)跳動的方法,并添加了自愿切換功能。
不過,在對 nbio 進行性能測試時發(fā)現(xiàn),添加或不添加自愿切換功能對性能并無明顯影響,因此最終決定將其刪除。
事件處理部分
如果是可讀事件,則可以通過內(nèi)置或自定義內(nèi)存分配器獲取相應的緩沖區(qū),然后調(diào)用ReadAndGetConn 讀取數(shù)據(jù),無需每次都分配緩沖區(qū)。
如果是可寫事件,則會調(diào)用flush 發(fā)送緩沖區(qū)中未發(fā)送的數(shù)據(jù)。
func (c *Conn) flush() error {
//.....
old := c.writeBuffer
n, err := c.doWrite(old)
if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
//.....
}
if n < 0 {
n = 0
}
left := len(old) - n
// 描述尚未完成,因此將其余部分存儲在writeBuffer中以備下次寫入。
if left > 0 {
if n > 0 {
c.writeBuffer = mempool.Malloc(left)
copy(c.writeBuffer, old[n:])
mempool.Free(old)
}
// c.modWrite()
} else {
mempool.Free(old)
c.writeBuffer = nil
if c.wTimer != nil {
c.wTimer.Stop()
c.wTimer = nil
}
// 解釋完成后,首先將conn重置為僅讀取事件。
c.resetRead()
//...
}
c.mux.Unlock()
return nil
}邏輯也很簡單,有多少就寫多少,如果寫不完,就把剩余數(shù)據(jù)放回writeBuffer,然后在epollWait 觸發(fā)時再次寫入。
如果寫入完成,則不再有數(shù)據(jù)要寫入,將此連接的事件重置為讀取事件。
主邏輯基本上就是這樣。
等等,最初提到有新連接進入時,只注冊了連接的讀事件,并沒有注冊寫事件。寫事件是什么時候注冊的?
當然是在調(diào)用conn.Write 時注冊的。
g := nbio.NewGopher(nbio.Config{
Network: "tcp",
Addrs: []string{":8888"},
MaxWriteBufferSize: 6 * 1024 * 1024,
})
g.OnData(func(c *nbio.Conn, data []byte) {
c.Write(append([]byte{}, data...))
})當 Conn 數(shù)據(jù)到達時,底層會在讀取數(shù)據(jù)后回調(diào)OnData 函數(shù),此時可以調(diào)用Write 向另一端發(fā)送數(shù)據(jù)。
g := nbio.NewGopher(nbio.Config{
Network: "tcp",
Addrs: []string{":8888"},
MaxWriteBufferSize: 6 * 1024 * 1024,
})
g.OnData(func(c *nbio.Conn, data []byte) {
c.Write(append([]byte{}, data...))
})
// 當數(shù)據(jù)到達conn時,底層將讀取數(shù)據(jù)并回調(diào)OnData函數(shù)。此時,您可以調(diào)用Write來向另一端發(fā)送數(shù)據(jù)。
func (c *Conn) Write(b []byte) (int, error) {
//....
n, err := c.write(b)
if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
//.....
return n, err
}
if len(c.writeBuffer) == 0 {
if c.wTimer != nil {
c.wTimer.Stop()
c.wTimer = nil
}
} else {
//仍然有數(shù)據(jù)未寫入,添加寫事件。
c.modWrite()
}
//.....
return n, err
}
func (c *Conn) write(b []byte) (int, error) {
//...
if len(c.writeBuffer) == 0 {
n, err := c.doWrite(b)
if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
return n, err
}
//.....
left := len(b) - n
// 未完成,將剩余數(shù)據(jù)寫入writeBuffer。
if left > 0 && c.typ == ConnTypeTCP {
c.writeBuffer = mempool.Malloc(left)
copy(c.writeBuffer, b[n:])
c.modWrite()
}
return len(b), nil
}
// 如果writeBuffer中仍有未寫入的數(shù)據(jù),則還將追加新數(shù)據(jù)。
c.writeBuffer = mempool.Append(c.writeBuffer, b...)
return len(b), nil
}當數(shù)據(jù)未完全寫入時,剩余數(shù)據(jù)將被放入writeBuffer,觸發(fā)執(zhí)行modWrite,并將conn 的寫入事件注冊到 epoll。
總結(jié)
與 evio 相比,nbio 沒有蜂群效應。
Evio 通過不斷喚醒無效的 epoll 來實現(xiàn)邏輯正確性。Nbio 盡量減少系統(tǒng)調(diào)用,減少不必要的開銷。
在可用性方面,nbio 實現(xiàn)了標準庫net.Conn,許多設(shè)置都是可配置的,允許用戶進行高度靈活的定制。
預分配緩沖區(qū)用于讀寫操作,以提高應用程序性能。
總之,nbio 是個不錯的高性能無阻塞網(wǎng)絡框架。
參考資料:
[1]Analyzing High-Performance Network Framework nbio in Go:https://levelup.gitconnected.com/analyzing-high-performance-network-framework-nbio-in-go-9c35f295b5ad

























