Go并發(fā)機(jī)制解密:Goroutine調(diào)度
Goroutine 是 Go 編程語言中一個極具特色的設(shè)計(jì),也是其并發(fā)能力的核心亮點(diǎn)之一。Goroutine 本質(zhì)上是一種協(xié)程(Coroutine),是實(shí)現(xiàn)并行計(jì)算的關(guān)鍵。使用 Goroutine 非常簡單,只需通過 go 關(guān)鍵字即可啟動一個協(xié)程,協(xié)程會以異步方式運(yùn)行。程序無需等待 Goroutine 完成即可繼續(xù)執(zhí)行后續(xù)代碼。
go func() // 使用 go 關(guān)鍵字啟動一個協(xié)程
II. Goroutine 的內(nèi)部原理
概念介紹
并發(fā)(Concurrency)
在單個 CPU 上,可以同時執(zhí)行多個任務(wù)。在極短的時間內(nèi),CPU 會在任務(wù)之間快速切換(例如,先執(zhí)行一小段程序 A,然后迅速切換到程序 B)。從宏觀上看,這種任務(wù)的時間上有重疊,似乎是同時執(zhí)行的,但從微觀上看,實(shí)際上是順序執(zhí)行的。這種現(xiàn)象稱為并發(fā)。
并行(Parallelism)
當(dāng)系統(tǒng)擁有多個 CPU 時,每個 CPU 可以同時運(yùn)行任務(wù),且各自不需要爭奪資源。多個任務(wù)真正同時運(yùn)行,這種現(xiàn)象稱為并行。
進(jìn)程(Process)
當(dāng) CPU 在多個程序之間切換時,如果不保存之前程序的狀態(tài)(即上下文),直接切換到下一個程序,那么之前程序的一系列狀態(tài)會丟失。為了解決這個問題,引入了進(jìn)程的概念。進(jìn)程為程序執(zhí)行分配所需的資源,因此進(jìn)程是程序運(yùn)行的基本資源單位(也可以看作程序執(zhí)行的實(shí)體)。例如,運(yùn)行一個文本編輯器時,該進(jìn)程會管理所有資源,如文本緩沖區(qū)的內(nèi)存空間、文件操作資源等。
線程(Thread)
CPU 在多個進(jìn)程之間切換時,由于需要進(jìn)入內(nèi)核模式并讀取用戶模式數(shù)據(jù),切換開銷較大。隨著進(jìn)程數(shù)量增加,CPU 調(diào)度會消耗大量資源。為了解決這一問題,引入了線程的概念。線程本身消耗的資源很少,它們共享進(jìn)程內(nèi)的資源。線程的調(diào)度開銷比進(jìn)程小得多。例如,在一個 Web 服務(wù)器應(yīng)用中,可以使用多個線程同時處理不同的客戶端請求,這些線程共享服務(wù)器進(jìn)程的資源(如網(wǎng)絡(luò)連接和內(nèi)存緩存)。
協(xié)程(Coroutine)
協(xié)程擁有自己的寄存器上下文和棧。當(dāng)協(xié)程被調(diào)度切換時,會保存當(dāng)前的寄存器上下文和棧;當(dāng)切換回來時,則恢復(fù)之前保存的上下文和棧。因此,協(xié)程可以保留上一次調(diào)用的狀態(tài)(即所有局部狀態(tài)的特定組合)。每次重新進(jìn)入?yún)f(xié)程時,相當(dāng)于返回到上次調(diào)用時的狀態(tài),即邏輯流程中上次退出的位置。
線程和進(jìn)程的操作由系統(tǒng)接口觸發(fā),最終由系統(tǒng)執(zhí)行;而協(xié)程的操作由用戶程序自身執(zhí)行。Goroutine 就是一種協(xié)程。
調(diào)度模型簡介
Goroutine 的強(qiáng)大并發(fā)能力通過 GPM 調(diào)度模型實(shí)現(xiàn)。以下是 Goroutine 調(diào)度模型的核心結(jié)構(gòu):
調(diào)度器中的四個重要結(jié)構(gòu)
- M(Machine)表示內(nèi)核級線程。每個 M 對應(yīng)一個線程,Goroutine 運(yùn)行在 M 上。例如,當(dāng)一個 Goroutine 被啟動以執(zhí)行復(fù)雜計(jì)算時,該 Goroutine 會被分配到一個 M 上執(zhí)行。M 是一個較大的結(jié)構(gòu),包含小對象內(nèi)存緩存(mcache)、當(dāng)前正在執(zhí)行的 Goroutine、隨機(jī)數(shù)生成器等信息。
- G(Goroutine)表示 Goroutine。它有自己的棧,用于存儲函數(shù)調(diào)用信息,還有一個指令指針,用于指定執(zhí)行位置。此外,G 還包含其他信息(如等待的通道信息),這些信息用于調(diào)度。例如,當(dāng)一個 Goroutine 等待從通道接收數(shù)據(jù)時,該信息會存儲在 G 結(jié)構(gòu)中。
- P(Processor)全稱為 Processor,主要用于執(zhí)行 Goroutine??梢詫⑵湟暈槿蝿?wù)分發(fā)器。P 維護(hù)一個 Goroutine 隊(duì)列,存儲需要由其執(zhí)行的所有 Goroutine。例如,當(dāng)創(chuàng)建多個 Goroutine 時,這些 Goroutine 會被添加到 P 的隊(duì)列中等待調(diào)度。
- Sched(Scheduler)表示調(diào)度器??梢钥醋魇侵醒胝{(diào)度中心,維護(hù) M 和 G 的隊(duì)列,以及調(diào)度器的一些狀態(tài)信息,確保整個系統(tǒng)的高效調(diào)度。
調(diào)度的實(shí)現(xiàn)
調(diào)度模型圖
如圖所示,有兩個物理線程 M,每個 M 綁定一個處理器 P,并運(yùn)行一個 Goroutine。
- P 的數(shù)量可以通過 GOMAXPROCS() 設(shè)置。它實(shí)際上表示真正的并發(fā)級別,即可以同時運(yùn)行的 Goroutine 數(shù)量。
- 圖中灰色的 Goroutine 尚未運(yùn)行,處于就緒狀態(tài),等待被調(diào)度。P 維護(hù)了這些 Goroutine 的隊(duì)列(稱為運(yùn)行隊(duì)列 runqueue)。
- 在 Go 語言中,啟動一個 Goroutine 非常簡單:只需使用 go function。每次執(zhí)行 go 語句時,都會將一個 Goroutine 添加到運(yùn)行隊(duì)列末尾。在下一個調(diào)度點(diǎn),會從運(yùn)行隊(duì)列中取出一個 Goroutine 執(zhí)行。
當(dāng)某個操作系統(tǒng)線程(如 M0)被阻塞時(如下圖所示),P 會切換到另一個線程(如 M1)。M1 可能是新創(chuàng)建的,也可能是從線程緩存中取出的。
線程阻塞切換圖
當(dāng) M0 返回時,它需要嘗試獲取一個 P 來運(yùn)行 Goroutine。如果無法獲取 P,它會將 Goroutine 放入全局運(yùn)行隊(duì)列,并進(jìn)入休眠狀態(tài)(進(jìn)入線程緩存)。所有 P 會定期檢查全局運(yùn)行隊(duì)列,并運(yùn)行其中的 Goroutine;否則,全局運(yùn)行隊(duì)列中的 Goroutine 將永遠(yuǎn)無法執(zhí)行。
III. Goroutine 的使用
基本用法
設(shè)置 Goroutine 的運(yùn)行 CPU 數(shù)量。Go 的最新版本默認(rèn)會自動設(shè)置。
num := runtime.NumCPU() // 獲取主機(jī)的邏輯 CPU 數(shù)量
runtime.GOMAXPROCS(num) // 根據(jù)主機(jī) CPU 數(shù)量設(shè)置 Goroutine 的最大并發(fā)級別
使用示例
示例 1:簡單的 Goroutine 計(jì)算
package main
import (
"fmt"
"time"
)
func cal(a int, b int) {
c := a + b
fmt.Printf("%d + %d = %d\n", a, b, c)
}
func main() {
for i := 0; i < 10; i++ {
go cal(i, i+1) // 啟動 10 個 Goroutine 進(jìn)行計(jì)算
}
time.Sleep(time.Second * 2) // 等待所有任務(wù)完成
}
運(yùn)行結(jié)果:
8 + 9 = 17
9 + 10 = 19
4 + 5 = 9
...
Goroutine 異常捕獲
當(dāng)啟動多個 Goroutine 時,如果其中一個發(fā)生異常且未處理,整個程序會終止。因此,建議在每個 Goroutine 的函數(shù)中添加異常處理??梢允褂?nbsp;recover 函數(shù)捕獲異常。
package main
import (
"fmt"
"time"
)
func addele(a []int, i int) {
deferfunc() {
if err := recover(); err != nil {
fmt.Println("add ele fail")
}
}()
a[i] = i
fmt.Println(a)
}
func main() {
Arry := make([]int, 4)
for i := 0; i < 10; i++ {
go addele(Arry, i)
}
time.Sleep(time.Second * 2)
}
運(yùn)行結(jié)果:
add ele fail
[0 0 0 0]
[0 1 0 0]
...
Goroutine 的同步
由于 Goroutine 是異步執(zhí)行的,主程序可能在 Goroutine 完成前退出。為確保所有 Goroutine 完成后再退出,Go 提供了 sync 包和 channel 來解決同步問題。
示例 1:使用 sync.WaitGroup 同步 Goroutine
package main
import (
"fmt"
"sync"
)
func cal(a int, b int, n *sync.WaitGroup) {
c := a + b
fmt.Printf("%d + %d = %d\n", a, b, c)
defer n.Done()
}
func main() {
var go_sync sync.WaitGroup
for i := 0; i < 10; i++ {
go_sync.Add(1)
go cal(i, i+1, &go_sync)
}
go_sync.Wait()
}
運(yùn)行結(jié)果:
9 + 10 = 19
2 + 3 = 5
...
Goroutine 間的通信
Goroutine 本質(zhì)上是協(xié)程,可以通過 channel 實(shí)現(xiàn)通信或數(shù)據(jù)共享。
示例:使用 channel 模擬生產(chǎn)者-消費(fèi)者模式
package main
import (
"fmt"
"sync"
)
func Productor(mychan chan int, data int, wait *sync.WaitGroup) {
mychan <- data
fmt.Println("product data:", data)
wait.Done()
}
func Consumer(mychan chan int, wait *sync.WaitGroup) {
a := <-mychan
fmt.Println("consumer data:", a)
wait.Done()
}
func main() {
datachan := make(chanint, 100)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go Productor(datachan, i, &wg)
}
for j := 0; j < 10; j++ {
wg.Add(1)
go Consumer(datachan, &wg)
}
wg.Wait()
}
運(yùn)行結(jié)果:
product data: 0
consumer data: 0
...