徒手用 Go 寫個(gè) Redis 服務(wù)器(Godis)
今天給大家?guī)?lái)的開源項(xiàng)目是 Godis:一個(gè)用 Go 語(yǔ)言實(shí)現(xiàn)的 Redis 服務(wù)器。支持:
- 5 種數(shù)據(jù)結(jié)構(gòu)(string、list、hash、set、sortedset)
- 自動(dòng)過期(TTL)
- 發(fā)布訂閱、地理位置、持久化等功能
你或許不需要自己實(shí)現(xiàn) Redis 服務(wù),但你是否厭煩了每天都是寫增刪改查的業(yè)務(wù)代碼,想提高編程水平試圖從零寫個(gè)項(xiàng)目打開 IDE 卻發(fā)現(xiàn)無(wú)從下手?
動(dòng)手造輪子一定是提高編程能力的好辦法,下面就帶大家用 Go 從頭開始寫一個(gè) Redis 服務(wù)器(Godis),從中你將學(xué)到:
- 如何編寫 Go 語(yǔ)言 TCP 服務(wù)器
- 設(shè)計(jì)并實(shí)現(xiàn)安全可靠的通信協(xié)議(redis 協(xié)議)
- 如何使用 Go 語(yǔ)言開發(fā)高并發(fā)程序
- 設(shè)計(jì)和實(shí)現(xiàn)分布式集群以及分布式事務(wù)
- 熟悉鏈表、哈希表、跳表以及時(shí)間輪等常用數(shù)據(jù)結(jié)構(gòu)
千萬(wàn)不要擔(dān)心內(nèi)容太難,學(xué)不會(huì)或者沒有 Go 語(yǔ)言基礎(chǔ)!!雖然示例代碼是 Go 但不會(huì)影響你理解 Redis 的原理和底層協(xié)議以及高性能的秘密。而且作者為了照顧到廣大讀者,對(duì)技術(shù)的講解做了優(yōu)化。示例代碼在原項(xiàng)目基礎(chǔ)上做了簡(jiǎn)化,并逐行地加了注釋。如果是高級(jí)玩家,請(qǐng)直接訪問項(xiàng)目閱讀源碼:
https://github.com/HDT3213/godis
下面正文開始,讓我們一起撥開 Redis 的迷霧。
一、寫個(gè) TCP 服務(wù)器
眾所周知 Redis 是 C/S 模型,使用 TCP 協(xié)議進(jìn)行通信。接下來(lái)就從實(shí)現(xiàn) TCP 服務(wù)端開始。作為廣泛用于服務(wù)端的編程語(yǔ)言 Golang 提供了非常簡(jiǎn)潔的 TCP 接口,所以實(shí)現(xiàn)起來(lái)十分方便。示例代碼:
- func ListenAndServe(address string) {
- // 綁定監(jiān)聽地址
- listener, err := net.Listen("tcp", address)
- if err != nil {
- log.Fatal(fmt.Sprintf("listen err: %v", err))
- }
- defer listener.Close()
- log.Println(fmt.Sprintf("bind: %s, start listening...", address))
- for {
- // Accept 會(huì)一直阻塞直到有新的連接建立或者listen中斷才會(huì)返回
- conn, err := listener.Accept()
- if err != nil {
- // 通常是由于listener被關(guān)閉無(wú)法繼續(xù)監(jiān)聽導(dǎo)致的錯(cuò)誤
- log.Fatal(fmt.Sprintf("accept err: %v", err))
- }
- // 開啟新的 goroutine 處理該連接
- go Handle(conn)
- }
- }
- func Handle(conn net.Conn) {
- reader := bufio.NewReader(conn)
- for {
- // ReadString 會(huì)一直阻塞直到遇到分隔符 '\n'
- // 遇到分隔符后 ReadString 會(huì)返回上次遇到分隔符到現(xiàn)在收到的所有數(shù)據(jù)
- // 若在遇到分隔符之前發(fā)生異常, ReadString 會(huì)返回已收到的數(shù)據(jù)和錯(cuò)誤信息
- msg, err := reader.ReadString('\n')
- if err != nil {
- // 通常遇到的錯(cuò)誤是連接中斷或被關(guān)閉,用io.EOF表示
- if err == io.EOF {
- log.Println("connection close")
- } else {
- log.Println(err)
- }
- return
- }
- b := []byte(msg)
- // 將收到的信息發(fā)送給客戶端
- conn.Write(b)
- }
- }
- func main() {
- ListenAndServe(":8000")
- }
:ok_hand: 至此只用了 40 行代碼就搞定服務(wù)端啦!啟動(dòng)上面的 TCP 服務(wù)后,在終端中輸入 telnet 127.0.0.1 8000
就可以連接到剛寫好的服務(wù)器,它會(huì)將你發(fā)送的消息原樣返回給你(所以請(qǐng)不要罵它):
這個(gè) TCP 服務(wù)器的非常簡(jiǎn)單,主協(xié)程調(diào)用 accept 函數(shù)來(lái)監(jiān)聽端口,接受新連接后開啟一個(gè) Goroutine 來(lái)處理它。這種簡(jiǎn)單的阻塞 IO 模型有些類似于早期的 Tomcat/Apache 服務(wù)器。
阻塞 IO 模型是使用 一個(gè)線程處理一個(gè)連接 ,在沒有收到新數(shù)據(jù)時(shí)監(jiān)聽線程處于阻塞狀態(tài),直到數(shù)據(jù)就緒后線程被喚醒進(jìn)行處理。因?yàn)樽枞?IO 模型需要開啟大量線程并且頻繁地進(jìn)行上下文切換,所以它的效率很低。而 Redis 使用的 epoll 技術(shù)(IO 多路復(fù)用)用 一個(gè)線程處理大量連接 ,極大地提高了吞吐量。那么我們的 TCP 服務(wù)器會(huì)比 Redis 慢很多嗎?
當(dāng)然不會(huì),Golang 利用 Goroutine 調(diào)度開銷遠(yuǎn)遠(yuǎn)小于線程調(diào)度開銷的優(yōu)勢(shì)封裝出 goroutine-per-connection
風(fēng)格的極簡(jiǎn)接口,而且 net/tcp 庫(kù)將 epoll 封裝成了阻塞 IO 的樣子,在享受 epoll 高性能的同時(shí)避免了原生 epoll 接口所需的復(fù)雜異步代碼。
在作者的電腦上 Redis 每秒可以響應(yīng) 10.6k 個(gè) PING 命令,而 Godis(完整代碼) 的吞吐量為 9.2 kqps 相差并不大。想了解更多 Golang 高性能的:secret:密,可以搜索 go netpoller
或者 go 語(yǔ)言 網(wǎng)絡(luò)輪詢器
關(guān)鍵字
另外,合格的 TCP 的服務(wù)器在關(guān)閉的時(shí)候不應(yīng)該一停了之,而需要完成響應(yīng)已接收的請(qǐng)求、釋放 TCP 連接等必要的清理工作。這個(gè)功能我們一般稱為 優(yōu)雅關(guān)閉
或者 graceful shutdown
,優(yōu)雅關(guān)閉步驟:
- 首先,關(guān)閉 listener 停止接受新連接
- 然后,遍歷所有存活連接逐個(gè)關(guān)閉
優(yōu)雅關(guān)閉的代碼比較多,這里就不完整貼出了。
二、透視 Redis 協(xié)議
在解決完通信后,下一步就是搞清楚 Redis 的協(xié)議,其實(shí)就是一套序列化協(xié)議類似 JSON、Protocol Buffers,你看底層其實(shí)也就是一些基礎(chǔ)的知識(shí)。
自 Redis 2.0 以后的通信統(tǒng)一為 RESP 協(xié)議(REdis Serialization Protocol),該協(xié)議易于實(shí)現(xiàn)不僅可以高效的被程序解析,還能夠被人類讀懂容易調(diào)試。
RESP 是一個(gè)二進(jìn)制安全的文本協(xié)議,工作于 TCP 協(xié)議上。RESP 以行作為單位,客戶端和服務(wù)器發(fā)送的命令或數(shù)據(jù)一律以 \r\n
(CRLF)作為換行符。
二進(jìn)制安全是指允許協(xié)議中出現(xiàn)任意字符而不會(huì)導(dǎo)致故障。比如 C 語(yǔ)言的字符串以 \0
作為結(jié)尾不允許字符串中間出現(xiàn) \0
,而 Go 語(yǔ)言的 string 則允許出現(xiàn) \0
,我們說 Go 語(yǔ)言的 string 是二進(jìn)制安全的,而 C 語(yǔ)言字符串不是二進(jìn)制安全的。
RESP 的二進(jìn)制安全性允許我們?cè)?key 或者 value 中包含 \r
或者 \n
這樣的特殊字符。在使用 Redis 存儲(chǔ) protobuf、msgpack 等二進(jìn)制數(shù)據(jù)時(shí),二進(jìn)制安全性尤為重要。
RESP 定義了 5 種格式:
- 簡(jiǎn)單字符串(Simple String): 服務(wù)器用來(lái)返回簡(jiǎn)單的結(jié)果,比如 "OK" 非二進(jìn)制安全,且不允許換行
- 錯(cuò)誤信息(Error):服務(wù)器用來(lái)返回簡(jiǎn)單的錯(cuò)誤信息,比如 "ERR Invalid Synatx" 非二進(jìn)制安全,且不允許換行
- 整數(shù)(Integer):llen、scard 等命令的返回值,64 位有符號(hào)整數(shù)
- 字符串(Bulk String):二進(jìn)制安全字符串,比如 get 等命令的返回值
- 數(shù)組(Array,又稱 Multi Bulk Strings):Bulk String 數(shù)組,客戶端發(fā)送指令以及 lrange 等命令響應(yīng)的格式
RESP 通過第一個(gè)字符來(lái)表示格式:
- $
- *
下面讓我們通過一些實(shí)際例子來(lái)理解協(xié)議。
2.1 字符串
字符串(Bulk String)有兩行,第一行為 $
+正文長(zhǎng)度,第二行為實(shí)際內(nèi)容。如:
- $3\r\nSET\r\n
字符串(Bulk String)是二進(jìn)制安全的,就是說可以在 Bulk String 內(nèi)部包含 "\r\n" 字符(行尾的 CRLF 被隱藏):
- $4
- a\r\nb
2.2 空
$-1
表示 nil,比如使用 get 命令查詢一個(gè)不存在的 key 時(shí),響應(yīng)即為 $-1
。
2.3 數(shù)組
數(shù)組(Array)格式第一行為 "*"+數(shù)組長(zhǎng)度,其后是相應(yīng)數(shù)量的 字符串(Bulk String)。比如 ["foo", "bar"]
的報(bào)文(傳輸時(shí)的內(nèi)容):
- *2
- $3
- foo
- $3
- bar
客戶端也使用 數(shù)組(Array)格式向服務(wù)端發(fā)送指令。命令本身將作為第一個(gè)參數(shù),比如 SET key value
指令的 RESP 報(bào)文:
- *3
- $3
- SET
- $3
- key
- $5
- value
將換行符打印出來(lái):
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
2.4 解析預(yù)備
知道常用的 RESP 報(bào)文內(nèi)容后,就可以開始著手解析了。但需要注意的是 RESP 是 二進(jìn)制安全
的協(xié)議,它允許在正文中使用 \r\n
字符。舉例來(lái)說 Redis 可以正確接收并執(zhí)行 SET "a\r\nb" hellogithub
指令,這條指令的正確報(bào)文是這樣的:
- *3
- $3
- SET
- $4
- a\r\nb
- $11
- hellogithub
當(dāng) ReadBytes
讀取到第五行 "a\r\nb\r\n" 時(shí)會(huì)將其誤認(rèn)為兩行:
- *3
- $3
- SET
- $4
- a // 錯(cuò)誤的分行
- b // 錯(cuò)誤的分行
- $11
- hellogithub
因此當(dāng)讀取到第四行 $4
后,不應(yīng)該繼續(xù)使用 ReadBytes('\n')
讀取下一行,應(yīng)使用 io.ReadFull(reader, msg)
方法來(lái)讀取指定長(zhǎng)度的內(nèi)容。
- msg = make([]byte, 4 + 2) // 正文長(zhǎng)度4 + 換行符長(zhǎng)度2
- _, err = io.ReadFull(reader, msg)
2.5 編寫 RESP 協(xié)議解析器
解決完上面內(nèi)容包含 "\r\n" 的問題,我們就可以開始放手編寫 Redis 協(xié)議解析器啦!
- type Payload struct {
- Data redis.Reply
- Err error
- }
- // ParseStream 通過 io.Reader 讀取數(shù)據(jù)并將結(jié)果通過 channel 將結(jié)果返回給調(diào)用者
- // 流式處理的接口適合供客戶端/服務(wù)端使用
- func ParseStream(reader io.Reader) <-chan *Payload {
- ch := make(chan *Payload)
- go parse0(reader, ch)
- return ch
- }
由于解析器的代碼比較多,這里只簡(jiǎn)單地介紹一下核心流程。
- func parse0(reader io.Reader, ch chan<- *Payload) {
- // 初始化讀取狀態(tài)
- readingMultiLine := false
- expectedArgsCount := 0
- var args [][]byte
- var bulkLen int64
- for {
- // 上文中我們提到 RESP 是以行為單位的
- // 因?yàn)樾蟹譃楹?jiǎn)單字符串和二進(jìn)制安全的 BulkString,我們需要封裝一個(gè) readLine 函數(shù)來(lái)兼容
- line, err = readLine(reader, bulkLen)
- if err != nil {
- // 處理錯(cuò)誤
- return
- }
- // 接下來(lái)我們對(duì)剛剛讀取的行進(jìn)行解析
- // 我們簡(jiǎn)單的將 Reply 分為兩類:
- // 單行: StatusReply, IntReply, ErrorReply
- // 多行: BulkReply, MultiBulkReply
- if !readingMultiLine {
- if isMulitBulkHeader(line) {
- // 我們收到了 MulitBulkReply 的第一行
- // 獲得 MulitBulkReply 中 BulkString 的個(gè)數(shù)
- expectedArgsCount = parseMulitBulkHeader(line)
- // 等待 MulitBulkReply 后續(xù)行
- readingMultiLine = true
- } else if isBulkHeader(line) {
- // 我們收到了 BulkReply 的第一行
- // 獲得 BulkReply 第二行的長(zhǎng)度, 通過 bulkLen 告訴 readLine 函數(shù)下一行 BulkString 的長(zhǎng)度
- bulkLen = parseBulkHeader()
- // 這個(gè) Reply 中一共有 1 個(gè) BulkString
- expectedArgsCount = 1
- // 等待 BulkReply 后續(xù)行
- readingMultiLine = true
- } else {
- // 處理 StatusReply, IntReply, ErrorReply 等單行 Reply
- reply := parseSingleLineReply(line)
- // 通過 ch 返回結(jié)果
- emitReply(ch)
- }
- } else {
- // 進(jìn)入此分支說明我們正在等待 MulitBulkReply 或 BulkReply 的后續(xù)行
- // MulitBulkReply 的后續(xù)行有兩種,BulkHeader 或者 BulkString
- if isBulkHeader(line) {
- bulkLen = parseBulkHeader()
- } else {
- // 我們正在讀取一個(gè) BulkString, 它可能是 MulitBulkReply 或 BulkReply
- args = append(args, line)
- }
- if len(args) == expectedArgsCount { // 我們已經(jīng)讀取了所有后續(xù)行
- // 通過 ch 返回結(jié)果
- emitReply(ch)
- // 重置狀態(tài), 準(zhǔn)備解析下一條 Reply
- readingMultiLine = false
- expectedArgsCount = 0
- args = nil
- bulkLen = 0
- }
- }
- }
- }
三、實(shí)現(xiàn)內(nèi)存數(shù)據(jù)庫(kù)
至此我們已經(jīng)搞定數(shù)據(jù)接收和解析的部分了,剩下就是我們應(yīng)該把數(shù)據(jù)存在哪里了?
拋開持久化部分,作為基于內(nèi)存的 KV 數(shù)據(jù)庫(kù) Redis 的所有數(shù)據(jù)需要都存儲(chǔ)在內(nèi)存中的哈希表,而這個(gè)哈希表就是我們今天需要編寫的最后一個(gè)組件。
與單線程的 Redis 不同我們實(shí)現(xiàn)的 Redis(godis)是并行工作的,所以我們必須考慮各種并發(fā)安全問題。常見的并發(fā)安全哈希表設(shè)計(jì)有幾種:
-
sync.map
:Golang 官方提供的并發(fā)哈希表,適合讀多寫少的場(chǎng)景。但是在m.dirty
剛被提升后會(huì)將m.read
復(fù)制到新的m.dirty
中,在數(shù)據(jù)量較大的情況下復(fù)制操作會(huì)阻塞所有協(xié)程,存在較大的隱患。 -
juc.ConcurrentHashMap
:Java 的并發(fā)哈希表采用分段鎖實(shí)現(xiàn)。在進(jìn)行擴(kuò)容時(shí)訪問哈希表線程都將協(xié)助進(jìn)行 rehash 操作,在 rehash 結(jié)束前所有的讀寫操作都會(huì)阻塞。因?yàn)榫彺鏀?shù)據(jù)庫(kù)中鍵值對(duì)數(shù)量巨大且對(duì)讀寫操作響應(yīng)時(shí)間要求較高,使用 juc 的策略是不合適的。 -
memcached hashtable
:在后臺(tái)線程進(jìn)行 rehash 操作時(shí),主線程會(huì)判斷要訪問的哈希槽是否已被 rehash 從而決定操作 old_hashtable 還是操作 new_hashtable。這種設(shè)計(jì)被稱為 漸進(jìn)式 rehash 它的優(yōu)點(diǎn)是 rehash 操作基本不會(huì)阻塞主線程的讀寫,是最理想的的方案。
但漸進(jìn)式 rehash 的實(shí)現(xiàn)非常復(fù)雜,所以 godis 采用 Golang 社區(qū)廣泛使用的分段鎖策略(非上面的三種),就是將 key 分散到固定數(shù)量的 shard 中避免進(jìn)行整體 rehash 操作。shard 是有鎖保護(hù)的 map,當(dāng) shard 進(jìn)行 rehash 時(shí)會(huì)阻塞 shard 內(nèi)的讀寫,但不會(huì)對(duì)其他 shard 造成影響。
代碼如下:
- type ConcurrentDict struct {
- table []*Shard
- count int32
- }
- type Shard struct {
- m map[string]interface{}
- mutex sync.RWMutex
- }
- func (dict *ConcurrentDict) spread(hashCode uint32) uint32 {
- tableSize := uint32(len(dict.table))
- return (tableSize - 1) & uint32(hashCode)
- }
- func (dict *ConcurrentDict) getShard(index uint32) *Shard {
- return dict.table[index]
- }
- func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) {
- hashCode := fnv32(key)
- index := dict.spread(hashCode)
- shard := dict.getShard(index)
- shard.mutex.RLock()
- defer shard.mutex.RUnlock()
- val, exists = shard.m[key]
- return
- }
- func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) {
- if dict == nil {
- panic("dict is nil")
- }
- hashCode := fnv32(key)
- index := dict.spread(hashCode)
- shard := dict.getShard(index)
- shard.mutex.Lock()
- defer shard.mutex.Unlock()
- if _, ok := shard.m[key]; ok {
- shard.m[key] = val
- return 0
- } else {
- shard.m[key] = val
- dict.addCount()
- return 1
- }
- }
ConcurrentDict
可以保證對(duì)單個(gè) key 操作的并發(fā)安全性,但是仍然無(wú)法滿足并發(fā)安全的需求,舉例來(lái)說:
- 讀取 -> 做加法 -> 寫入
因此我們需要實(shí)現(xiàn) db.Locker
用于鎖定一個(gè)或一組 key 直到我們完成所有操作后再釋放。
實(shí)現(xiàn) db.Locker
最直接的想法是使用一個(gè) map[string]*sync.RWMutex
- 加鎖過程分為兩步:初始化 mutex -> 加鎖
- 解鎖過程也分為兩步: 解鎖 -> 釋放mutex
那么存在一個(gè)無(wú)法解決的并發(fā)問題:
時(shí)間 | 協(xié)程A | 協(xié)程B |
---|---|---|
1 | locker["a"].Unlock() | |
2 | locker["a"] = &sync.RWMutex{} | |
3 | delete(locker["a"]) | |
4 | locker["a"].Lock() |
由于 t3 時(shí)協(xié)程 B 釋放了鎖,t4 時(shí)協(xié)程 A 試圖加鎖會(huì)失敗。若協(xié)程B在解鎖時(shí)不執(zhí)行 delete(locker["a"])
就可以避免該異常的發(fā)生,但是這樣會(huì)造成嚴(yán)重的內(nèi)存泄露。
我們注意到哈希槽的數(shù)量遠(yuǎn)少于 key 的數(shù)量,反過來(lái)說多個(gè)鍵可以共用一個(gè)哈希槽。所以我們不再直接對(duì) key 進(jìn)行加鎖而是鎖定 key 所在的哈希槽也可以保證安全,另一方面哈希槽數(shù)量較少即使不釋放也不會(huì)消耗太多內(nèi)存。
- type Locks struct {
- table []*sync.RWMutex
- }
- func Make(tableSize int) *Locks {
- table := make([]*sync.RWMutex, tableSize)
- for i := 0; i < tableSize; i++ {
- table[i] = &sync.RWMutex{}
- }
- return &Locks{
- table: table,
- }
- }
- func (locks *Locks)Lock(key string) {
- index := locks.spread(fnv32(key))
- mu := locks.table[index]
- mu.Lock()
- }
- func (locks *Locks)UnLock(key string) {
- index := locks.spread(fnv32(key))
- mu := locks.table[index]
- mu.Unlock()
- }
在鎖定多個(gè) key 時(shí)需要注意,若 協(xié)程A 持有 鍵a 的鎖試圖獲得 鍵b 的鎖,此時(shí) 協(xié)程B 持有 鍵b 的鎖試圖獲得 鍵a 的鎖則會(huì)形成死鎖。
解決方法是所有協(xié)程都按照相同順序加鎖,若兩個(gè)協(xié)程都想獲得 鍵a 和 鍵b 的鎖,那么必須先獲取 鍵a 的鎖后獲取 鍵b 的鎖,這樣就可以避免循環(huán)等待。
到目前為止構(gòu)建 Redis 服務(wù)器所需的基本組件已經(jīng)備齊,只需要將 TCP 服務(wù)器、協(xié)議解析器與哈希表組裝起來(lái)我們的 Redis 服務(wù)器就可以開始工作啦。
最后,以上代碼均簡(jiǎn)化自我寫的開源項(xiàng)目 Godis:一個(gè)用 Go 語(yǔ)言實(shí)現(xiàn)的 Redis 服務(wù)器。期待您的關(guān)注和 Star:
項(xiàng)目地址: https://github.com/HDT3213/godis
結(jié)語(yǔ)
很多朋友的日常工作主要是編寫業(yè)務(wù)代碼,對(duì)于框架、數(shù)據(jù)庫(kù)、中間件這些“架構(gòu)”、“底層代碼” 有一些恐懼感。
但本文我們只寫了 3 個(gè)組件,共計(jì)幾百行代碼就實(shí)現(xiàn)了一個(gè)基本的 Redis 服務(wù)器。所以底層的技術(shù)并不難,只要你對(duì)技術(shù)感興趣由淺入深、從簡(jiǎn)到繁,“底層代碼”也并不神秘。