講透Go中的并發(fā)接收控制結(jié)構(gòu)Select
select 與 switch
讓我們來復(fù)習(xí)一下switch語句,在switch語句中,會(huì)逐個(gè)匹配case語句(可以是值也可以是表達(dá)式),一個(gè)一個(gè)的判斷過去,直到有符合的語句存在,執(zhí)行匹配的語句內(nèi)容后跳出switch。
- func demo(number int){
- switch{
- case number >= 90:
- fmt.Println("優(yōu)秀")
- default:
- fmt.Println("太搓了")
- }
- }
而 select 用于處理通道,它的語法與 switch 非常類似。每個(gè) case 語句里必須是一個(gè) channel 操作。它既可以用于 channel 的數(shù)據(jù)接收,也可以用于 channel 的數(shù)據(jù)發(fā)送。
- func foo() {
- chanInt := make(chan int)
- defer close(chanInt)
- go func() {
- select {
- case data, ok := <-chanInt:
- if ok {
- fmt.Println(data)
- }
- default:
- fmt.Println("全部阻塞")
- }
- }()
- chanInt <- 1
- }
輸出1
- 這是一個(gè)簡單的接收發(fā)送模型
- 如果 select 的多個(gè)分支都滿足條件,則會(huì)隨機(jī)的選取其中一個(gè)滿足條件的分支。
- 第 6 行加上 ok 是因?yàn)樯弦还?jié)講過,如果不加會(huì)導(dǎo)致通道關(guān)閉時(shí)收到零值
- 回憶之前的知識(shí),讓接收和發(fā)送在不同的goroutine里,否則會(huì)死鎖
這個(gè)程序存在什么問題?
假如發(fā)送太慢,所有case都處于阻塞狀態(tài),會(huì)直接執(zhí)行default的內(nèi)容。這里加一行sleep試試。
- func bar() {
- chanInt := make(chan int)
- defer close(chanInt)
- go func() {
- ....
- }()
- time.Sleep(time.Second)
- chanInt <- 1
- }
- 倒數(shù)第二行加了sleep 1 秒,導(dǎo)致select語句提前結(jié)束
- 猜測一下會(huì)輸出全部阻塞嗎?
- 全部阻塞
- fatal error: all goroutines are asleep - deadlock!
- goroutine 1 [chan send]:
- main.bar()
是會(huì)輸出全部阻塞的。
因?yàn)榻邮請?zhí)行完了,退出了goroutine,而發(fā)送才剛剛執(zhí)行到,沒有與其匹配的接收,故死鎖。
正確的做法是把接收套在循環(huán)里面。
- func baz() {
- chanInt := make(chan int)
- defer close(chanInt)
- go func() {
- for {
- select {
- ...
- }
- }
- }()
- chanInt <- 1
- }
- 不再死鎖了
- 假如程序不停止,會(huì)出現(xiàn)一個(gè)泄露的goroutine,永遠(yuǎn)的在for循環(huán)中無法跳出,此時(shí)引入下一節(jié)的內(nèi)容
通知機(jī)制
Go 語言總是簡單和靈活的,雖然沒有針對提供專門的機(jī)制來處理退出,但我們可以自己組合
- func main() {
- chanInt, done := make(chan int), make(chan struct{})
- defer close(chanInt)
- defer close(done)
- go func() {
- for {
- select {
- case <-chanInt:
- case <-done:
- break
- }
- }
- }()
- done <- struct{}{}
- }
沒有給chanInt發(fā)送任何東西,按理說會(huì)阻塞,導(dǎo)致goroutine泄露
但可以使用額外的通道完成協(xié)程的退出控制
這種方式還可以做到周期性處理任務(wù),下一節(jié)我們再詳細(xì)講解
case 的并發(fā)性
case是有并發(fā)屬性的,比如兩次輸入,分別等待 1、2 秒,再進(jìn)行兩次讀取,會(huì)花 3 秒時(shí)間嗎?
- func main() {
- c1,c2 := make(chan string), make(chan string)
- close(c1)
- close(c2)
- go func() {
- time.Sleep(time.Second * 1)
- c1 <- "one"
- }()
- go func() {
- time.Sleep(time.Second * 2)
- c2 <- "two"
- }()
- start := time.Now() // 獲取當(dāng)前時(shí)間
- for i := 0; i < 2; i++ {
- select {
- case <-c1:
- case <-c2:
- }
- }
- elapsed := time.Since(start)
- // 這里沒有用到3秒,為什么?
- fmt.Println("該函數(shù)執(zhí)行完成耗時(shí):", elapsed)
- }
以上代碼先初始化兩個(gè) channel c1 和 c2,然后開啟兩個(gè) goroutine 分別往 c1 和 c2 寫入數(shù)據(jù),再通過 select 監(jiān)聽兩個(gè) channel,從中讀取數(shù)據(jù)并輸出。
運(yùn)行結(jié)果如下:
- $ go run channel.go
- received one
- received two
- 該函數(shù)執(zhí)行完成耗時(shí):2.004695535s
這充分說明case是并發(fā)的,但要注意此處的并發(fā)是 case 對channel阻塞做出的特殊處理。
case并發(fā)的原理
假如case后左邊和右邊跟了函數(shù),會(huì)執(zhí)行函數(shù),我們來探索一下。
定義A、B函數(shù),作用相同
- func A() int {
- fmt.Println("start A")
- time.Sleep(1 * time.Second)
- fmt.Println("end A")
- return 1
- }
定義函數(shù)lee,請問該函數(shù)執(zhí)行完成耗時(shí)多少呢?
- func lee() {
- ch, done := make(chan int), make(chan struct{})
- defer close(ch)
- go func() {
- select {
- case ch <- A():
- case ch <- B():
- case <-done:
- }
- }()
- done <- struct{}{}
- }
答案是 2 秒
- start A
- end A
- start B
- end B
- main.leespend time: 2.003504395s
- select 掃描是從左到右從上到下的,按這個(gè)順序先求值,如果是函數(shù)會(huì)先執(zhí)行函數(shù)。
- 然后立馬判斷是否可以立即執(zhí)行(這里是指 case 是否會(huì)因?yàn)閳?zhí)行而阻塞)。
- 所以兩個(gè)函數(shù)都會(huì)進(jìn)入,而且是先進(jìn)入 A 再進(jìn)入 B,兩個(gè)函數(shù)都會(huì)執(zhí)行完,所以等待時(shí)間會(huì)累計(jì)。
如果都不會(huì)阻塞,此時(shí)就會(huì)使用一個(gè)偽隨機(jī)的算法,去選中一個(gè) case,只要選中了其他就被放棄了。
超時(shí)控制
我們來模擬一個(gè)更真實(shí)點(diǎn)的例子,讓程序一段時(shí)間超時(shí)退出。
定義一個(gè)結(jié)構(gòu)體
- type Worker struct {
- stream <-chan int //處理
- timeout time.Duration //超時(shí)
- done chan struct{} //結(jié)束信號(hào)
- }
定義初始化函數(shù)
- func NewWorker(stream <-chan int, timeout int) *Worker {
- return &Worker{
- stream: stream,
- timeout: time.Duration(timeout) * time.Second,
- done: make(chan struct{}),
- }
- }
定義超時(shí)處理函數(shù)
- func (w *Worker) afterTimeStop() {
- go func() {
- time.Sleep(w.timeout)
- w.done <- struct{}{}
- }()
- }
- 超過時(shí)間發(fā)送結(jié)束信號(hào)
接收數(shù)據(jù)并處理函數(shù)
- func (w *Worker) Start() {
- w.afterTimeStop()
- for {
- select {
- case data, ok := <-w.stream:
- if !ok {
- return
- }
- fmt.Println(data)
- case <-w.done:
- close(w.done)
- return
- }
- }
- }
- 收到結(jié)束信號(hào)關(guān)閉函數(shù)
- 這樣的方法就可以讓程序在等待 1 秒后繼續(xù)執(zhí)行,而不會(huì)因?yàn)?ch 讀取等待而導(dǎo)致程序停滯。
- func main() {
- stream := make(chan int)
- defer close(stream)
- w := NewWorker(stream, 3)
- w.Start()
- }
實(shí)際 3 秒到程序運(yùn)行結(jié)束。
這種方式巧妙地實(shí)現(xiàn)了超時(shí)處理機(jī)制,這種方法不僅簡單,在實(shí)際項(xiàng)目開發(fā)中也是非常實(shí)用的。
小結(jié)
本節(jié)介紹了select的用法以及包含的陷阱,我們學(xué)會(huì)了
- case是并發(fā)的
- case只針對通道傳輸阻塞做特殊處理,如果有計(jì)算將會(huì)先進(jìn)行計(jì)算
- 掃描是從左到右從上到下的,按這個(gè)順序先求值,如果是函數(shù)會(huì)先執(zhí)行函數(shù)。如果函數(shù)運(yùn)行時(shí)間長,時(shí)間會(huì)累計(jì)
- 在case全部阻塞時(shí),會(huì)執(zhí)行default中的內(nèi)容
- 可使用結(jié)束信號(hào),讓select退出
- 延時(shí)發(fā)送結(jié)束信號(hào)可以實(shí)現(xiàn)超時(shí)自動(dòng)退出的功能
問題:為什么w.stream沒有程序向他發(fā)送數(shù)據(jù),卻沒有死鎖呢?
本節(jié)源碼位置 https://github.com/golang-minibear2333/golang/blob/master/4.concurrent/4.5-select”
本文轉(zhuǎn)載自微信公眾號(hào)「機(jī)智的程序員小熊」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系機(jī)智的程序員小熊公眾號(hào)。