偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Go 運行起來是怎樣的,你知道嗎?

開發(fā) 前端
Go 可以通過 go 關鍵字創(chuàng)建多個協(xié)程,這些協(xié)程是跑在多個線程中的,天然利用了多核能力,但是如果使用了公共的數(shù)據(jù)結構,需要通過互斥機制保證數(shù)據(jù)的正確性,而又因為搶占式調度的存在,盡管我們只跑在一個線程中,對共享數(shù)據(jù)的修改也會存在競態(tài)條件。

當我們使用一門語言或一個軟件時,我們都是面向 API 或文檔來使用它們的,很多時候我們更關注的是如何使用它們來解決業(yè)務的問題,往往不需要了解它具體是如何運行的,比如它說可以通過 read 來讀取一個文件,通過 accept 來獲取一個 TCP 連接,當我們需要時按需調用就行。但是了解運行時的細節(jié)不僅有助于我們更了解相關的技術,而且有助于我們解決碰到的問題,比如之前在 Libuv 中存在慢 IO(DNS 解析)太多導致快 IO(文件 IO)無法執(zhí)行的問題,從而影響了軟件的運行性能。本文主要介紹 Go 運行時的一些細節(jié),但是細節(jié)太多太復雜,無法一一描述。

了解 Go 的運行時細節(jié)前先看一下一些著名軟件的情況。

Redis

Redis 是一個基于事件驅動+非阻塞 IO 的單線程應用。

  1. 在啟動后會啟動一個服務器并把服務器對應的 fd 注冊到事件驅動模塊中,開始事件循環(huán)。
  2. 當連接到來時就會收到讀事件,然后通過 accept 獲取一個新的 socket 并把該 socket 和讀寫事件注冊到事件驅動模塊中。
  3. 當數(shù)據(jù)到來時調 read 讀取。
  4. 解析并處理請求。
  5. 調用 write 返回數(shù)據(jù)。 這是 Redis 的常見的執(zhí)行流程。但是除此之外,還有一些額外的邏輯。
  6. 通過子線程處理數(shù)據(jù)在內存和硬盤間的交換。
  7. 通過子進程進行 AOF 重寫和 RDB。
  8. 通過子線程刷 AOF 數(shù)據(jù)到硬盤。
  9. 維護一個定時器數(shù)據(jù)結構,在每輪中判斷過期的定時器,通過事件驅動模塊的阻塞時間保證定時器的按時執(zhí)行。 6.0 后 Redis 甚至把網(wǎng)絡 IO 的讀寫也放到了子線程,但是整體來看執(zhí)行的流程還是比較好理解的。

Nginx

Nginx 是一個基于事件驅動+非阻塞 IO 的單線程應用。但是 Nginx 可以啟動多個子進程,因為 Ngnix 和 Redis 的場景不一樣,Redis 是在進程的內存中維護數(shù)據(jù)的,多進程很難維護進程間的數(shù)據(jù)同步和一致性,除非是每個進程維護不同的數(shù)據(jù)集,按 key 進行哈希讀寫,類似集群模式。而 Nginx 是無狀態(tài)的,可以橫行擴容最大化利用資源,在每個子進程內,Nginx 和 Redis 的架構差不多,主體流程也是啟動一個服務器,然后啟動事件循環(huán),處理網(wǎng)絡 IO 事件和定時器,再通過線程池處理一些耗時和阻塞式的操作,如文件 IO 的。多進程帶來的一個問題是多個進程需要監(jiān)聽一個端口,所以需要解決多進程監(jiān)聽同一個端口和處理驚群問題,早期的 Nginx 是通過共享一個 socket + 自己解決驚群問題,現(xiàn)在已經(jīng)支持通過操作系統(tǒng)的 REUSEPORT 特性。

Node.js

Node.js 是一個基于事件驅動+非阻塞 IO 的單線程應用,架構上是由單線程執(zhí)行事件循環(huán)+線程池組成。Node.js 支持創(chuàng)建多進程,每個進程內支持創(chuàng)建多個子線程,每個子線程都是一個獨立的事件循環(huán)并共享線程池。進程間監(jiān)聽端口支持共享 socket、文件描述符傳遞和 REUSEPORT 三種模式。另外 Node.js 已經(jīng)支持異步 IO io_uring。

Go

Go 是一個基于事件驅動+非阻塞 IO 的多線程應用。相對前面幾個軟件來說,Go 的底層并不是簡單的注冊事件,執(zhí)行回調那么簡單,Go 運行時的流程和前面介紹的幾個軟件有很大的不同。

  1. 實現(xiàn)了協(xié)程,并通過 n:m 模式原生利用了多核能力。
  2. 通過 hadnoff 機制實現(xiàn)系統(tǒng)調用等阻塞線程的操作,而不是通過線程池。
  3. 支持協(xié)作式和搶占式調度,避免單個協(xié)程影響整體系統(tǒng)的性能。
  4. 支持棧自動擴所容。
  5. 支持以同步的方式寫異步代碼,而不是回調(Node.js 也支持,但是不徹底)。

下面看一下 Go 是如何實現(xiàn)這些能力的。

啟動過程

TEXT _rt0_386(SB),NOSPLIT,$8
 JMP runtime·rt0_go(SB)
  
TEXT runtime·rt0_go(SB),NOSPLIT|NOFRAME|TOPFRAME,$0
 CALL runtime·args(SB)
 CALL runtime·osinit(SB)
 CALL runtime·schedinit(SB)

 // 創(chuàng)建主協(xié)程
 PUSHL $runtime·mainPC(SB) // entry
 CALL runtime·newproc(SB)
 POPL AX

 // 開始調度
 CALL runtime·mstart(SB)

 CALL runtime·abort(SB)
 RET

Go 啟動時,初始化完數(shù)據(jù)結構后,就以 runtime·mainPC(runtime·main)為參數(shù),調用 runtime·newproc 創(chuàng)建了第一個協(xié)程,可以簡單理解為 Go 內部維護了一個協(xié)程隊列,接著調 runtime·mstart 開始調度協(xié)程的執(zhí)行,可以簡單理解為從協(xié)程隊列中選擇一個就緒的協(xié)程執(zhí)行。

TEXT runtime·mstart(SB),NOSPLIT|TOPFRAME,$0
 CALL runtime·mstart0(SB)
 RET // not reached

runtime·mstart 繼續(xù)調 runtime·mstart0。

func mstart0() { 
 mstart1()
}

func mstart1() {
  // 注冊信號處理函數(shù),實現(xiàn)搶占式調度
 if gp.m == &m0 {
  mstartm0()
 }
  
 // 開始調度
 schedule()
}

因為現(xiàn)在只有剛才創(chuàng)建的主協(xié)程,所以自然就會調度主協(xié)程執(zhí)行,主協(xié)程代碼如下。

func main() {
 mp := getg().m
  // 啟動 sysmon 線程
 systemstack(func() {
    newm(sysmon, nil, -1)
  })
  // 開始 gc 協(xié)程
 gcenable()
  // 執(zhí)行用戶的 main 函數(shù)
 fn := main_main 
 fn()
}

主協(xié)程啟動了一個 sysmon 線程(后面介紹)和一個 gc 相關的協(xié)程,最后執(zhí)行用戶的 main 函數(shù),這樣 Go 程序就執(zhí)行起來了,比如下面的例子。

package main

import "net"

func main() {
 listener, _ := net.Listen("tcp", ":8080")
 for {
  conn, _ := listener.Accept()
    go func() {
      conn.Read(...)
      conn.Write(...)
      conn.Close()
    }()
 }
}

當調用 Accept 時,主協(xié)程就阻塞了,但是主線程并沒有阻塞,這時候主線程會執(zhí)行其他任務,因為這時候沒有其他任務需要執(zhí)行,所以主線程會阻塞在事件驅動模塊等待連接的到來,我們如果在開頭加上以下代碼,可以看到輸出,說明主線程沒有阻塞。

time.AfterFunc(1*time.Second, func() {
  println("1 seconds later")
})

以同步方式寫異步代碼

我們知道操作系統(tǒng)的 accept/read/write 等系統(tǒng)調用在不滿足條件的情況默認是會引起線程阻塞的,那么為什么 Go 里并不會引起線程阻塞,而僅僅是引起協(xié)程阻塞呢?這就是 Go 的一個特點:以同步方式寫異步代碼。這種方式利于編寫代碼和理解代碼,比如在 Node.js 中,我們需要接收一個 TCP 連接上的數(shù)據(jù)需要通過事件回調的方式來寫。

const socket = net.connect(...);
socket.on('data', function(data) {});

這種方式讓我們很難理解代碼的執(zhí)行路徑,尤其是回調里又嵌套回調時就更復雜了,雖然 Promise 可以一定程度上緩解這個問題,但是 Node.js 從架構上就是基于事件回調的,很多地方還是避免不了異步回調的寫法。在 Go 中,寫法就非常簡單,其底層使用的是非阻塞 IO,再結合協(xié)程切換機制實現(xiàn)的。接下來以 Read 為例,看看具體的實現(xiàn)。

func (c *conn) Read(b []byte) (int, error) {
 n, err := c.fd.Read(b)
 return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
 n, err = fd.pfd.Read(p)
 return n, wrapSyscallError(readSyscallName, err)
}

func (fd *FD) Read(p []byte) (int, error) {
  // 獲取鎖
 if err := fd.readLock(); err != nil {
  return 0, err
 }
 defer fd.readUnlock()
  // 判斷是否超時或錯誤
 if err := fd.pd.prepareRead(fd.isFile); err != nil {
  return 0, err
 }
 for {
    // 以非阻塞方式執(zhí)行系統(tǒng)調用 read
  n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
  if err != nil {
   n = 0
      // 沒有數(shù)據(jù)并且是 IO 多路復用模塊支持監(jiān)聽的 fd 類型
   if err == syscall.EAGAIN && fd.pd.pollable() {
        // 阻塞協(xié)程
    if err = fd.pd.waitRead(fd.isFile); err == nil {
     continue
    }
   }
  }
  err = fd.eofError(n, err)
  return n, err
 }
}

func (pd *pollDesc) waitRead(isFile bool) error {
 return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
 res := runtime_pollWait(pd.runtimeCtx, mode)
 return convertErr(res, isFile)
}

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
 for !netpollblock(pd, int32(mode), false) {
  errcode = netpollcheckerr(pd, int32(mode))
  if errcode != pollNoError {
   return errcode
  }
 }
 return pollNoError
}

// pollDesc 是對一個 fd、事件和關聯(lián)的協(xié)程的封裝
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
 gpp := &pd.rg
 if mode == 'w' {
  gpp = &pd.wg
 }

 // set the gpp semaphore to pdWait
 for {
  // 把 pollDesc 切換成 pdWait 狀態(tài)
  if gpp.CompareAndSwap(pdNil, pdWait) {
   break
  }
 }

 gopark(netpollblockcommit, unsafe.Pointer(gpp), ...)
 
  // 事件就緒后改成 pdNil 狀態(tài)
 old := gpp.Swap(pdNil)
 return old == pdReady
}

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, ...) {
 mp.waitlock = lock
 mp.waitunlockf = unlockf
 releasem(mp)
 mcall(park_m)
}

func park_m(gp *g) {
 mp := getg().m
  // 把當前協(xié)程改成 _Gwaiting 狀態(tài)
 casgstatus(gp, _Grunning, _Gwaiting)

 if fn := mp.waitunlockf; fn != nil {
    // 把 pollDesc 的 rg 字段改成協(xié)程結構體的地址
    // atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
  ok := fn(gp, mp.waitlock)
 }
  // 重新調度其他協(xié)程執(zhí)行
 schedule()
}

可以看到 Read 在沒有數(shù)據(jù)可讀時,調用協(xié)程會被修改成等待狀態(tài),等待事件的發(fā)生,同時發(fā)生調度選擇其他協(xié)程繼續(xù)運行,所以一個協(xié)程的阻塞影響的只是自己,而不是影響到整個線程,這大大地提供了資源的利用率和執(zhí)行效率。

那么阻塞的協(xié)程什么時候又是怎么被喚醒的呢?Go 會在 sysmon 線程、調度等時機執(zhí)行 netpool 獲取就緒的事件,從而處理相關的協(xié)程。

func sysmon() {
  if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
    // 更新上次 poll 的時間
    sched.lastpoll.CompareAndSwap(lastpoll, now)
    // 通過 IO 多路復用模塊獲取就緒的事件(所以關聯(lián)的 g)列表
    list, delta := netpoll(0) // non-blocking - returns list of goroutines
    if !list.empty() {
      incidlelocked(-1)
      // 把就緒的 g 放入隊列等待調度
      injectglist(&list)
      incidlelocked(1)
      netpollAdjustWaiters(delta)
    }
  }
}

func netpoll(delay int64) (gList, int32) {
 var tp *timespec
 var ts timespec
 
 var events [64]keventt
retry:
  // 獲取就緒事件
 n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
 var toRun gList
 delta := int32(0)
  // 逐個處理
 for i := 0; i < int(n); i++ {
  ev := &events[i]
  var mode int32
  switch ev.filter {
  case _EVFILT_READ:
   mode += 'r'
  case _EVFILT_WRITE:
   mode += 'w'
  }
  if mode != 0 {
   var pd *pollDesc
      // 找到 pollDesc 中記錄的等待協(xié)程
   pd = (*pollDesc)(unsafe.Pointer(ev.udata))
   pd.setEventErr(ev.flags == _EV_ERROR, tag)
      // 修改狀態(tài)
   delta += netpollready(&toRun, pd, mode)
  }
 }
 return toRun, delta
}

func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
 delta := int32(0)
 var rg, wg *g
  // 修改狀態(tài)
 if mode == 'r' || mode == 'r'+'w' {
  rg = netpollunblock(pd, 'r', true, &delta)
 }
 if mode == 'w' || mode == 'r'+'w' {
  wg = netpollunblock(pd, 'w', true, &delta)
 }
 if rg != nil {
  toRun.push(rg)
 }
 if wg != nil {
  toRun.push(wg)
 }
 return delta
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
 gpp := &pd.rg
 if mode == 'w' {
  gpp = &pd.wg
 }

 for {
  old := gpp.Load()
  new := pdReady
    // 設置 pollDesc 的 rg 或 wg 為 pdReady,返回等待的協(xié)程
  if gpp.CompareAndSwap(old, new) {
   *delta -= 1
   return (*g)(unsafe.Pointer(old))
  }
 }
}

Go 最終把就緒的協(xié)程放入就緒隊列等待調度執(zhí)行。

系統(tǒng)調用

有了 IO 多路復用模塊,IO 操作只注冊事件,阻塞協(xié)程,然后數(shù)據(jù)就緒時喚醒協(xié)程,并以非阻塞的方式調用 read 讀取數(shù)據(jù)就行。但是很可惜,IO 多路復用模塊并不支持所有類型的 IO,比如 epoll 就不支持普通文件的 IO,所以文件 IO 就只能直接以阻塞的方式調系統(tǒng)調用來實現(xiàn)了,但是調系統(tǒng)調用不僅耗時而且可能會引起線程阻塞,又因為 Go gmp 機制中,m 需要獲取 p 才能執(zhí)行 g,一旦線程阻塞就會凍結一個 m、g、p,而 p 被凍結后,p 里面的協(xié)程就沒法執(zhí)行了,所以這時候需要一種方式讓 p 能脫離出來被其他線程處理,這就是 Go 的 handoff 機制。handoff 機制不僅在文件 IO 中使用,在調用其他系統(tǒng)調用時也會使用。接著看一下打開一個文件的過程。

func Open(name string) (*File, error) {
 return OpenFile(name, O_RDONLY, 0)
}

func OpenFile(name string, flag int, perm FileMode) (*File, error) {
 f, err := openFileNolog(name, flag, perm)
 if err != nil {
  return nil, err
 }
 f.appendMode = flag&O_APPEND != 0

 return f, nil
}

func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
 ignoringEINTR(func() error {
  r, s, e = open(name, flag|syscall.O_CLOEXEC, syscallMode(perm))
  return e
 })
  // ...
 return f, nil
}

func open(path string, flag int, perm uint32) (int, poll.SysFile, error) {
 fd, err := syscall.Open(path, flag, perm)
 return fd, poll.SysFile{}, err
}

func Open(path string, mode int, perm uint32) (fd int, err error) {
 r0, _, e1 := syscall(abi.FuncPCABI0(libc_open_trampoline), uintptr(unsafe.Pointer(_p0)), uintptr(mode), uintptr(perm))

 return
}

func syscall_syscall(fn, a1, a2, a3 uintptr) (r1, r2, err uintptr) {
 args := struct{ fn, a1, a2, a3, r1, r2, err uintptr }{fn, a1, a2, a3, r1, r2, err}
 // 執(zhí)行系統(tǒng)調用前的處理
  entersyscall()
 libcCall(unsafe.Pointer(abi.FuncPCABI0(syscall)), unsafe.Pointer(&args))
  // 執(zhí)行完系統(tǒng)調用前的處理
 exitsyscall()
 return args.r1, args.r2, args.err
}

可以看到最終在執(zhí)行系統(tǒng)調用時會先進行一些特殊的處理,看一下 entersyscall。

func entersyscall() {
 fp := getcallerfp()
 reentersyscall(getcallerpc(), getcallersp(), fp)
}

func reentersyscall(pc, sp, bp uintptr) {
 trace := traceAcquire()
 gp := getg()
  // 把當前協(xié)程改成 _Gsyscall 狀態(tài)
 casgstatus(gp, _Grunning, _Gsyscall)
 gp.m.syscalltick = gp.m.p.ptr().syscalltick
  // 接觸 m 和 p 的關系
 pp := gp.m.p.ptr()
 pp.m = 0
  // m 中保存當前的 p,執(zhí)行完系統(tǒng)調用后優(yōu)先獲取該 p
 gp.m.oldp.set(pp)
 gp.m.p = 0
  // 把 p 的狀態(tài)改成 _Psyscall
 atomic.Store(&pp.status, _Psyscall)
}

這里只是需改了下數(shù)據(jù),并不會直接執(zhí)行 handoff 機制,執(zhí)行完 reentersyscall 后,協(xié)程和所在的線程就陷入系統(tǒng)調用了,然后 sysmon 線程會定時處理相關的邏輯,sysmon 中有一段搶占的邏輯。

func retake(now int64) uint32 {
 n := 0

 // 遍歷所有 p
 for i := 0; i < len(allp); i++ {
  pp := allp[i]
  pd := &pp.sysmontick
  s := pp.status
    // 處理處于系統(tǒng)調用的 p
  if s == _Psyscall {
   // 把 p 改成空閑狀態(tài)
   if atomic.Cas(&pp.status, s, _Pidle) {
        // 處理 p 上的協(xié)程
    handoffp(pp)
   } 
  }
 }
}

sysmon 把處于系統(tǒng)調度的 p 交給其他空閑線程或新建線程進行處理。

func handoffp(pp *p) {
 // 還有 g 需要處理,創(chuàng)建新的線程(m)
 if !runqempty(pp) || sched.runqsize != 0 {
  startm(pp, false, false)
  return
 }
}

這樣就保證了 p 上的協(xié)程可以被及時處理。

睡眠

Go 中可以通過 time.Sleep 讓協(xié)程定時睡眠一段時間,time.Sleep 實現(xiàn)如下。

func timeSleep(ns int64) {
 gp := getg()
 t := gp.timer
 if t == nil {
  t = new(timer)
    // 設置超時時間函數(shù)和參數(shù)
  t.init(goroutineReady, gp)
  gp.timer = t
 }
 when := nanotime() + ns
 gp.sleepWhen = when
  // 阻塞協(xié)程
 gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1)
}

time.Sleep 首先設置了超時時間函數(shù)和參數(shù),然后把協(xié)程改成阻塞狀態(tài)并觸發(fā)重新調度,最后執(zhí)行 resetForSleep 注冊定時器,Go 在調度時,會判斷是否有定時器超時。

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
 mp := getg().m
 pp := mp.p.ptr()
 now, pollUntil, _ := pp.timers.check(0)
}

func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
  // 最快超時的定時器時間
 next := ts.wakeTime()
 if next == 0 {
  // No timers to run or adjust.
  return now, 0, false
 }

 now = nanotime()

 if len(ts.heap) > 0 {
  ts.adjust(now, false)
  for len(ts.heap) > 0 {
   // 處理超時的定時器,如果超時的話
   if tw := ts.run(now); tw != 0 {
    if tw > 0 {
     pollUntil = tw
    }
    break
   }
   ran = true
  }
 }
 return now, pollUntil, ran
}

func (ts *timers) run(now int64) int64 {
 tw := ts.heap[0]
 t := tw.timer
 t.lock()
 if t.when > now {
  // Not ready to run.
  t.unlock()
  return t.when
 }
 t.unlockAndRun(now)
 return 0
}

func (t *timer) unlockAndRun(now int64) {
 f := t.f
 arg := t.arg
 seq := t.seq
 var next int64
 delay := now - t.when
 f(arg, seq, delay)
}

對于 time.Sleep 來時,f 對應的函數(shù)是 goroutineReady。

func goroutineReady(arg any, _ uintptr, _ int64) {
 goready(arg.(*g), 0)
}

func goready(gp *g, traceskip int) {
 systemstack(func() {
  ready(gp, traceskip, true)
 })
}

func ready(gp *g, traceskip int, next bool) {
 // 獲取當前線程的 m
 mp := acquirem()
 // 修改 g 的狀態(tài)為就緒,等待調度
 casgstatus(gp, _Gwaiting, _Grunnable)
 // 把 g 放到 m 關聯(lián)到 p 的 g 隊列
 runqput(mp.p.ptr(), gp, next)
  // 喚醒/創(chuàng)建線程處理
 wakep()
 releasem(mp)
}

goroutineReady 最終把協(xié)程加入就緒隊列,等待調度。

搶占式調度

和之前在函數(shù)里插入監(jiān)測點的方式不一樣,現(xiàn)在 Go 已經(jīng)通過信號機制支持搶占式調度,防止某個協(xié)程執(zhí)行的 CPU 時間過長,因為信號機制具有非常高的優(yōu)先級,通過信號可以徹底解決協(xié)程長期占據(jù) CPU 的問題。Go 在初始化時會注冊信號的處理函數(shù)。

func initsig(preinit bool) {
 for i := uint32(0); i < _NSIG; i++ {
  setsig(i, abi.FuncPCABIInternal(sighandler))
 }
}

func setsig(i uint32, fn uintptr) {
 var sa usigactiont
 sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTART
 sa.sa_mask = ^uint32(0)
  // 設置信號處理函數(shù)
 fn = abi.FuncPCABI0(sigtramp)
 *(*uintptr)(unsafe.Pointer(&sa.__sigaction_u)) = fn
 sigaction(i, &sa, nil)
}

然后在 sysmon 線程中定時判斷是否有協(xié)程執(zhí)行的時間過長。

func retake(now int64) uint32 {
 // 遍歷所有 p
 for i := 0; i < len(allp); i++ {
  pp := allp[i]
  s := pp.status
  if s == _Prunning {
   if pd.schedwhen+forcePreemptNS <= now {
    preemptone(pp)
   }
  }
  }
}

func preemptone(pp *p) bool {
 mp := pp.m.ptr()
 // 設置搶占標記
 gp.preempt = true
 gp.stackguard0 = stackPreempt
 // 給協(xié)程所在的線程 m 發(fā)信號進行搶占處理
 if preemptMSupported && debug.asyncpreemptoff == 0 {
  pp.preempt = true
  preemptM(mp)
 }

 return true
}

func preemptM(mp *m) {
  // 還沒發(fā)送則發(fā)送信號
 if mp.signalPending.CompareAndSwap(0, 1) {
  signalM(mp, sigPreempt)
 }
}

// 給指定線程發(fā)送信號
func signalM(mp *m, sig int) {
 pthread_kill(pthread(mp.procid), uint32(sig))
}

給指定線程發(fā)送信號后,信號處理函數(shù)就會在對應線程的上下文執(zhí)行,從而獲取到該線程上一直占用 CPU 的協(xié)程,信號處理函數(shù)是 sigtramp。

TEXT runtime·sigtramp(SB),NOSPLIT|TOPFRAME,$28
 // Save callee-saved C registers, since the caller may be a C signal handler.
 MOVL BX, bx-4(SP)
 MOVL BP, bp-8(SP)
 MOVL SI, si-12(SP)
 MOVL DI, di-16(SP)
 // We don't save mxcsr or the x87 control word because sigtrampgo doesn't
 // modify them.

 MOVL (28+4)(SP), BX
 MOVL BX, 0(SP)
 MOVL (28+8)(SP), BX
 MOVL BX, 4(SP)
 MOVL (28+12)(SP), BX
 MOVL BX, 8(SP)
 CALL runtime·sigtrampgo(SB)

 MOVL di-16(SP), DI
 MOVL si-12(SP), SI
 MOVL bp-8(SP),  BP
 MOVL bx-4(SP),  BX
 RET

最終執(zhí)行 sigtrampgo。

func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) {
 c := &sigctxt{info, ctx}
 gp := sigFetchG(c)
 setg(gp.m.gsignal)
 sighandler(sig, info, ctx, gp)
}

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
 gsignal := getg()
 mp := gsignal.m
 c := &sigctxt{info, ctxt}
 if sig == sigPreempt && debug.asyncpreemptoff == 0 && !delayedSignal {
  doSigPreempt(gp, c)
 }
}

func doSigPreempt(gp *g, ctxt *sigctxt) {
 if wantAsyncPreempt(gp) {
  if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
      // 修改內存,注入 asyncPreempt 地址
   ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
  }
 }
}

func (c *sigctxt) pushCall(targetPC, resumePC uintptr) {
 sp := c.sp() - 16 // SP needs 16-byte alignment
 c.set_sp(sp)
 *(*uint64)(unsafe.Pointer(uintptr(sp))) = c.lr()
 *(*uint64)(unsafe.Pointer(uintptr(sp - goarch.PtrSize))) = c.r29()
 c.set_lr(uint64(resumePC))
 c.set_pc(uint64(targetPC))
}

sigtrampgo 最終修改了內存地址注入 asyncPreempt 函數(shù)地址,信號處理結束后執(zhí)行 asyncPreempt,asyncPreempt 繼續(xù)執(zhí)行 asyncPreempt2。

func asyncPreempt2() {
 gp := getg()
 if gp.preemptStop {
  mcall(preemptPark)
 } else {
  mcall(gopreempt_m)
 }
}

func gopreempt_m(gp *g) {
 goschedImpl(gp, true)
}

func goschedImpl(gp *g, preempted bool) {
 // 把協(xié)程改成就緒狀態(tài)
 casgstatus(gp, _Grunning, _Grunnable)
  // 解除 m 和 g 的關系
 dropg()
  // 消耗太多 CPU 了,把 g 放入全局隊列
 globrunqput(gp)
  // 調度其他協(xié)程執(zhí)行
 schedule()
}

總結

Node.js / Redis / Nginx 等軟件的架構都是單線程的,所有的任務都是在單個線程中被串行執(zhí)行,盡管底層有線程池(處理耗時或阻塞式操作),但是線程池對用戶是不感知的,我們的可以理解為我們的任務或代碼是在單個線程中執(zhí)行的,比如 Redis 命令就是串行執(zhí)行的,不需要擔心多線程的問題,Node.js 的代碼也是單線程中執(zhí)行的,不需要擔心數(shù)據(jù)競爭問題,另外這些軟件都是基于異步回調的,代碼邏輯會比較割裂,對編寫和理解代碼來說有一定的負擔。

但是在 Go 中情況有所不同。Go 可以通過 go 關鍵字創(chuàng)建多個協(xié)程,這些協(xié)程是跑在多個線程中的,天然利用了多核能力,但是如果使用了公共的數(shù)據(jù)結構,需要通過互斥機制保證數(shù)據(jù)的正確性,而又因為搶占式調度的存在,盡管我們只跑在一個線程中,對共享數(shù)據(jù)的修改也會存在競態(tài)條件??偟膩碚f,Go 的架構是在多個線程上通過 gmp 機制運行多個協(xié)程,并在必要的時候進行搶占式調度,單個協(xié)程內執(zhí)行時,不同的阻塞式 API 其底層實現(xiàn)是不一樣的,一般來說,大多數(shù) API(網(wǎng)絡 IO、睡眠) 都是阻塞協(xié)程不阻塞線程,其原理是把協(xié)程改成阻塞狀態(tài)并放到等待隊列中,在合適的時機并且滿足條件時把它放到就緒隊列等待調度,而部分 API(文件讀寫或其他系統(tǒng)調用)是會引起線程阻塞,這時候 Go 通過 handoff 機制保證其他協(xié)程的執(zhí)行,但是這些對于用戶都是無感的,單協(xié)程內代碼是串行執(zhí)行的。Go 在原生利用多核、同步寫異步代碼和搶占式調度上對用戶來說是比較有意義的,寫過 Node.js 的同學應該更加深有體會。

責任編輯:武曉燕 來源: 編程雜技
相關推薦

2023-12-20 08:23:53

NIO組件非阻塞

2021-11-10 15:37:49

Go源碼指令

2024-02-19 00:00:00

Docker輕量級容器

2025-01-16 16:41:00

ObjectConditionJDK

2024-10-05 00:00:00

HTTPS性能HTTP/2

2024-07-30 08:22:47

API前端網(wǎng)關

2024-11-08 09:48:38

異步編程I/O密集

2024-06-20 08:06:30

2024-12-04 08:40:19

2022-11-28 00:04:17

2024-01-15 12:16:37

2023-01-09 08:00:41

JavaScript閉包

2013-02-27 10:27:44

GitHub

2022-05-09 08:56:27

Go淺拷貝接口

2015-10-23 09:34:16

2024-06-27 10:51:28

生成式AI領域

2023-07-11 00:12:05

2024-09-02 00:30:41

Go語言場景

2023-12-12 08:41:01

2024-10-09 08:54:31

點贊
收藏

51CTO技術棧公眾號