探索 Go 并發(fā)編程的背后
在操作系統(tǒng)中,進(jìn)程是資源分配的單位,線程是調(diào)度的單位,Go 在線程的基礎(chǔ)上提供了更細(xì)粒度的調(diào)度單位-協(xié)程。協(xié)程跑在多個(gè)線程中,由 Go 運(yùn)行時(shí)調(diào)度,可以原生利用系統(tǒng)的多核,實(shí)現(xiàn)強(qiáng)大的并發(fā)能力,但是同時(shí)也帶來了復(fù)雜性。并發(fā)編程需要謹(jǐn)慎和更多的思考,一不小心可能就會導(dǎo)致 panic、死鎖或并發(fā)引起的邏輯問題。本文嘗試探索 Go 并發(fā)編程中的一些問題,以及看 Go 是如何解決的。
并發(fā)編程的問題
在并發(fā)編程中,多個(gè)協(xié)程往往需要對共享數(shù)據(jù)進(jìn)行操作,這時(shí)候情況就會變得復(fù)雜,下面看幾個(gè)例子。
原子性
原子性問題是指執(zhí)行一個(gè)操作無法一步完成,而是需要執(zhí)行多個(gè)步驟,但是多個(gè)實(shí)體同時(shí)執(zhí)行時(shí)會存在問題,比如下面的例子。
package main
import (
    "sync"
)
func main() {
    for {
        var count int
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            count++
        }()
        go func() {
            defer wg.Done()
            count++
        }()
        wg.Wait()
        if count != 2 {
            panic("count should be 2")
        }
    }
}上面代碼中,因?yàn)?count++ 不是原子操作,需要分解成多個(gè)步驟,所以 count 可能會出現(xiàn)不等于 2 的情況。在單核中,協(xié)程 1 可能執(zhí)行到獲取 count = 0 時(shí)發(fā)生了調(diào)度,然后協(xié)程 2 執(zhí)行了 count++ 并回寫,最后協(xié)程 1 也執(zhí)行了 count++ 并回寫,最終結(jié)果是 1。在多核中,協(xié)程 1 和 2 同時(shí)獲取了 count 的值是 0,然后加一后寫入,最終結(jié)果是 1。類似的問題還有在 32 位系統(tǒng)寫入 64 位數(shù)據(jù),或者內(nèi)存不對齊時(shí),寫入了一部分?jǐn)?shù)據(jù)被另一個(gè)協(xié)程讀取了,導(dǎo)致讀取了錯(cuò)誤的數(shù)據(jù)或非法的指針地址。
指令重排
編譯器和 CPU 為了性能優(yōu)化可能會對指令進(jìn)行重排,指令重排會保證單個(gè)線程內(nèi)的邏輯符合預(yù)期,在多個(gè)線程邏輯解耦的情況是非常有意義的,但是如果多線程共享了內(nèi)存則可能會出現(xiàn)問題,因?yàn)榇a的執(zhí)行順序和我們寫的代碼不一致,可能會導(dǎo)致邏輯出錯(cuò)。首先看一個(gè) C++ 的例子。
#include <iostream>
#include <thread>
using namespace std;
bool flag = false;
int value = 0;
void func1() {
    value = 11111;
    // asm volatile("" ::: "memory");  // 編譯器內(nèi)存屏障
    flag = true;
}
void func2() {
    if (flag) {
        // asm volatile("" ::: "memory");  
        if (value == 0) {
            cout << "value == 0" << endl;
            exit(0);
        }
    }
}
int main() {
    while(1) {
        flag = false;
        value = 0;
        thread t1 = thread(func1);
        thread t2 = thread (func2);
        t1.join();
        t2.join();
    }
    return 0;
}通過 g++ -O2 main.cpp 編譯上面的代碼然后執(zhí)行,最終會輸入 value = 0。這個(gè)問題看起來非常詭異,和我們的代碼邏輯不太符合預(yù)期??赡艿脑蛴校?/span>
- 編譯器或 CPU 發(fā)生了指令重排,線程 1 執(zhí)行了 flag = true,后執(zhí)行 value = 11111。
 - 內(nèi)存可見性問題,即線程 1 中執(zhí)行變量 1 和 2 的寫操作,但是變量 2 先回刷內(nèi)存,線程 2 會讀取最新的變量 2,但是讀取到舊的變量 1。
 
大多數(shù)弱內(nèi)存順序的 CPU 都會保證線程 2 看到 flag 為 true 時(shí),value = 11111 是成立的,所以不是內(nèi)存可見性問題。接著在代碼中加入 asm volatile("" ::: "memory") 編譯器屏障發(fā)現(xiàn)依然會輸出 value = 0,說明不是因?yàn)榫幾g器指令重排引起的,通過編譯后的代碼也可以看到編譯器的確沒有重排指令。
__Z5func1v:                             ## @_Z5func1v
        .cfi_startproc
## %bb.0:
        pushq        %rbp
        .cfi_def_cfa_offset 16
        .cfi_offset %rbp, -16
        movq        %rsp, %rbp
        .cfi_def_cfa_register %rbp
        movl        $11111, _value(%rip)  ## 先賦值 value,再賦值 flag
        movb        $1, _flag(%rip)
        popq        %rbp
        retq
        .cfi_endproc
                                        ## -- End function
        .globl        __Z5func2v                      ## -- Begin function _Z5func2v
        .p2align        4, 0x90
__Z5func2v:                             ## @_Z5func2v
        .cfi_startproc
## %bb.0:
        pushq        %rbp
        .cfi_def_cfa_offset 16
        .cfi_offset %rbp, -16
        movq        %rsp, %rbp
        .cfi_def_cfa_register %rbp
        cmpb        $0, _flag(%rip)  ## 先判斷 flag
        je        LBB1_2
## %bb.1:
        cmpl        $0, _value(%rip) ## 再判斷 value
        je        LBB1_3所以只可能是 CPU 指令重排,我們再設(shè)置一下 CPU 內(nèi)存屏障禁止指令重排看一下。
#include <iostream>
#include <thread>
using namespace std;
bool flag = false;
int value = 0;
void func1() {
    value = 11111;
    // 禁止寫寫重排
    asm volatile("dmb ishst" ::: "memory");  
    flag = true;
}
void func2() {
    if (flag) {
        // 禁止讀讀重排
        asm volatile("dmb ishl" ::: "memory");  
        if (value == 0) {
            cout << "value == 0" << endl;
            exit(0);
        }
    }
}
int main() {
    while(1) {
        flag = false;
        value = 0;
        thread t1 = thread(func1);
        thread t2 = thread (func2);
        t1.join();
        t2.join();
    }
    return 0;
}這樣就不會出現(xiàn) value = 0 了。
在 Go 中也會存在一樣的問題。
package main
import (
    "sync"
)
func main() {
    for {
        var flag bool
        var count int
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            count = 1
            flag = true
        }()
        go func() {
            defer wg.Done()
            if flag && count != 1 {
                panic("count should be 1")
            }
        }()
        wg.Wait()
    }
}上面的代碼會發(fā)生 panic,這里可以通過設(shè)置內(nèi)存屏障解決這個(gè)問題,但是實(shí)踐中我們一般使用 Go 提供的 API。
package main
/*
void Store() {
     asm volatile("dmb ishst" ::: "memory");
}
void Load() {
     asm volatile("dmb ishld" ::: "memory");
}
*/
import "C"
import (
    "sync"
)
func main() {
    for {
        var flag bool
        var count int
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            count = 1
            C.Store()
            flag = true
        }()
        go func() {
            defer wg.Done()
            if flag {
                C.Load()
                if count != 1 {
                    panic("count should be 1")
                }
            }
        }()
        wg.Wait()
    }
}內(nèi)存可見性
因?yàn)槊總€(gè) CPU 都有自己的 L1/2 緩存,所以 CPU 0 寫入的數(shù)據(jù),CPU 1 不一定讀到最新的。比如下面的例子。
package main
func main() {
    var count int
    // 先執(zhí)行
    go func() {
        count = 1
    }()
    // 后執(zhí)行
    go func() {
        if count != 1 {
            panic("count should be 1")
        }
    }()
}上面的代碼假設(shè)協(xié)程 1 先執(zhí)行完畢后協(xié)程 2 才執(zhí)行,協(xié)程 2 可能會看不到協(xié)程 1 的寫入,也就是 count 不等于 1。
從之前的例子中可以看到,在并發(fā)編程中存在幾個(gè)問題。
- 原子性:一個(gè)操作需要多個(gè)步驟完成,多個(gè)線程同時(shí)執(zhí)行這個(gè)操作,導(dǎo)致數(shù)據(jù)錯(cuò)亂。
 - 指令重排:編譯器和 CPU 為了性能優(yōu)化會對指令進(jìn)行重排,出現(xiàn)代碼的執(zhí)行順序和我們寫的代碼不一致,導(dǎo)致邏輯出錯(cuò)。
 - 內(nèi)存可見性:因?yàn)?CPU 有自己的獨(dú)立的緩存,所以 CPU 可能不會立刻看到另一個(gè) CPU 寫入的值,但是 CPU 會提供對應(yīng)的指令保證讓開發(fā)者可以保證內(nèi)存的可見性。
 
并發(fā)編程的方案
以上的這些問題非常繁瑣且復(fù)雜,不同的編譯器、不同的編譯器版本和不同的 CPU 架構(gòu)處理方式都是不一樣的,但是幸好 Go 通過 sync 包為我們解決了這些問題。
原子操作
package main
import (
    "sync"
    "sync/atomic"
)
func main() {
    for {
        var count atomic.Int32
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            count.Add(1)
        }()
        go func() {
            defer wg.Done()
            count.Add(1)
        }()
        wg.Wait()
        if count.Load() != 2 {
            panic("count should be 2")
        }
    }
}通過 atomic.Int32 我們就可以保證對 count 加一的計(jì)算是原子的,并且每次操作完對另一個(gè)協(xié)程來說都是可見的,從而保證 count 的值最終是 2,接下來看一下 atomic.Int32 Add 的實(shí)現(xiàn)。
TEXT ·Xadd(SB), NOSPLIT, $0-12
    MOVL        ptr+0(FP), BX
    MOVL        delta+4(FP), AX
    MOVL        AX, CX
    LOCK
    XADDL       AX, 0(BX)
    ADDL        CX, AX
    MOVL        AX, ret+8(FP)
    RET從 Xadd 的實(shí)現(xiàn)可以看到最終是通過 LOCK 和 XADDL 指令實(shí)現(xiàn)了原子操作,LOCK 指令不僅保證了 XADDL 指令的執(zhí)行是原子的,同時(shí)實(shí)現(xiàn)了類似內(nèi)存屏障的功能,禁止了指令重排和保證 LOCK 之前的指令讀取的數(shù)據(jù)是最新的以及 LOCK 指令后的寫操作會對其他協(xié)程可見。
互斥鎖
package main
import (
    "sync"
)
func main() {
    for {
        var mutex sync.Mutex
        var flag bool
        var count int
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            mutex.Lock()
            defer mutex.Unlock()
            count = 1
            flag = true
        }()
        go func() {
            defer wg.Done()
            mutex.Lock()
            defer mutex.Unlock()
            if flag && count != 1 {
                panic("count should be 1")
            }
        }()
        wg.Wait()
    }
}通過互斥鎖,我們可以保證最終讀到 flag 為 true 時(shí) count 肯定是 1。那么這個(gè)是如何保證的呢?接下來看一下鎖的實(shí)現(xiàn),我們假設(shè)協(xié)程 1 執(zhí)行完后協(xié)程 2 才執(zhí)行。Lock 的實(shí)現(xiàn)如下。
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
}Lock 是通過 CAS 實(shí)現(xiàn)加鎖的,CAS 的實(shí)現(xiàn)如下。
TEXT ·Cas(SB), NOSPLIT, $0-13
    MOVL        ptr+0(FP), BX
    MOVL        old+4(FP), AX
    MOVL        new+8(FP), CX
    LOCK
    CMPXCHGL        CX, 0(BX)
    SETEQ        ret+12(FP)
    RETCAS 是通過 LOCK 和 CMPXCHGL 實(shí)現(xiàn)的。
- 保證內(nèi)存可見性,也就是說如果其他代碼獲取了鎖,這里可以感知到,否則就大家都拿到了鎖就有問題了。
 - 實(shí)現(xiàn)了原子性保證自己拿到鎖時(shí)別人肯定拿不到鎖,
 - 拿到鎖后也會被其他代碼感知到。
 
成功拿到鎖后設(shè)置了 flag 和 value,然后調(diào) Unlock,Unlock 實(shí)現(xiàn)如下。
func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}AddInt32 底層是 LOCK 和 XADDL 指令實(shí)現(xiàn)的,所以可以保證 Unlock 之前的寫入對其他協(xié)程可見。通過互斥鎖我們就保證了操作的互斥性和內(nèi)存可見性。
WaitGroup
package main
import "sync"
func main() {
    for {
        var count int
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            count = 1
        }()
        wg.Wait()
        if count != 1 {
            panic("count should be 1")
        }
    }
}通過 wg.Done() 可以保證 count = 1 的寫入對另一個(gè)協(xié)程可見。
結(jié)論
在并發(fā)編程中,涉及到指令重排、內(nèi)存可見性和內(nèi)存屏障等一系列復(fù)雜的概念,并且不同的編譯器、版本、CPU 行為都不一致。所以在 Go 并發(fā)編程中,如果需要在協(xié)程間會操作共享數(shù)據(jù)一定要使用 sync 包提供的能力或 channel 來保證操作的原子性和數(shù)據(jù)的可見性,否則可能會出現(xiàn)一些隱藏且晦澀的問題。最后以一個(gè)例子結(jié)尾,下面的例子在 Intel CPU 下是沒問題的,但是在 M4 下就會 panic。
package main
import "sync"
type Client struct {
    Count int
}
type Singleton struct {
    client *Client
    metux  sync.Mutex
}
func (s *Singleton) Get() (client *Client, err error) {
    if s.client != nil {
        return s.client, nil
    }
    s.metux.Lock()
    defer s.metux.Unlock()
    // double check
    if s.client == nil {
        s.client, err = &Client{Count: 1}, nil
        if err != nil {
            return nil, err
        }
    }
    return s.client, nil
}
func main() {
    for {
        singleton := &Singleton{}
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            singleton.Get()
        }()
        go func() {
            defer wg.Done()
            client, _ := singleton.Get()
            if client != nil && client.Count != 1 {
                panic("count should be 1")
            }
        }()
        wg.Wait()
    }
}














 
 
 
















 
 
 
 