偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

我所理解的 Go 的 CSP 并發(fā)控制機(jī)制

開發(fā) 前端
本文將具體討論 Go 中的?并發(fā)控制機(jī)制 (concurrency control mechanisms)?,特別是基于 CSP (Communicating Sequential Processes) 的實(shí)現(xiàn),包括?chan?和?select?等關(guān)鍵要素的設(shè)計(jì)思路及核心實(shí)現(xiàn)細(xì)節(jié)。

你一定聽說過 Go 語(yǔ)言所倡導(dǎo)的這個(gè)核心并發(fā)原則:“不要通過共享內(nèi)存來通信,而要通過通信來共享內(nèi)存 (Don't communicate by sharing memory; instead, share memory by communicating)”。這一理念深刻影響了 Go 的并發(fā)設(shè)計(jì)。

本文將具體討論 Go 中的 并發(fā)控制機(jī)制 (concurrency control mechanisms) ,特別是基于 CSP (Communicating Sequential Processes) 的實(shí)現(xiàn),包括 chan 和 select 等關(guān)鍵要素的設(shè)計(jì)思路及核心實(shí)現(xiàn)細(xì)節(jié)。理解這些內(nèi)容,對(duì)于編寫出高效、安全的 Go 并發(fā)程序至關(guān)重要。本文假設(shè)讀者已經(jīng)對(duì) Go 的 GPM 調(diào)度模型 (GPM scheduling model) 有了比較深入的了解。

CSP, Communicating Sequential Processes

令我頗感驚訝的是,CSP 這個(gè)并發(fā)模型是由計(jì)算機(jī)科學(xué)家 托尼·霍爾 (Tony Hoare) 在 1978 年提出的。在那個(gè)個(gè)人計(jì)算機(jī)尚未普及、多核處理器更是遙不可及的年代,學(xué)術(shù)界和工業(yè)界普遍關(guān)注的重點(diǎn)是如何在單核處理器上實(shí)現(xiàn)有效的任務(wù)并發(fā)與切換,以及如何管理共享資源帶來的復(fù)雜性。

CSP 的核心思想是將獨(dú)立的、順序執(zhí)行的進(jìn)程作為基本的計(jì)算單元。這些進(jìn)程之間不共享內(nèi)存,而是通過顯式的 通道 (channels) 來進(jìn)行通信和同步。一個(gè)進(jìn)程向通道發(fā)送消息,另一個(gè)進(jìn)程從該通道接收消息。這種通信方式是同步的,即發(fā)送方會(huì)阻塞直到接收方準(zhǔn)備好接收,或者接收方會(huì)阻塞直到發(fā)送方發(fā)送了消息(對(duì)于無緩沖通道而言)。

Go 語(yǔ)言在原生層面通過 chan 的設(shè)計(jì),為 CSP 模型提供了強(qiáng)大的支持。這樣做的好處顯而易見:

  1. 簡(jiǎn)化并發(fā)邏輯 :通過將數(shù)據(jù)在不同 goroutine 之間傳遞,而不是共享狀態(tài),極大地降低了并發(fā)編程中數(shù)據(jù)競(jìng)爭(zhēng)的風(fēng)險(xiǎn)。開發(fā)者可以將注意力更多地放在消息的流動(dòng)和處理上,而不是復(fù)雜的鎖機(jī)制。
  2. 清晰的關(guān)系 :在任意時(shí)刻,數(shù)據(jù)要么屬于某個(gè) goroutine,要么正在通過 chan 進(jìn)行傳遞。這種清晰的關(guān)系使得推理程序的行為變得更加容易。
  3. 可組合性 :基于 chan 的組件更容易組合起來構(gòu)建更復(fù)雜的并發(fā)系統(tǒng)。

與主流的并發(fā)模型相比,Go 的 CSP 實(shí)現(xiàn)展現(xiàn)出其獨(dú)特性。

  • 對(duì)比 Java/pthread 的共享內(nèi)存模型 :Java 和 C++ (pthread) 等語(yǔ)言主要依賴共享內(nèi)存和鎖(如 mutex、semaphore)進(jìn)行并發(fā)控制。這種模型下,開發(fā)者需要非常小心地管理對(duì)共享數(shù)據(jù)的訪問,否則極易出現(xiàn) 死鎖 (deadlock) 和 競(jìng)態(tài)條件 (race condition) 。Go 的 CSP 模型通過 chan 將數(shù)據(jù)在 goroutine 間傳遞,避免了直接的內(nèi)存共享,從而在設(shè)計(jì)上減少了這類問題。內(nèi)存同步由 chan 的操作隱式完成。
  • 對(duì)比 Actor 模型 :Actor 模型(如 Akka、Erlang OTP 中的 gen_server)與 CSP 有相似之處,都強(qiáng)調(diào)通過消息傳遞進(jìn)行通信,避免共享狀態(tài)。主要區(qū)別在于 Actor 通常擁有自己的狀態(tài),并且 Actor 之間的通信是異步的,每個(gè) Actor 一般都有一個(gè)郵箱 (mailbox) 來存儲(chǔ)傳入的消息。而 Go 的 chan 通信可以是同步的(無緩沖 chan)或異步的(有緩沖 chan)。Go 的 goroutine 比 Actor 更輕量。
  • 對(duì)比 JavaScript 的異步回調(diào)/Promise :JavaScript (尤其是在 Node.js 環(huán)境中) 采用單線程事件循環(huán)和異步回調(diào)(或 Promise/async/await)來處理并發(fā)。這種方式避免了多線程帶來的復(fù)雜性,但在回調(diào)層級(jí)很深(回調(diào)地獄 callback hell)時(shí),代碼可讀性和維護(hù)性會(huì)下降。Promise 和 async/await 改善了這一點(diǎn),但其并發(fā)的本質(zhì)仍然是協(xié)作式的單任務(wù)切換,而非像 Go 那樣可以利用多核進(jìn)行并行計(jì)算的搶占式調(diào)度。

在調(diào)度方面,Go 的 goroutine 由 Go 運(yùn)行時(shí)進(jìn)行調(diào)度,是用戶態(tài)的輕量級(jí)線程,切換成本遠(yuǎn)低于操作系統(tǒng)線程。chan 的操作天然地與調(diào)度器集成,可以高效地掛起和喚醒 goroutine。在公平性方面,select 語(yǔ)句在處理多個(gè) chan 操作時(shí),會(huì)通過一定的隨機(jī)化策略來避免饑餓問題。Go 的并發(fā)原語(yǔ)設(shè)計(jì)精良,易于組合,使得構(gòu)建復(fù)雜的并發(fā)模式成為可能。

關(guān)于并發(fā)模型的更多更詳細(xì)的對(duì)比,讀者可以參考 Paul Butcher 的《七周七并發(fā)模型 (Seven Concurrency Models in Seven Weeks: When Threads Unravel) 》。雖已在我的書單中,但我也還未完全讀完,歡迎互相交流學(xué)習(xí)。

chan 具體是什么

chan 是 Go 語(yǔ)言中用于在不同 goroutine 之間傳遞數(shù)據(jù)和同步執(zhí)行的核心類型。它是一種類型化的管道,你可以通過它發(fā)送和接收特定類型的值。

我們從一個(gè)簡(jiǎn)單的 chan 用法開始:

package main

import (
    "fmt"
    "time"
)

func main() {
    // 創(chuàng)建一個(gè)字符串類型的無緩沖 channel
    messageChannel := make(chan string)

    go func() {
        // 向 channel 發(fā)送數(shù)據(jù)
        messageChannel <- "Hello from goroutine!"
        fmt.Println("Sender: Message sent.")
    }()

    go func() {
        // 從 channel 接收數(shù)據(jù)
        time.Sleep(1 * time.Second) // 模擬耗時(shí)操作,確保接收者后準(zhǔn)備好
        receivedMessage := <-messageChannel
        fmt.Println("Receiver: Received message:", receivedMessage)
    }()

    // 等待 goroutine 執(zhí)行完畢
    time.Sleep(2 * time.Second)
    fmt.Println("Main: Finished.")
}

在這個(gè)例子中,make(chan string) 創(chuàng)建了一個(gè)可以傳遞 string 類型數(shù)據(jù)的 chan。messageChannel <- "Hello" 是發(fā)送操作,它會(huì)將字符串發(fā)送到 chan 中。receivedMessage := <-messageChannel 是接收操作,它會(huì)從 chan 中讀取數(shù)據(jù)。對(duì)于無緩沖的 chan,發(fā)送操作會(huì)阻塞,直到另一個(gè) goroutine 對(duì)同一個(gè) chan 執(zhí)行接收操作;反之亦然,接收操作也會(huì)阻塞,直到有數(shù)據(jù)被發(fā)送。

這些簡(jiǎn)潔的 chan 操作符實(shí)際上是 Go 語(yǔ)言提供的 語(yǔ)法糖 (syntactic sugar) 。在底層,它們會(huì)轉(zhuǎn)換為運(yùn)行時(shí)的內(nèi)部函數(shù)調(diào)用。

  • 向 chan 發(fā)送數(shù)據(jù) ch <- v 大致對(duì)應(yīng)于運(yùn)行時(shí)函數(shù) runtime.chansend1(ch, v)(具體函數(shù)可能因版本和場(chǎng)景略有不同,如 chansend)。
  • 從 chan 接收數(shù)據(jù) v := <-ch 或 v, ok := <-ch 大致對(duì)應(yīng)于運(yùn)行時(shí)函數(shù) runtime.chanrecv1(ch, &v) 或 runtime.chanrecv2(ch, &v)(返回第二個(gè) bool 值表示 chan 是否關(guān)閉且已空)。
  • for v := range ch 循環(huán),在底層會(huì)持續(xù)嘗試從 chan 接收數(shù)據(jù),直到 chan 被關(guān)閉并且緩沖區(qū)為空。

要理解 chan 的行為,了解其內(nèi)部數(shù)據(jù)結(jié)構(gòu)至關(guān)重要。在 Go 的運(yùn)行時(shí)中,chan 的內(nèi)部表示是 runtime.hchan 結(jié)構(gòu)體(位于 src/runtime/chan.go)。其核心字段包括:

// src/runtime/chan.go
type hchan struct {
    qcount   uint            // 當(dāng)前隊(duì)列中剩余元素個(gè)數(shù) (current number of elements in the queue)
    dataqsiz uint            // 環(huán)形隊(duì)列的大小,即緩沖區(qū)大小 (size of the circular queue, i.e., buffer size)
    buf      unsafe.Pointer  // 指向環(huán)形隊(duì)列的指針 (pointer to the circular queue buffer)
    elemsize uint16          // channel 中元素的大小 (size of an element in the channel)
    closed   uint32          // 標(biāo)記 channel 是否關(guān)閉 (marks if the channel is closed)
    timer    *timer          // 可能與內(nèi)部調(diào)試或計(jì)時(shí)器相關(guān)的 select 優(yōu)化有關(guān)
    elemtype *_type          // channel 中元素的類型 (type of an element in the channel)
    sendx    uint            // 發(fā)送操作處理到的位置 (index for send operations)
    recvx    uint            // 接收操作處理到的位置 (index for receive operations)
    recvq    waitq           // 等待接收的 goroutine 隊(duì)列 (list of goroutines waiting to receive)
    sendq    waitq           // 等待發(fā)送的 goroutine 隊(duì)列 (list of goroutines waiting to send)
    bubble   *synctestBubble // 此字段通常僅在開啟了競(jìng)爭(zhēng)檢測(cè) (`-race`) 或特定的同步測(cè)試構(gòu)建 (`synctest`) 中出現(xiàn)。
                                // 用于輔助競(jìng)爭(zhēng)檢測(cè)器跟蹤 channel 操作的同步事件,幫助發(fā)現(xiàn)潛在的 data race。
                                // 對(duì)于常規(guī)的 channel 理解和使用,可以不必關(guān)注此字段。

    lock     mutex           // 保護(hù) hchan 中所有字段的鎖 (lock protecting all fields in hchan)
}

type waitq struct {  // 是一個(gè)雙向鏈表
    first *sudog
    last  *sudog
}
  • qcount:表示當(dāng)前 chan 緩沖區(qū)中實(shí)際存儲(chǔ)的元素?cái)?shù)量。
  • dataqsiz:表示 chan 的緩沖區(qū)大小。如果為 0,則該 chan 是無緩沖的。
  • buf:一個(gè)指針,指向底層存儲(chǔ)元素的環(huán)形緩沖區(qū)。只有在 dataqsiz > 0 時(shí)(即有緩沖 chan),這個(gè)字段才有意義。
  • closed:一個(gè)標(biāo)志位,表示 chan 是否已經(jīng)被關(guān)閉。
  • sendq 和 recvq:分別是等待發(fā)送數(shù)據(jù)的 goroutine 隊(duì)列和等待接收數(shù)據(jù)的 goroutine 隊(duì)列。它們是 sudog 結(jié)構(gòu)體(代表一個(gè)阻塞的 goroutine)組成的鏈表。
  • lock:一個(gè)互斥鎖,用于保護(hù) hchan 結(jié)構(gòu)體內(nèi)部字段的并發(fā)訪問,確保 chan 操作的原子性。

當(dāng)創(chuàng)建一個(gè) chan 時(shí),make(chan T, N),如果 N 為 0 或省略,則創(chuàng)建的是無緩沖 chan (dataqsiz 為 0,buf 為 nil)。如果 N 大于 0,則創(chuàng)建的是有緩沖 chan (dataqsiz 為 N,并分配相應(yīng)大小的 buf)。

chan 的并發(fā)控制

chan 的并發(fā)控制能力是其設(shè)計(jì)的核心,它緊密地與 Go 的 goroutine 調(diào)度器協(xié)同工作,以實(shí)現(xiàn)高效的同步和通信。

當(dāng)一個(gè) goroutine 嘗試對(duì) chan 進(jìn)行操作(發(fā)送或接收)時(shí),會(huì)首先獲取 hchan 結(jié)構(gòu)體中的 lock 互斥鎖,以保證操作的原子性和數(shù)據(jù)一致性。

發(fā)送操作 (ch <- v) 的邏輯

  1. 嘗試直接喚醒接收者 :如果 recvq (等待接收的 goroutine 隊(duì)列) 不為空,說明有 goroutine 因?yàn)閲L試從該 chan 接收數(shù)據(jù)而被阻塞。這時(shí),發(fā)送操作會(huì)直接將數(shù)據(jù)從發(fā)送方 goroutine 的棧(或堆,取決于數(shù)據(jù))復(fù)制到該等待的接收方 goroutine 的指定內(nèi)存位置,然后喚醒這個(gè)接收方 goroutine (將其標(biāo)記為可運(yùn)行狀態(tài),等待調(diào)度器調(diào)度執(zhí)行)。這對(duì)于無緩沖 chan 和緩沖 chan 空閑時(shí)是常見路徑。發(fā)送方 goroutine 通??梢岳^續(xù)執(zhí)行。
  2. 嘗試放入緩沖區(qū) :如果 recvq 為空,但 chan 有緩沖區(qū) (dataqsiz > 0) 且緩沖區(qū)未滿 (qcount < dataqsiz),發(fā)送操作會(huì)將數(shù)據(jù)從發(fā)送方復(fù)制到 buf 環(huán)形緩沖區(qū)中的下一個(gè)可用槽位,并增加 qcount。發(fā)送方 goroutine 繼續(xù)執(zhí)行。
  3. 阻塞發(fā)送者 :如果 recvq 為空,并且 chan 是無緩沖的 (dataqsiz == 0),或者 chan 是有緩沖的但緩沖區(qū)已滿 (qcount == dataqsiz),那么發(fā)送操作無法立即完成。此時(shí),發(fā)送方 goroutine 會(huì)被封裝成一個(gè) sudog 結(jié)構(gòu),包含要發(fā)送的數(shù)據(jù)的指針,并加入到 hchan 的 sendq (等待發(fā)送的 goroutine 隊(duì)列) 中。隨后,該發(fā)送方 goroutine 會(huì)調(diào)用 gopark 函數(shù),釋放 P (處理器),進(jìn)入 阻塞 (waiting) 狀態(tài),等待被接收方喚醒。

接收操作 (v := <-ch 或 v, ok := <-ch) 的邏輯

嘗試直接從發(fā)送者獲取或喚醒發(fā)送者 :如果 sendq (等待發(fā)送的 goroutine 隊(duì)列) 不為空,說明有 goroutine 因?yàn)閲L試向該 chan 發(fā)送數(shù)據(jù)而被阻塞。

  • 對(duì)于無緩沖 chan :接收操作會(huì)直接從 sendq 中隊(duì)首的 sudog (阻塞的發(fā)送者) 獲取數(shù)據(jù),將其復(fù)制到接收方 goroutine 的指定內(nèi)存位置,然后喚醒這個(gè)發(fā)送方 goroutine。接收方 goroutine 繼續(xù)執(zhí)行。
  • 對(duì)于有緩沖 chan (但緩沖區(qū)此時(shí)為空) :如果 sendq(等待發(fā)送的 goroutine 隊(duì)列)不為空,這表明此前因?yàn)榫彌_區(qū)已滿而有發(fā)送者 goroutine (GS) 被阻塞。現(xiàn)在一個(gè)接收者 goroutine (GR) 來了,并且緩沖區(qū)是空的 (qcount == 0)。此時(shí),接收操作會(huì)從 sendq 中取出第一個(gè)等待的發(fā)送者 GS,將其數(shù)據(jù)直接復(fù)制給當(dāng)前接收者 GR(或者復(fù)制到 GR 預(yù)期的內(nèi)存位置)。然后,發(fā)送者 GS 會(huì)被喚醒并可以繼續(xù)執(zhí)行。這個(gè)過程可以看作是一次“直接的數(shù)據(jù)交接”,盡管它是在緩沖 chan 的上下文中發(fā)生的。緩沖區(qū) hchan.buf 在此特定交互中可能不直接存儲(chǔ)這個(gè)傳遞中的數(shù)據(jù),或者數(shù)據(jù)只是邏輯上“通過”了一個(gè)緩沖區(qū)槽位以保持 sendx 和 recvx 索引的一致性。關(guān)鍵在于,一個(gè)等待的發(fā)送者被匹配并喚醒,其數(shù)據(jù)被成功傳遞。

嘗試從緩沖區(qū)獲取 :如果 sendq 為空,但 chan 有緩沖區(qū) (dataqsiz > 0) 且緩沖區(qū)不為空 (qcount > 0),接收操作會(huì)從 buf 環(huán)形緩沖區(qū)中取出一個(gè)元素,復(fù)制到接收方 goroutine 的指定內(nèi)存位置,減少 qcount,并相應(yīng)地移動(dòng) recvx 指針。接收方 goroutine 繼續(xù)執(zhí)行。

處理已關(guān)閉的 chan :如果 chan 已經(jīng)被關(guān)閉 (closed > 0) 并且緩沖區(qū)為空 (qcount == 0):

  • v := <-ch 會(huì)立即返回該 chan 元素類型的零值。
  • v, ok := <-ch 會(huì)立即返回元素類型的零值和 false 給 ok。

這使得 for v := range ch 循環(huán)能夠在 chan 關(guān)閉且數(shù)據(jù)取完后優(yōu)雅退出。

阻塞接收者 :如果 sendq 為空,chan 未關(guān)閉,并且 chan 是無緩沖的,或者 chan 是有緩沖的但緩沖區(qū)為空 (qcount == 0),那么接收操作無法立即完成。此時(shí),接收方 goroutine 會(huì)被封裝成一個(gè) sudog 結(jié)構(gòu),并加入到 hchan 的 recvq (等待接收的 goroutine 隊(duì)列) 中。隨后,該接收方 goroutine 調(diào)用 gopark 進(jìn)入阻塞狀態(tài),等待被發(fā)送方喚醒。

喚醒機(jī)制 :goroutine 的阻塞 (gopark) 和喚醒 (goready) 是由 Go 運(yùn)行時(shí)調(diào)度器核心管理的。當(dāng)一個(gè) goroutine 因?yàn)?nbsp;chan 操作需要阻塞時(shí),它會(huì)釋放當(dāng)前占用的 P,其狀態(tài)被標(biāo)記為 _Gwaiting。當(dāng)條件滿足(例如,數(shù)據(jù)被發(fā)送到 chan,或有 goroutine 準(zhǔn)備好從 chan 接收)時(shí),另一個(gè) goroutine (執(zhí)行對(duì)應(yīng) chan 操作的 goroutine) 會(huì)調(diào)用 goready 將阻塞的 goroutine 的狀態(tài)改為 _Grunnable,并將其放入運(yùn)行隊(duì)列,等待調(diào)度器分配 P 來執(zhí)行。

有緩沖 vs 無緩沖舉例

  • 無緩沖 chan (make(chan int))

發(fā)送者 ch <- 1 會(huì)阻塞,直到接收者 <-ch 準(zhǔn)備好。它們必須“握手”。

這常用于強(qiáng)同步,確保消息被處理。

  • 有緩沖 chan (make(chan int, 1))
  • 發(fā)送者 ch <- 1 可以立即完成(只要緩沖區(qū)未滿),不需要等待接收者。
  • 如果緩沖區(qū)滿了,比如 ch <- 1 之后再 ch <- 2 (假設(shè)容量為1),第二個(gè)發(fā)送者會(huì)阻塞。
  • 這允許一定程度的解耦和流量削峰。

chan 通信的本質(zhì) : chan 通信的本質(zhì)仍然是 內(nèi)存復(fù)制 。無論是直接在發(fā)送者和接收者 goroutine 之間傳遞,還是通過緩沖區(qū)中轉(zhuǎn),元素的值都會(huì)從源位置復(fù)制到目標(biāo)位置。對(duì)于指針或包含指針的復(fù)雜類型,復(fù)制的是指針值本身,而不是指針指向的數(shù)據(jù)。這意味著如果傳遞的是一個(gè)大數(shù)據(jù)結(jié)構(gòu)的指針,實(shí)際復(fù)制的開銷很小,但需要注意共享數(shù)據(jù)帶來的并發(fā)問題(盡管 CSP 的理念是避免共享)。

關(guān)閉一個(gè)有數(shù)據(jù)的 chan

當(dāng)一個(gè)有數(shù)據(jù)的 chan 被 close(ch) 時(shí):

  • 后續(xù)的發(fā)送操作 ch <- v 會(huì)引發(fā) panic。
  • 接收操作 <-ch 會(huì)繼續(xù)從緩沖區(qū)讀取剩余的值,直到緩沖區(qū)為空。
  • 當(dāng)緩沖區(qū)為空后,接收操作 v := <-ch 會(huì)立即返回元素類型的零值。
  • 接收操作 v, ok := <-ch 會(huì)返回元素類型的零值和 false。

Go 通過 hchan 的 closed 標(biāo)志和 qcount 來精確控制這些行為,確保 for v := range ch 循環(huán)在 chan 關(guān)閉且緩沖區(qū)耗盡后能夠自動(dòng)、優(yōu)雅地退出,因?yàn)榇藭r(shí) chanrecv 操作會(huì)返回 (zeroValue, false),range 機(jī)制檢測(cè)到 ok 為 false 就會(huì)終止循環(huán)。

原子操作 :hchan 內(nèi)部的關(guān)鍵字段(如 qcount, closed, sendx, recvx 以及對(duì) sendq 和 recvq 鏈表的操作)的訪問和修改,都受到 hchan.lock 這個(gè)互斥鎖的保護(hù)。因此,從外部視角看,對(duì) chan 的發(fā)送、接收和關(guān)閉操作都可以認(rèn)為是 原子性的 (atomic) ,它們要么完整執(zhí)行,要么不執(zhí)行(例如,在嘗試獲取鎖時(shí)被阻塞)。這種原子性是由 Go 運(yùn)行時(shí)的鎖機(jī)制來保證的,而非硬件層面的原子指令直接作用于整個(gè) chan 操作(盡管鎖的實(shí)現(xiàn)本身會(huì)用到硬件原子操作)。

select 語(yǔ)言層面原生的多路復(fù)用

select 語(yǔ)句是 Go 語(yǔ)言中實(shí)現(xiàn)并發(fā)控制的另一個(gè)強(qiáng)大工具,它允許一個(gè) goroutine 同時(shí)等待多個(gè)通信操作。select 會(huì)阻塞,直到其中一個(gè) case(通信操作)可以執(zhí)行,然后執(zhí)行該 case。如果多個(gè) case 同時(shí)就緒,select 會(huì) 偽隨機(jī)地 (pseudo-randomly) 選擇一個(gè)執(zhí)行,以保證公平性,避免某些 chan 總是優(yōu)先得到處理。

基本用法

ch1 := make(chan int)
ch2 := make(chan string)

// ... goroutines to send to ch1 and ch2

select {
case val1 := <-ch1:
    fmt.Printf("Received from ch1: %d\n", val1)
case str2 := <-ch2:
    fmt.Printf("Received from ch2: %s\n", str2)
case ch1 <- 10: // 也可以包含發(fā)送操作
    fmt.Println("Sent 10 to ch1")
default: // 可選的 default case
    fmt.Println("No communication was ready.")
    // default 會(huì)在沒有任何 case 就緒時(shí)立即執(zhí)行,使 select 非阻塞
}

底層實(shí)現(xiàn) :當(dāng) Go 代碼執(zhí)行到一個(gè) select 語(yǔ)句時(shí),編譯器和運(yùn)行時(shí)會(huì)協(xié)同工作。

  1. 收集 case :編譯器會(huì)生成代碼,將 select 語(yǔ)句中的所有 case(每個(gè) case 對(duì)應(yīng)一個(gè) chan 的發(fā)送或接收操作)收集起來,形成一個(gè) scase (select case) 結(jié)構(gòu)數(shù)組。每個(gè) scase 包含了操作的類型(發(fā)送/接收)、目標(biāo) chan 以及用于接收/發(fā)送數(shù)據(jù)的內(nèi)存地址。
  2. 亂序處理 :為了保證公平性,運(yùn)行時(shí)會(huì)先對(duì)這些 scase 進(jìn)行一個(gè)隨機(jī)的排序(通過 select_order 數(shù)組)。
  3. 輪詢檢查 :按照亂序后的順序,運(yùn)行時(shí)會(huì)遍歷所有的 case,檢查對(duì)應(yīng)的 chan 是否已經(jīng)就緒(即是否可以立即執(zhí)行發(fā)送或接收操作而不會(huì)阻塞)。
  • 發(fā)送操作 :檢查 chan 是否有等待的接收者,或者其緩沖區(qū)是否有空間。
  • 接收操作 :檢查 chan 是否有等待的發(fā)送者,或者其緩沖區(qū)是否有數(shù)據(jù),或者 chan 是否已關(guān)閉。
  1. 立即執(zhí)行 :如果在此輪詢過程中發(fā)現(xiàn)有任何一個(gè) case 可以立即執(zhí)行,運(yùn)行時(shí)會(huì)選擇第一個(gè)(按照亂序后的順序)就緒的 case,執(zhí)行相應(yīng)的 chan 操作(發(fā)送或接收數(shù)據(jù)),然后跳轉(zhuǎn)到該 case 對(duì)應(yīng)的代碼塊執(zhí)行。select 語(yǔ)句結(jié)束。
  2. default 處理 :如果在輪詢所有 case 后沒有發(fā)現(xiàn)任何一個(gè)可以立即執(zhí)行,并且 select 語(yǔ)句包含 default 子句,那么 default 子句的代碼塊會(huì)被執(zhí)行。select 語(yǔ)句結(jié)束。default 使得 select 可以成為一種非阻塞的檢查機(jī)制。
  3. 阻塞與喚醒 :如果輪詢后沒有 case 就緒,且沒有 default 子句,那么當(dāng)前 goroutine 就需要阻塞。
  • 對(duì)于每一個(gè) case 中的 chan,運(yùn)行時(shí)會(huì)將當(dāng)前 goroutine(表示為一個(gè) sudog)加入到該 chan 的 sendq 或 recvq 等待隊(duì)列中,并記錄下是哪個(gè) case 把它加入的。
  • 然后,當(dāng)前 goroutine 調(diào)用 gopark 進(jìn)入阻塞狀態(tài),等待被喚醒。
  • 當(dāng)任何一個(gè)被 select 監(jiān)聽的 chan 發(fā)生狀態(tài)變化(例如,有數(shù)據(jù)發(fā)送進(jìn)來,或有 goroutine 嘗試接收,或 chan 被關(guān)閉),并且這個(gè)變化使得某個(gè) case 的條件滿足時(shí),操作該 chan 的 goroutine 會(huì)負(fù)責(zé)喚醒因 select 而阻塞的 goroutine。
  • 被喚醒的 goroutine 會(huì)再次檢查哪個(gè) case 導(dǎo)致了喚醒(通過 sudog 中記錄的 hchan 信息),然后執(zhí)行該 case。在執(zhí)行選中的 case 之前,一個(gè)關(guān)鍵步驟是 將該 goroutine 的 sudog 從所有其他未被選中的 case 所對(duì)應(yīng)的 chan 的等待隊(duì)列 (sendq 或 recvq) 中移除 。

但是,移除操作時(shí)間復(fù)雜度是怎樣的?

實(shí)際上,hchan 中的 sendq 和 recvq (即 waitq 結(jié)構(gòu)) 都是 雙向鏈表 (doubly linked lists) 。sudog 結(jié)構(gòu)體自身包含了指向其在鏈表中前一個(gè)和后一個(gè) sudog 的指針 (prev 和 next)。當(dāng) select 語(yǔ)句決定喚醒一個(gè) goroutine 時(shí),它已經(jīng)擁有了指向該 goroutine 的 sudog 的指針。對(duì)于那些未被選中的 case,select 機(jī)制會(huì)遍歷這些 case,并針對(duì)每個(gè) case 對(duì)應(yīng)的 chan,利用已知的 sudog 指針以及其 prev 和 next 指針,在 O(1) 時(shí)間復(fù)雜度內(nèi)將其從該 chan 的等待隊(duì)列中移除(unlinking 操作)。因此,整個(gè)清理過程的復(fù)雜度與 select 語(yǔ)句中 case 的數(shù)量成正比(即 O(N_cases),其中 N_cases 是 select 中的 case 數(shù)量),而不是與等待隊(duì)列的實(shí)際長(zhǎng)度成正比,這保證了 select 機(jī)制在處理多個(gè) case 時(shí)的效率。

核心算法流程 :select 的核心可以概括為 runtime.selectgo 函數(shù)(位于 src/runtime/select.go)。這個(gè)函數(shù)實(shí)現(xiàn)了上述的收集、亂序、輪詢、阻塞和喚醒邏輯。

它首先嘗試一個(gè)“非阻塞”的輪詢,看是否有 case 能夠立即成功。如果找不到,并且沒有 default,它會(huì)將當(dāng)前 goroutine 注冊(cè)到所有相關(guān) chan 的等待隊(duì)列中,然后 gopark。當(dāng)其他 goroutine 對(duì)這些 chan 操作并喚醒當(dāng)前 goroutine 時(shí),selectgo 會(huì)被重新調(diào)度執(zhí)行,確定哪個(gè) case 被觸發(fā),完成數(shù)據(jù)交換,并從其他 chan 的等待隊(duì)列中清理當(dāng)前 goroutine。

公平性 :select 的公平性主要通過兩方面保證:

  • 隨機(jī)輪詢順序 :在檢查哪些 case 可以執(zhí)行時(shí),select 并不是固定地從第一個(gè) case 檢查到最后一個(gè),而是引入了一個(gè)隨機(jī)化的順序。這意味著如果同時(shí)有多個(gè) case 就緒,它們被選中的概率是均等的,避免了排在前面的 case 總是優(yōu)先響應(yīng)。
  • 喚醒機(jī)制 :當(dāng)一個(gè) goroutine 因 select 阻塞后,任何一個(gè)使其 case 成立的 chan 操作都可以將其喚醒。

這種設(shè)計(jì)使得 select 在處理多個(gè)并發(fā)事件源時(shí),能夠公平地響應(yīng),而不會(huì)因?yàn)?nbsp;case 的書寫順序?qū)е履承┦录火I死。

select 中多個(gè) chan 與死鎖

select 語(yǔ)句本身是一種避免在多個(gè)通道操作中選擇時(shí)發(fā)生死鎖的機(jī)制。它會(huì)選擇一個(gè) 可以立即執(zhí)行 的 case(發(fā)送或接收),如果多個(gè) case 同時(shí)就緒,它會(huì)偽隨機(jī)選擇一個(gè)。如果沒有 case 就緒且沒有 default 子句,則執(zhí)行 select 的 goroutine 會(huì)阻塞,直到至少一個(gè) case 變得可以執(zhí)行。

然而,雖然 select 本身旨在處理多路通道的就緒選擇,但它并不能完全阻止整個(gè)程序級(jí)別的死鎖。死鎖的發(fā)生通常是由于程序中 goroutine 之間形成了循環(huán)等待依賴關(guān)系,而 select 語(yǔ)句可能成為這種循環(huán)依賴的一部分:

所有通信方均阻塞

如果一個(gè) select 語(yǔ)句等待的多個(gè) chan,其對(duì)應(yīng)的發(fā)送方或接收方 goroutine 也都因?yàn)槠渌虮蛔枞?,并且無法再對(duì)這些 chan 進(jìn)行操作,那么這個(gè) select 語(yǔ)句可能會(huì)永久阻塞。如果這種情況導(dǎo)致程序中所有 goroutine 都無法繼續(xù)執(zhí)行,Go 運(yùn)行時(shí)會(huì)檢測(cè)到這種全局死鎖,并通常會(huì) panic,打印出 "fatal error: all goroutines are asleep - deadlock!"。

循環(huán)依賴

假設(shè)有兩個(gè) goroutine,G1 和 G2,以及兩個(gè) chan,chA 和 chB。

  • G1 執(zhí)行 select,其中一個(gè) case 是從 chA 接收,另一個(gè) case 是向 chB 發(fā)送。
  • G2 執(zhí)行 select,其中一個(gè) case 是從 chB 接收,另一個(gè) case 是向 chA 發(fā)送。

如果 G1 選擇了等待從 chA 接收,它就需要 G2 向 chA 發(fā)送。同時(shí),如果 G2 選擇了等待從 chB 接收,它就需要 G1 向 chB 發(fā)送。如果它們都做出了這樣的選擇(或者沒有其他路徑可以走),并且沒有其他 goroutine 來打破這個(gè)僵局,那么 G1 和 G2 就會(huì)相互等待,形成死鎖。

基于 hchan.lock 地址排序加鎖

這個(gè)策略用在 runtime.selectgo 函數(shù)(位于 src/runtime/select.go)中。

背景與問題 :select 語(yǔ)句可能涉及多個(gè) chan。每個(gè) hchan 結(jié)構(gòu)體內(nèi)部都有一個(gè)互斥鎖 lock,用于保護(hù)其內(nèi)部狀態(tài)(如緩沖區(qū)、等待隊(duì)列 sendq 和 recvq 等)的并發(fā)訪問。

當(dāng)一個(gè) goroutine 執(zhí)行 select 語(yǔ)句并且沒有 case能立即執(zhí)行(也沒有 default),它需要將自己(表示為一個(gè) sudog 結(jié)構(gòu))掛載到所有相關(guān) case 對(duì)應(yīng)的 chan 的等待隊(duì)列上。這個(gè)掛載操作以及后續(xù)可能的摘除操作,都需要獲取相應(yīng) hchan 的 lock。

如果 selectgo 在嘗試獲取多個(gè) hchan 的鎖時(shí),沒有一個(gè)固定的、全局一致的順序,就可能發(fā)生死鎖。例如:

  • goroutine 1 的 select 涉及 chanA 和 chanB,它嘗試先鎖 chanA 再鎖 chanB。
  • goroutine 2 的 select(或?qū)@些 chan 的其他并發(fā)操作)也涉及 chanA 和 chanB,但它嘗試先鎖 chanB 再鎖 chanA。

如果 G1 成功鎖定了 chanA 并等待 chanB,同時(shí) G2 成功鎖定了 chanB 并等待 chanA,那么 G1 和 G2 之間就會(huì)因?yàn)闋?zhēng)奪這些 hchan.lock 而發(fā)生死鎖。這與經(jīng)典的哲學(xué)家就餐問題中的死鎖場(chǎng)景類似。

解決方案:按鎖地址排序。 為了防止這種因獲取 hchan.lock 順序不一致而導(dǎo)致的死鎖,selectgo 函數(shù)在需要同時(shí)操作多個(gè) hchan(比如,將 goroutine 注冊(cè)到它們的等待隊(duì)列,或者從等待隊(duì)列中移除)時(shí),會(huì)執(zhí)行以下步驟:

  1. 收集 hchan :首先,它會(huì)收集 select 語(yǔ)句中所有 case 涉及的 hchan 指針。
  2. 排序 hchan :然后,它會(huì)根據(jù)這些 hchan 結(jié)構(gòu)體的 內(nèi)存地址 對(duì)它們進(jìn)行排序。通常是按地址從小到大的順序。由于每個(gè) hchan 內(nèi)部的 lock 字段是其一部分,按 hchan 地址排序等效于按 hchan.lock 的地址排序(只要 lock 字段在 hchan 結(jié)構(gòu)中的偏移是固定的)。
  3. 順序加鎖 :selectgo 會(huì)嚴(yán)格按照這個(gè)排好序的順序來依次獲取每個(gè) hchan 的 lock。
  4. 執(zhí)行操作 :在所有需要的鎖都成功獲取后,再執(zhí)行相應(yīng)的操作(如修改等待隊(duì)列)。
  5. 順序解鎖 :操作完成后,通常以與加鎖相反的順序釋放這些鎖。

通過確保所有需要同時(shí)鎖定多個(gè) hchan 的代碼路徑(主要是 selectgo)都遵循相同的“按地址排序后加鎖”的規(guī)則,Go 運(yùn)行時(shí)避免了在 hchan 鎖這個(gè)層級(jí)上發(fā)生死鎖。這是一種經(jīng)典的資源分級(jí)(resource hierarchy)或鎖排序(lock ordering)死鎖預(yù)防技術(shù)。

這個(gè)機(jī)制確保了 select 在管理其與多個(gè)通道的復(fù)雜交互時(shí),不會(huì)因?yàn)閮?nèi)部鎖的爭(zhēng)奪順序問題而陷入困境。

類型系統(tǒng)做到“讀寫分離”

Go 語(yǔ)言的類型系統(tǒng)為 chan 提供了一種優(yōu)雅的方式來實(shí)現(xiàn)“讀寫分離”,即限制對(duì) chan 的操作權(quán)限。這是通過 單向 chan (unidirectional channels) 實(shí)現(xiàn)的。

一個(gè)普通的 chan T 是雙向的,既可以發(fā)送數(shù)據(jù),也可以接收數(shù)據(jù)。但我們可以將其轉(zhuǎn)換為單向 chan:

  • chan<- T (send-only channel) :表示一個(gè)只能發(fā)送 T 類型數(shù)據(jù)的 chan。你不能從一個(gè) chan<- T 類型的 chan 中接收數(shù)據(jù)。
  • <-chan T (receive-only channel) :表示一個(gè)只能接收 T 類型數(shù)據(jù)的 chan。你不能向一個(gè) <-chan T 類型的 chan 發(fā)送數(shù)據(jù)。

本質(zhì)與實(shí)現(xiàn)

單向 chan 并不是一種全新的 chan 類型。它們本質(zhì)上是對(duì)同一個(gè)底層雙向 chan 的不同“視圖”或“接口”。當(dāng)你將一個(gè) chan T 賦值給一個(gè) chan<- T 或 <-chan T 類型的變量時(shí),并沒有創(chuàng)建新的 chan 結(jié)構(gòu),只是限制了通過該變量可以對(duì) chan 進(jìn)行的操作。

這種限制是在 編譯期 (compile-time) 由 Go 的類型檢查器強(qiáng)制執(zhí)行的。如果你嘗試對(duì)一個(gè) chan<- T 進(jìn)行接收操作,或者對(duì)一個(gè) <-chan T 進(jìn)行發(fā)送操作,編譯器會(huì)報(bào)錯(cuò)。

例如:

package main

import "fmt"

// sender 函數(shù)接受一個(gè)只能發(fā)送的 chan
func sender(ch chan<- string, message string) {
    ch <- message
    // msg := <-ch // 編譯錯(cuò)誤: invalid operation: cannot receive from send-only channel ch (variable of type chan<- string)
}

// receiver 函數(shù)接受一個(gè)只能接收的 chan
func receiver(ch <-chan string) {
    msg := <-ch
    fmt.Println("Received:", msg)
    // ch <- "pong" // 編譯錯(cuò)誤: invalid operation: cannot send to receive-only channel ch (variable of type <-chan string)
}

func main() {
    myChannel := make(chan string, 1)

    // 傳遞給 sender 時(shí),myChannel 被隱式轉(zhuǎn)換為 chan<- string
    go sender(myChannel, "ping")

    // 傳遞給 receiver 時(shí),myChannel 被隱式轉(zhuǎn)換為 <-chan string
    receiver(myChannel)

    // 也可以顯式轉(zhuǎn)換
    var sendOnlyChan chan<- string = myChannel
    var recvOnlyChan <-chan string = myChannel

    sendOnlyChan <- "hello again"
    fmt.Println(<-recvOnlyChan)
}

技巧與注意事項(xiàng)

  1. API 設(shè)計(jì) :在設(shè)計(jì)函數(shù)或方法時(shí),如果一個(gè) chan 參數(shù)僅用于發(fā)送數(shù)據(jù),應(yīng)將其類型聲明為 chan<- T;如果僅用于接收數(shù)據(jù),則聲明為 <-chan T。這使得函數(shù)的意圖更加清晰,并能在編譯期防止誤用。這是 Go 語(yǔ)言中一種重要的封裝和抽象手段。
  2. 所有權(quán) :通常,創(chuàng)建 chan 的 goroutine 擁有其“寫”端,并將“讀”端(或雙向 chan)傳遞給其他 goroutine?;蛘?,一個(gè)生產(chǎn)者 goroutine 創(chuàng)建 chan,并將其作為 <-chan T 返回給消費(fèi)者,這樣生產(chǎn)者負(fù)責(zé)寫入和關(guān)閉,消費(fèi)者只負(fù)責(zé)讀取。
  3. 關(guān)閉 chan :一個(gè)重要的規(guī)則是:只應(yīng)該由發(fā)送者關(guān)閉 chan,而不應(yīng)該由接收者關(guān)閉 。因?yàn)榻邮照邿o法知道是否還有其他發(fā)送者會(huì)向該 chan 發(fā)送數(shù)據(jù)。如果一個(gè) chan 被關(guān)閉,而發(fā)送者仍然嘗試向其發(fā)送數(shù)據(jù),會(huì)導(dǎo)致 panic。將 chan 的寫端權(quán)限(chan T 或 chan<- T)限定在負(fù)責(zé)發(fā)送和關(guān)閉的 goroutine 中,有助于遵守這一規(guī)則。
  4. 類型轉(zhuǎn)換 :一個(gè)雙向 chan T 可以被隱式或顯式地轉(zhuǎn)換為 chan<- T 或 <-chan T。但是,單向 chan 不能被轉(zhuǎn)換回雙向 chan,也不能在不同方向的單向 chan 之間直接轉(zhuǎn)換(例如,chan<- T 不能直接轉(zhuǎn)為 <-chan T)。

通過這種方式,Go 的類型系統(tǒng)在編譯階段就幫助開發(fā)者構(gòu)建更安全、更易于理解的并發(fā)程序,有效地體現(xiàn)了最小權(quán)限原則。

常見并發(fā)模式參考

利用 chan 和 select,Go 語(yǔ)言可以優(yōu)雅地實(shí)現(xiàn)許多經(jīng)典的并發(fā)模式。

首先,關(guān)于 for v := range ch 循環(huán),它確實(shí)是處理 chan 接收的一種便捷的語(yǔ)法糖。其本質(zhì)等價(jià)于:

for {
    v, ok := <-ch
    if !ok { // 如果 chan 被關(guān)閉且已空, ok 會(huì)是 false
        break // 退出循環(huán)
    }
    // ... 使用 v ...
}

range 循環(huán)會(huì)自動(dòng)處理檢查 ok 狀態(tài)的邏輯,使得代碼更簡(jiǎn)潔。

接下來介紹一些常見的基于 chan 和 select 的并發(fā)模式:

1. 扇入 (Fan-in)

扇入模式是將多個(gè)輸入 chan 合并到一個(gè)輸出 chan 中。這常用于將多個(gè)生產(chǎn)者產(chǎn)生的數(shù)據(jù)匯總給一個(gè)消費(fèi)者。

package main

import (
    "fmt"
    "sync"
    "time"
)

func produce(id int, ch chan<- string) {
    for i := 0; i < 3; i++ {
        msg := fmt.Sprintf("Producer %d: Message %d", id, i)
        ch <- msg
        time.Sleep(time.Millisecond * time.Duration(id*100)) // 模擬不同生產(chǎn)速度
    }
}

func fanIn(inputs ...<-chan string) <-chan string {
    out := make(chan string)
    var wg sync.WaitGroup

    for _, inputChan := range inputs {
        wg.Add(1)
        go func(ch <-chan string) {
            defer wg.Done()
            for val := range ch {
                out <- val
            }
        }(inputChan)
    }

    go func() {
        wg.Wait() // 等待所有輸入 goroutine 完成
        close(out)  // 然后關(guān)閉輸出 channel
    }()

    return out
}

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    ch3 := make(chan string)

    go produce(1, ch1)
    go produce(2, ch2)
    go produce(3, ch3)

    // 啟動(dòng)后立即關(guān)閉,因?yàn)?produce 函數(shù)內(nèi)部會(huì)發(fā)送數(shù)據(jù)然后 producer goroutine 結(jié)束
    // fanIn 需要知道何時(shí)停止,這里通過關(guān)閉輸入 ch 實(shí)現(xiàn)
    // 實(shí)際應(yīng)用中,關(guān)閉時(shí)機(jī)需要仔細(xì)設(shè)計(jì)
    go func() { time.Sleep(1 * time.Second); close(ch1) }()
    go func() { time.Sleep(1 * time.Second); close(ch2) }()
    go func() { time.Sleep(1 * time.Second); close(ch3) }()


    mergedOutput := fanIn(ch1, ch2, ch3)

    for msg := range mergedOutput {
        fmt.Println("Main received:", msg)
    }
    fmt.Println("All messages processed.")
}
Main received: Producer 3: Message 0
Main received: Producer 1: Message 0
Main received: Producer 2: Message 0
Main received: Producer 1: Message 1
Main received: Producer 2: Message 1
Main received: Producer 1: Message 2
Main received: Producer 3: Message 1
Main received: Producer 2: Message 2
Main received: Producer 3: Message 2
All messages processed.

在 fanIn 函數(shù)中,為每個(gè)輸入 chan 啟動(dòng)一個(gè) goroutine,將接收到的數(shù)據(jù)轉(zhuǎn)發(fā)到統(tǒng)一的 out 通道。使用 sync.WaitGroup 來確保在所有輸入 chan 都被處理完畢(通常是它們的生產(chǎn)者關(guān)閉了它們,導(dǎo)致 range 循環(huán)退出)后,再關(guān)閉 out 通道。

2. 工作池 (Worker Pool)

工作池模式通過啟動(dòng)固定數(shù)量的 goroutine (workers) 來處理來自一個(gè)任務(wù) chan 的任務(wù),并將結(jié)果發(fā)送到一個(gè)結(jié)果 chan。這可以控制并發(fā)數(shù)量,防止資源耗盡。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Task struct {
    ID    int
    Input int
}

type Result struct {
    TaskID int
    Output int
}

func worker(id int, tasks <-chan Task, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d started\n", id)
    for task := range tasks {
        fmt.Printf("Worker %d processing task %d with input %d\n", id, task.ID, task.Input)
        time.Sleep(time.Millisecond * 100) // 模擬工作
        results <- Result{TaskID: task.ID, Output: task.Input * 2}
    }
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    numTasks := 10
    numWorkers := 3

    tasks := make(chan Task, numTasks)
    results := make(chan Result, numTasks)
    var wg sync.WaitGroup

    // 啟動(dòng) workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, tasks, results, &wg)
    }

    // 分發(fā)任務(wù)
    for i := 1; i <= numTasks; i++ {
        tasks <- Task{ID: i, Input: i}
    }
    close(tasks) // 所有任務(wù)已發(fā)送,關(guān)閉 tasks channel,worker 會(huì)在處理完后退出

    // 等待所有 worker 完成
    // 需要一個(gè) goroutine 來等待 wg.Wait() 然后關(guān)閉 results channel
    // 否則主 goroutine 在收集結(jié)果時(shí)會(huì)死鎖
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集結(jié)果
    for result := range results {
        fmt.Printf("Main: Received result for task %d -> %d\n", result.TaskID, result.Output)
    }

    fmt.Println("All tasks processed.")
}
Worker 3 started
Worker 3 processing task 1 with input 1
Worker 2 started
Worker 2 processing task 2 with input 2
Worker 1 started
Worker 1 processing task 3 with input 3
Worker 2 processing task 5 with input 5
Worker 3 processing task 6 with input 6
Worker 1 processing task 4 with input 4
Main: Received result for task 3 -> 6
Main: Received result for task 2 -> 4
Main: Received result for task 1 -> 2
Worker 2 processing task 8 with input 8
Worker 3 processing task 9 with input 9
Worker 1 processing task 7 with input 7
Main: Received result for task 4 -> 8
Main: Received result for task 5 -> 10
Main: Received result for task 6 -> 12
Worker 3 processing task 10 with input 10
Worker 2 finished
Worker 1 finished
Main: Received result for task 9 -> 18
Main: Received result for task 8 -> 16
Main: Received result for task 7 -> 14
Worker 3 finished
Main: Received result for task 10 -> 20
All tasks processed.

3. 超時(shí)與取消 (Timeout and Cancellation)

select 語(yǔ)句非常適合處理操作超時(shí)。可以使用 time.After 創(chuàng)建一個(gè)在指定時(shí)間后發(fā)送信號(hào)的 chan。

package main

import (
    "fmt"
    "time"
)

func longOperation(done chan<- bool) {
    time.Sleep(3 * time.Second) // 模擬耗時(shí)操作
    done <- true
}

func main() {
    operationDone := make(chan bool)
    go longOperation(operationDone)

    select {
    case <-operationDone:
        fmt.Println("Operation completed successfully!")
    case <-time.After(2 * time.Second): // 設(shè)置2秒超時(shí)
        fmt.Println("Operation timed out!")
    }

    // Cancellation example using a done channel
    // (More complex cancellation often uses context.Context)
    quit := make(chan struct{}) // struct{} 作為信號(hào),不占用額外內(nèi)存

    worker := func(q <-chan struct{}) {
        for {
            select {
            case <-q:
                fmt.Println("Worker: told to quit. Cleaning up.")
                // Do cleanup
                fmt.Println("Worker: finished.")
                return
            default:
                // Do work
                fmt.Println("Worker: working...")
                time.Sleep(500 * time.Millisecond)
            }
        }
    }

    go worker(quit)

    time.Sleep(2 * time.Second)
    fmt.Println("Main: Signaling worker to quit.")
    close(quit) // 關(guān)閉 quit channel 作為取消信號(hào)
    time.Sleep(1 * time.Second) // 給 worker 一點(diǎn)時(shí)間退出
    fmt.Println("Main: Exiting.")
}
Operation timed out!
Worker: working...
Worker: working...
Worker: working...
Worker: working...
Main: Signaling worker to quit.
Worker: told to quit. Cleaning up.
Worker: finished.
Main: Exiting.

對(duì)于更復(fù)雜的取消場(chǎng)景,尤其是涉及多個(gè) goroutine 協(xié)作時(shí),Go 推薦使用 context.Context 包,它提供了更結(jié)構(gòu)化的方式來傳遞取消信號(hào)、截止時(shí)間等。

4. 節(jié)流 (Throttling) 與 背壓 (Backpressure)

節(jié)流 :限制操作的速率??梢允褂?nbsp;time.Ticker 或一個(gè)帶緩沖的 chan 作為令牌桶。

package main

import (
    "fmt"
    "time"
)

func main() {
    requests := make(chan int, 5) // 假設(shè)有5個(gè)請(qǐng)求要處理
    for i := 1; i <= 5; i++ {
        requests <- i
    }
    close(requests)

    limiter := time.NewTicker(500 * time.Millisecond) // 每500ms允許一個(gè)操作
    defer limiter.Stop()

    for req := range requests {
        <-limiter.C // 等待 limiter 發(fā)送信號(hào)
        fmt.Printf("Processing request %d at %v\n", req, time.Now().Format("15:04:05.000"))
    }
    fmt.Println("All requests processed.")
}
Processing request 1 at 22:44:20.729
Processing request 2 at 22:44:21.227
Processing request 3 at 22:44:21.728
Processing request 4 at 22:44:22.227
Processing request 5 at 22:44:22.732
All requests processed.

背壓 :當(dāng)消費(fèi)者處理不過來時(shí),通過阻塞生產(chǎn)者或減少生產(chǎn)速率來反向施加壓力。有緩沖 chan 本身就提供了一種簡(jiǎn)單的背壓機(jī)制:當(dāng)緩沖區(qū)滿時(shí),發(fā)送者會(huì)阻塞。更復(fù)雜的背壓可能需要監(jiān)控隊(duì)列長(zhǎng)度并動(dòng)態(tài)調(diào)整。

5. 令牌桶算法 (Token Bucket)使用一個(gè)帶緩沖的 chan 來實(shí)現(xiàn)令牌桶,控制對(duì)某個(gè)資源的訪問速率。

package main

import (
    "fmt"
    "time"
)

type TokenLimiter struct {
    tokenBucket chan struct{}
}

func NewTokenLimiter(capacity int, fillInterval time.Duration) *TokenLimiter {
    bucket := make(chan struct{}, capacity)
    // Initially fill the bucket
    for i := 0; i < capacity; i++ {
        bucket <- struct{}{}
    }

    limiter := &TokenLimiter{
        tokenBucket: bucket,
    }

    // Goroutine to refill tokens periodically
    go func() {
        ticker := time.NewTicker(fillInterval)
        defer ticker.Stop()
        for range ticker.C {
            select {
            case limiter.tokenBucket <- struct{}{}:
                // Token added
            default:
                // Bucket is full, do nothing
            }
        }
    }()
    return limiter
}

func (tl *TokenLimiter) Allow() bool {
    select {
    case <-tl.tokenBucket:
        return true // Got a token
    default:
        return false // No token available
    }
}

func (tl *TokenLimiter) WaitAndAllow() {
    <-tl.tokenBucket // Wait for a token
}


func main() {
    // Allow 2 operations per second, bucket capacity 5
    limiter := NewTokenLimiter(5, 500*time.Millisecond) // capacity, fill one token every 500ms

    for i := 1; i <= 10; i++ {
        // Non-blocking attempt
        // if limiter.Allow() {
        //  fmt.Printf("Request %d allowed at %s\n", i, time.Now().Format("15:04:05.000"))
        // } else {
        //  fmt.Printf("Request %d denied at %s\n", i, time.Now().Format("15:04:05.000"))
        // }

        // Blocking attempt
        limiter.WaitAndAllow()
        fmt.Printf("Request %d processed at %s\n", i, time.Now().Format("15:04:05.000"))
        // Simulate some work so the timing is observable
        // If no work, all will seem to pass quickly after initial burst
        if i < 5 { // First 5 might go through quickly due to initial capacity
            time.Sleep(100 * time.Millisecond)
        } else {
            time.Sleep(600 * time.Millisecond) // Make it slower than fill rate to see blocking
        }
    }
    fmt.Println("All operations attempted.")
}
// Non-blocking attempt
Request 1 allowed at 22:53:00.261
Request 2 allowed at 22:53:00.265
Request 3 allowed at 22:53:00.265
Request 4 allowed at 22:53:00.265
Request 5 allowed at 22:53:00.265
Request 6 denied at 22:53:00.265
Request 7 denied at 22:53:00.265
Request 8 denied at 22:53:00.265
Request 9 denied at 22:53:00.265
Request 10 denied at 22:53:00.265
All operations attempted.
// Blocking attempt
Request 1 processed at 22:51:00.763
Request 2 processed at 22:51:00.868
Request 3 processed at 22:51:00.968
Request 4 processed at 22:51:01.073
Request 5 processed at 22:51:01.175
Request 6 processed at 22:51:01.775
Request 7 processed at 22:51:02.377
Request 8 processed at 22:51:02.979
Request 9 processed at 22:51:03.583
Request 10 processed at 22:51:04.185
All operations attempted.

這些模式只是冰山一角,Go 的 chan 和 select 提供了構(gòu)建各種復(fù)雜并發(fā)系統(tǒng)的基礎(chǔ)模塊。理解它們的行為和組合方式是掌握 Go 并發(fā)編程的關(guān)鍵。

責(zé)任編輯:武曉燕 來源: Piper蛋窩
相關(guān)推薦

2025-05-26 00:05:00

2025-05-28 03:00:00

2025-06-09 01:15:00

2024-01-22 10:18:32

平臺(tái)工程開發(fā)人員技術(shù)

2024-02-21 12:14:00

Gochannel?panic?

2019-10-08 10:37:46

設(shè)計(jì)技術(shù)程序員

2016-11-29 16:46:17

存儲(chǔ)閃存經(jīng)濟(jì)學(xué)

2015-11-09 10:12:08

大數(shù)據(jù)個(gè)性化推薦

2024-06-17 08:40:16

2025-01-15 09:13:53

2024-01-29 00:35:00

Go并發(fā)開發(fā)

2021-07-28 08:32:58

Go并發(fā)Select

2024-04-07 00:04:00

Go語(yǔ)言Map

2023-01-30 15:41:10

Channel控制并發(fā)

2012-05-09 09:09:58

訪問控制

2024-07-30 12:24:23

2012-06-02 00:55:44

HibernateflushJava

2019-09-26 09:42:44

Go語(yǔ)言JavaPython

2023-10-08 09:34:11

Java編程

2019-08-19 12:50:00

Go垃圾回收前端
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)