Go:有了 Sync 為什么還有 Atomic?
Go 是一種擅長(zhǎng)并發(fā)的語(yǔ)言,啟動(dòng)新的 goroutine 就像輸入 “go” 一樣簡(jiǎn)單。隨著你發(fā)現(xiàn)自己構(gòu)建的系統(tǒng)越來(lái)越復(fù)雜,正確保護(hù)對(duì)共享資源的訪(fǎng)問(wèn)以防止競(jìng)爭(zhēng)條件變得極其重要。此類(lèi)資源可能包括可即時(shí)更新的配置(例如功能標(biāo)志)、內(nèi)部狀態(tài)(例如斷路器狀態(tài))等。
01 什么是競(jìng)態(tài)條件?
對(duì)于大多數(shù)讀者來(lái)說(shuō),這可能是基礎(chǔ)知識(shí),但由于本文的其余部分取決于對(duì)競(jìng)態(tài)條件的理解,因此有必要進(jìn)行簡(jiǎn)短的復(fù)習(xí)。競(jìng)態(tài)條件是一種情況,在這種情況下,程序的行為取決于其他不可控事件的順序或時(shí)間。在大多數(shù)情況下,這種情況是一個(gè)錯(cuò)誤,因?yàn)榭赡軙?huì)發(fā)生不希望的結(jié)果。
舉個(gè)具體的例子或許更容易理解:
- // race_condition_test.go
- package main
- import (
- "fmt"
- "sort"
- "sync"
- "testing"
- )
- func Test_RaceCondition(t *testing.T) {
- var s = make([]int, 0)
- wg := sync.WaitGroup{}
- // spawn 10 goroutines to modify the slice in parallel
- for i := 0; i < 10; i++ {
- wg.Add(1)
- go func(i int) {
- defer wg.Done()
- s = append(s, i) //add a new item to the slice
- }(i)
- }
- wg.Wait()
- sort.Ints(s) //sort the response to have comparable results
- fmt.Println(s)
- }
執(zhí)行一:
- $ go test -v race_condition_test.go
- === RUN Test_RaceCondition
- [0 1 2 3 4 5 6 7 8 9]
- --- PASS: Test_RaceCondition (0.00s)
這里看起來(lái)一切都很好。這是我們預(yù)期的輸出。該程序迭代了 10 次,并在每次迭代時(shí)將索引添加到切片中。
執(zhí)行二:
- === RUN Test_RaceCondition
- [0 3]
- --- PASS: Test_RaceCondition (0.00s)
等等,這里發(fā)生了什么?這次我們的響應(yīng)切片中只有兩個(gè)元素。這是因?yàn)榍衅膬?nèi)容 s 在加載和修改之間發(fā)生了變化,導(dǎo)致程序覆蓋了一些結(jié)果。這種特殊的競(jìng)態(tài)條件是由數(shù)據(jù)競(jìng)爭(zhēng)引起的,在這種情況下,多個(gè) goroutine 嘗試同時(shí)訪(fǎng)問(wèn)特定的共享變量,并且這些 goroutine 中的至少一個(gè)嘗試修改它。(注意,以上結(jié)果并非一定如此,每次運(yùn)行結(jié)果可能都不相同)
如果你使用 -race 標(biāo)志執(zhí)行測(cè)試,go 甚至?xí)嬖V你存在數(shù)據(jù)競(jìng)爭(zhēng)并幫助你準(zhǔn)確定位:
- $ go test race_condition_test.go -race
- ==================
- WARNING: DATA RACE
- Read at 0x00c000132048 by goroutine 9:
- command-line-arguments.Test_RaceCondition.func1()
- /home/sfinlay/go/src/benchmarks/race_condition_test.go:20 +0xb4
- command-line-arguments.Test_RaceCondition·dwrap·1()
- /home/sfinlay/go/src/benchmarks/race_condition_test.go:21 +0x47
- Previous write at 0x00c000132048 by goroutine 8:
- command-line-arguments.Test_RaceCondition.func1()
- /home/sfinlay/go/src/benchmarks/race_condition_test.go:20 +0x136
- command-line-arguments.Test_RaceCondition·dwrap·1()
- /home/sfinlay/go/src/benchmarks/race_condition_test.go:21 +0x47
- Goroutine 9 (running) created at:
- command-line-arguments.Test_RaceCondition()
- /home/sfinlay/go/src/benchmarks/race_condition_test.go:18 +0xc5
- testing.tRunner()
- /usr/local/go/src/testing/testing.go:1259 +0x22f
- testing.(*T).Run·dwrap·21()
- /usr/local/go/src/testing/testing.go:1306 +0x47
- Goroutine 8 (finished) created at:
- command-line-arguments.Test_RaceCondition()
- /home/sfinlay/go/src/benchmarks/race_condition_test.go:18 +0xc5
- testing.tRunner()
- /usr/local/go/src/testing/testing.go:1259 +0x22f
- testing.(*T).Run·dwrap·21()
- /usr/local/go/src/testing/testing.go:1306 +0x47
- ==================
02 并發(fā)控制
保護(hù)對(duì)這些共享資源的訪(fǎng)問(wèn)通常涉及常見(jiàn)的內(nèi)存同步機(jī)制,例如通道或互斥鎖。
這是將競(jìng)態(tài)條件調(diào)整為使用互斥鎖的相同測(cè)試用例:
- func Test_NoRaceCondition(t *testing.T) {
- var s = make([]int, 0)
- m := sync.Mutex{}
- wg := sync.WaitGroup{}
- // spawn 10 goroutines to modify the slice in parallel
- for i := 0; i < 10; i++ {
- wg.Add(1)
- go func(i int) {
- m.Lock()
- defer wg.Done()
- defer m.Unlock()
- s = append(s, i)
- }(i)
- }
- wg.Wait()
- sort.Ints(s) //sort the response to have comparable results
- fmt.Println(s)
- }
這次它始終返回所有 10 個(gè)整數(shù),因?yàn)樗_保每個(gè) goroutine 僅在沒(méi)有其他人執(zhí)行時(shí)才讀寫(xiě)切片。如果第二個(gè) goroutine 同時(shí)嘗試獲取鎖,它必須等到前一個(gè) goroutine 完成(即直到它解鎖)。
然而,對(duì)于高吞吐量系統(tǒng),性能變得非常重要,因此減少鎖爭(zhēng)用(即一個(gè)進(jìn)程或線(xiàn)程試圖獲取另一個(gè)進(jìn)程或線(xiàn)程持有的鎖的情況)變得更加重要。執(zhí)行此操作的最基本方法之一是使用讀寫(xiě)鎖 ( sync.RWMutex) 而不是標(biāo)準(zhǔn) sync.Mutex,但是 Go 還提供了一些原子內(nèi)存原語(yǔ)即 atomic 包。
03 原子
Go 的 atomic 包提供了用于實(shí)現(xiàn)同步算法的低級(jí)原子內(nèi)存原語(yǔ)。這聽(tīng)起來(lái)像是我們需要的東西,所以讓我們嘗試用 atomic 重寫(xiě)該測(cè)試:
- import "sync/atomic"
- func Test_RaceCondition_Atomic(t *testing.T) {
- var s = atomic.Value{}
- s.Store([]int{}) // store empty slice as the base
- wg := sync.WaitGroup{}
- // spawn 10 goroutines to modify the slice in parallel
- for i := 0; i < 10; i++ {
- wg.Add(1)
- go func(i int) {
- defer wg.Done()
- s1 := s.Load().([]int)
- s.Store(append(s1, i)) //replace the slice with a new one containing the new item
- }(i)
- }
- wg.Wait()
- s1 := s.Load().([]int)
- sort.Ints(s1) //sort the response to have comparable results
- fmt.Println(s1)
- }
執(zhí)行結(jié)果:
- === RUN Test_RaceCondition_Atomic
- [1 3]
- --- PASS: Test_RaceCondition_Atomic (0.00s)
什么?這和我們之前遇到的問(wèn)題完全一樣,那么這個(gè)包有什么好處呢?
04 讀取-復(fù)制-更新
atomic 不是靈丹妙藥,它顯然不能替代互斥鎖,但是當(dāng)涉及到可以使用讀取-復(fù)制-更新[1]模式管理的共享資源時(shí),它非常出色。在這種技術(shù)中,我們通過(guò)引用獲取當(dāng)前值,當(dāng)我們想要更新它時(shí),我們不修改原始值,而是替換指針(因此沒(méi)有人訪(fǎng)問(wèn)另一個(gè)線(xiàn)程可能訪(fǎng)問(wèn)的相同資源)。前面的示例無(wú)法使用此模式實(shí)現(xiàn),因?yàn)樗鼞?yīng)該隨著時(shí)間的推移擴(kuò)展現(xiàn)有資源而不是完全替換其內(nèi)容,但在許多情況下,讀取-復(fù)制-更新是完美的。
這是一個(gè)基本示例,我們可以在其中獲取和存儲(chǔ)布爾值(例如,對(duì)于功能標(biāo)志很有用)。在這個(gè)例子中,我們正在執(zhí)行一個(gè)并行基準(zhǔn)測(cè)試,比較原子和讀寫(xiě)互斥:
- package main
- import (
- "sync"
- "sync/atomic"
- "testing"
- )
- type AtomicValue struct{
- value atomic.Value
- }
- func (b *AtomicValue) Get() bool {
- return b.value.Load().(bool)
- }
- func (b *AtomicValue) Set(value bool) {
- b.value.Store(value)
- }
- func BenchmarkAtomicValue_Get(b *testing.B) {
- atomB := AtomicValue{}
- atomB.value.Store(false)
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- atomB.Get()
- }
- })
- }
- /************/
- type MutexBool struct {
- mutex sync.RWMutex
- flag bool
- }
- func (mb *MutexBool) Get() bool {
- mb.mutex.RLock()
- defer mb.mutex.RUnlock()
- return mb.flag
- }
- func BenchmarkMutexBool_Get(b *testing.B) {
- mb := MutexBool{flag: true}
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- mb.Get()
- }
- })
- }
結(jié)果:
- cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
- BenchmarkAtomicValue_Get
- BenchmarkAtomicValue_Get-8 1000000000 0.5472 ns/op
- BenchmarkMutexBool_Get
- BenchmarkMutexBool_Get-8 24966127 48.80 ns/op
結(jié)果很清楚。atomic 的速度提高了 89 倍以上。并且可以通過(guò)使用更原始的類(lèi)型來(lái)進(jìn)一步改進(jìn):
- type AtomicBool struct{ flag int32 }
- func (b *AtomicBool) Get() bool {
- return atomic.LoadInt32(&(b.flag)) != 0
- }
- func (b *AtomicBool) Set(value bool) {
- var i int32 = 0
- if value {
- i = 1
- }
- atomic.StoreInt32(&(b.flag), int32(i))
- }
- func BenchmarkAtomicBool_Get(b *testing.B) {
- atomB := AtomicBool{flag: 1}
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- atomB.Get()
- }
- })
- }
- cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
- BenchmarkAtomicBool_Get
- BenchmarkAtomicBool_Get-8 1000000000 0.3161 ns/op
此版本比互斥鎖版本快 154 倍以上。
寫(xiě)操作也顯示出明顯的差異(盡管規(guī)模并不那么令人印象深刻):
- func BenchmarkAtomicBool_Set(b *testing.B) {
- atomB := AtomicBool{flag: 1}
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- atomB.Set(true)
- }
- })
- }
- /************/
- func BenchmarkAtomicValue_Set(b *testing.B) {
- atomB := AtomicValue{}
- atomB.value.Store(false)
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- atomB.Set(true)
- }
- })
- }
- /************/
- func BenchmarkMutexBool_Set(b *testing.B) {
- mb := MutexBool{flag: true}
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- mb.Set(true)
- }
- })
- }
結(jié)果:
- cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
- BenchmarkAtomicBool_Set
- BenchmarkAtomicBool_Set-8 64624705 16.79 ns/op
- BenchmarkAtomicValue_Set
- BenchmarkAtomicValue_Set-8 47654121 26.43 ns/op
- BenchmarkMutexBool_Set
- BenchmarkMutexBool_Set-8 20124637 66.50 ns/op
在這里我們可以看到 atomic 在寫(xiě)入時(shí)比在讀取時(shí)慢得多,但仍然比互斥鎖快得多。有趣的是,我們可以看到互斥鎖讀取和寫(xiě)入之間的差異不是很明顯(慢 30%)。盡管如此, atomic 仍然表現(xiàn)得更好(比互斥鎖快 2-4 倍)。
05 為什么 atomic 這么快?
簡(jiǎn)而言之,原子操作很快,因?yàn)樗鼈円蕾?lài)于原子 CPU 指令而不是依賴(lài)外部鎖。使用互斥鎖時(shí),每次獲得鎖時(shí),goroutine 都會(huì)短暫暫停或中斷,這種阻塞占使用互斥鎖所花費(fèi)時(shí)間的很大一部分。原子操作可以在沒(méi)有任何中斷的情況下執(zhí)行。
06 atomic 總是答案嗎?
正如我們?cè)谝粋€(gè)早期示例中已經(jīng)證明的那樣,atomic 無(wú)法解決所有問(wèn)題,某些操作只能使用互斥鎖來(lái)解決。
考慮以下示例,該示例演示了我們使用 map 作為內(nèi)存緩存的常見(jiàn)模式:
- package main
- import (
- "sync"
- "sync/atomic"
- "testing"
- )
- //Don't use this implementation!
- type AtomicCacheMap struct {
- value atomic.Value //map[int]int
- }
- func (b *AtomicCacheMap) Get(key int) int {
- return b.value.Load().(map[int]int)[key]
- }
- func (b *AtomicCacheMap) Set(key, value int) {
- oldMap := b.value.Load().(map[int]int)
- newMap := make(map[int]int, len(oldMap)+1)
- for k, v := range oldMap {
- newMap[k] = v
- }
- newMap[key] = value
- b.value.Store(newMap)
- }
- func BenchmarkAtomicCacheMap_Get(b *testing.B) {
- atomM := AtomicCacheMap{}
- atomM.value.Store(testMap)
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- atomM.Get(0)
- }
- })
- }
- func BenchmarkAtomicCacheMap_Set(b *testing.B) {
- atomM := AtomicCacheMap{}
- atomM.value.Store(testMap)
- var i = 0
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- atomM.Set(i, i)
- i++
- }
- })
- }
- /************/
- type MutexCacheMap struct {
- mutex sync.RWMutex
- value map[int]int
- }
- func (mm *MutexCacheMap) Get(key int) int {
- mm.mutex.RLock()
- defer mm.mutex.RUnlock()
- return mm.value[key]
- }
- func (mm *MutexCacheMap) Set(key, value int) {
- mm.mutex.Lock()
- defer mm.mutex.Unlock()
- mm.value[key] = value
- }
- func BenchmarkMutexCacheMap_Get(b *testing.B) {
- mb := MutexCacheMap{value: testMap}
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- mb.Get(0)
- }
- })
- }
- func BenchmarkMutexCacheMap_Set(b *testing.B) {
- mb := MutexCacheMap{value: testMap}
- var i = 0
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- mb.Set(i, i)
- i++
- }
- })
- }
結(jié)果:
- cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
- BenchmarkAtomicCacheMap_Get
- BenchmarkAtomicCacheMap_Get-8 301664540 4.194 ns/op
- BenchmarkAtomicCacheMap_Set
- BenchmarkAtomicCacheMap_Set-8 87637 95889 ns/op
- BenchmarkMutexCacheMap_Get
- BenchmarkMutexCacheMap_Get-8 20000959 54.63 ns/op
- BenchmarkMutexCacheMap_Set
- BenchmarkMutexCacheMap_Set-8 5012434 267.2 ns/op
哎呀,這種表現(xiàn)是痛苦的。這意味著,當(dāng)必須復(fù)制大型結(jié)構(gòu)時(shí),atomic 的性能非常差。不僅如此,此代碼還包含競(jìng)態(tài)條件。就像本文開(kāi)頭的切片案例一樣,原子緩存示例具有競(jìng)態(tài)條件,其中可能會(huì)在復(fù)制 map 和存儲(chǔ) map 的時(shí)間之間添加新的緩存條目,在這種情況下,新條目將丟失。在這種情況下,該 -race 標(biāo)志不會(huì)檢測(cè)到任何數(shù)據(jù)競(jìng)爭(zhēng),因?yàn)闆](méi)有對(duì)同一 map 的并發(fā)訪(fǎng)問(wèn)。
07 注意事項(xiàng)
Go 的文檔[2]警告了 atomic 包的潛在誤用:
這些函數(shù)需要非常小心才能正確使用。除了特殊的低級(jí)應(yīng)用程序,同步最好使用通道或 sync 包的工具來(lái)完成。通過(guò)通信共享內(nèi)存;不要通過(guò)共享內(nèi)存進(jìn)行通信。
開(kāi)始使用 atomic 包時(shí),你可能會(huì)遇到的第一個(gè)問(wèn)題是:
- panic: sync/atomic: store of inconsistently typed value into Value
使用 atomic.Store,確保每次調(diào)用方法時(shí)都存儲(chǔ)完全相同的類(lèi)型很重要。這聽(tīng)起來(lái)很容易,但通常并不像聽(tīng)起來(lái)那么簡(jiǎn)單:
- package main
- import (
- "fmt"
- "sync/atomic"
- )
- //Our own custom error type which implements the error interface
- type CustomError struct {
- Code int
- Message string
- }
- func (e CustomError) Error() string {
- return fmt.Sprintf("%d: %s", e.Code, e.Message)
- }
- func InternalServerError(msg string) error {
- return CustomError{Code: 500, Message: msg}
- }
- func main() {
- var (
- err1 error = fmt.Errorf("error happened")
- err2 error = InternalServerError("another error happened")
- )
- errVal := atomic.Value{}
- errVal.Store(err1)
- errVal.Store(err2) //panics here
- }
兩個(gè)值都是 error 類(lèi)型是不夠的,因?yàn)樗鼈冎皇菍?shí)現(xiàn)了錯(cuò)誤接口。它們的具體類(lèi)型仍然不同,因此 atomic 不喜歡它。
08 總結(jié)
競(jìng)態(tài)條件很糟糕,應(yīng)該保護(hù)對(duì)共享資源的訪(fǎng)問(wèn)?;コ怏w很酷,但由于鎖爭(zhēng)用而趨于緩慢。對(duì)于某些讀取-復(fù)制-更新模式有意義的情況(這往往是動(dòng)態(tài)配置之類(lèi)的東西,例如特性標(biāo)志、日志級(jí)別或 map 或結(jié)構(gòu)體,一次填充例如通過(guò) JSON 解析等),尤其是當(dāng)讀取次數(shù)比寫(xiě)入次數(shù)多時(shí)。atomic 通常不應(yīng)用于其他用例(例如,隨時(shí)間增長(zhǎng)的變量,如緩存),并且該特性的使用需要非常小心。
可能最重要的方法是將鎖保持在最低限度,如果你在在考慮原子等替代方案,請(qǐng)務(wù)必在投入生產(chǎn)之前對(duì)其進(jìn)行廣泛的測(cè)試和試驗(yàn)。
原文鏈接:https://www.sixt.tech/golangs-atomic
參考資料
[1]讀取-復(fù)制-更新: https://en.wikipedia.org/wiki/Read-copy-update
[2]文檔: https://pkg.go.dev/sync/atomic
本文轉(zhuǎn)載自微信公眾號(hào)「幽鬼」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系幽鬼公眾號(hào)。































