基于Etcdserver包將自己的Go程序打造成高可用系統(tǒng)
背景
我們每一個(gè)系統(tǒng)開發(fā)人員都希望自己的程序永遠(yuǎn)不宕機(jī),高可用是很多系統(tǒng)的目標(biāo)。那我們?nèi)绾伟炎约旱南到y(tǒng)改造成高可用的系統(tǒng)呢?帶著這個(gè)問題,本文就給大家演示下,如何自己動(dòng)手,從零開始基于raft協(xié)議來(lái)改造我們的已有系統(tǒng)。很多同學(xué)都知道Raft協(xié)議是一種分布式一致性算法。從用戶的角度出發(fā),它提供給程序設(shè)計(jì)人員的功能主要有以下2個(gè)方面
- 它提供了一個(gè)全局一致的存儲(chǔ)狀態(tài),這樣我們的程序就可以通過(guò)它在多個(gè)節(jié)點(diǎn)上存儲(chǔ)信息
- 它提供了容錯(cuò)的功能,當(dāng)leader不可用后,系統(tǒng)自動(dòng)開始選舉新的leader.而且每個(gè)節(jié)點(diǎn)知道自己的身份是follower還是leader.這樣我們就可以利用這個(gè)功能實(shí)現(xiàn)讀寫分離。
當(dāng)然很多同學(xué)講到,我們可以直接部署高可用的分布式鍵值存儲(chǔ)系統(tǒng)etcd,它本身具有高可用、高并發(fā)、一致性等特點(diǎn),已經(jīng)被廣泛應(yīng)用于云計(jì)算、微服務(wù)、容器等領(lǐng)域了,是很多云原生系統(tǒng)的底層基石之一。但是大家很快發(fā)現(xiàn)這樣做又增加了我們系統(tǒng)的依賴,所以本文給出的解決方案是直接采用etcdserver包(go.etcd.io/etcd/server/v3/etcdserver是etcd的Go語(yǔ)言實(shí)現(xiàn),提供了etcd服務(wù)器的主要功能,包括集群管理、數(shù)據(jù)存儲(chǔ)、數(shù)據(jù)同步等)本文接下來(lái)的內(nèi)容主要分為2部分,首先介紹下etcdserver的使用,然后以一個(gè)例子闡述下如何引用etcdserver包來(lái)實(shí)現(xiàn)高可用的系統(tǒng)的構(gòu)建,這種構(gòu)建方法不依賴于外部第三方的組件,所以它的分發(fā)與部署是比較輕便與簡(jiǎn)單的。由于內(nèi)容比較多,所以就暫定分兩期來(lái)介紹。
通過(guò)embed啟動(dòng)etcdserver
我們通過(guò)go.etcd.io/etcd/server/v3/embed這個(gè)包來(lái)快速啟動(dòng)集成的etcdserver。
package main
import (
_ "context"
"go.etcd.io/etcd/server/v3/embed"
"log"
)
func main() {
cfg := embed.NewConfig()
cfg.Dir = "/Users/dongluyang1/Documents/workspace/toutiao/etcdserversample" //etcd 數(shù)據(jù)存儲(chǔ)的目錄,用于持久化存儲(chǔ) etcd 數(shù)據(jù)。
cfg.WalDir = ""
cfg.Name = "test" //節(jié)點(diǎn)名稱
cfg.InitialCluster = "test=http://localhost:2380" //集群名稱
cfg.ClusterState = embed.ClusterStateFlagNew //etcd 集群的初始狀態(tài),可以是 new 或 existing。當(dāng)設(shè)置為 new 時(shí),將啟動(dòng)一個(gè)新的 etcd 集群;當(dāng)設(shè)置為 existing 時(shí),將加入一個(gè)已經(jīng)存在的 etcd 集群。
cfg.AutoCompactionMode = "periodic"
cfg.AutoCompactionRetention = "1"
cfg.QuotaBackendBytes = 8 * 1024 * 1024 * 1024
e, err := embed.StartEtcd(cfg)
if err != nil {
log.Fatalf("Failed to start etcd: %v", err)
}
defer e.Close()
select {} //阻止主程序退出,導(dǎo)致etcdserver退出
}
上面的cfg參數(shù)通過(guò)StartEtcd方法傳入embed,如下所示,實(shí)際上它的值最終傳給config.ServerConfig來(lái)實(shí)現(xiàn)對(duì)etcdserver的配置。
embed.NewConfig的值傳給了config.ServerConfig來(lái)控制etcdserver的配置
我們上面的代碼簡(jiǎn)單的給出了常用的配置,下面具體給出配置的含義
go.etcd.io/etcd/server/v3/config 包中的 serverConfig 結(jié)構(gòu)體包含了 etcd 服務(wù)器的配置信息,以下是該結(jié)構(gòu)體中各個(gè)參數(shù)的含義:
- Name: etcd 集群中當(dāng)前節(jié)點(diǎn)的名稱,可以是任何字符串,建議為集群中唯一的名稱。
- DataDir:etcd 數(shù)據(jù)存儲(chǔ)的目錄,用于持久化存儲(chǔ) etcd 數(shù)據(jù)。
- ListenClientUrls: etcd 服務(wù)器監(jiān)聽客戶端請(qǐng)求的 URL 地址,支持多個(gè) URL,以逗號(hào)分隔。例如:http://localhost:2379,http://localhost:4001。不填默認(rèn)2379
- ListenPeerUrls: etcd 服務(wù)器監(jiān)聽節(jié)點(diǎn)之間通信的 URL 地址,支持多個(gè) URL,以逗號(hào)分隔。例如:http://localhost:2380,http://localhost:7001。不填默認(rèn)2380
- InitialCluster: etcd 集群中所有節(jié)點(diǎn)的信息,以 name=URL 的形式表示,各節(jié)點(diǎn)信息之間以逗號(hào)分隔。例如:node1=http://localhost:2380,node2=http://localhost:7001。
- InitialClusterState: etcd 集群的初始狀態(tài),可以是 new 或 existing。當(dāng)設(shè)置為 new 時(shí),將啟動(dòng)一個(gè)新的 etcd 集群;當(dāng)設(shè)置為 existing 時(shí),將加入一個(gè)已經(jīng)存在的 etcd 集群。
- InitialClusterToken: etcd 集群的唯一標(biāo)識(shí)符,用于區(qū)分不同的 etcd 集群。當(dāng)啟動(dòng)一個(gè)新的 etcd 集群時(shí),需要指定一個(gè)唯一的標(biāo)識(shí)符。
- AutoCompactionRetention: etcd 自動(dòng)壓縮功能的保留時(shí)間,以天為單位。當(dāng) etcd 啟用自動(dòng)壓縮功能時(shí),將保留指定天數(shù)內(nèi)的數(shù)據(jù),過(guò)期數(shù)據(jù)將被刪除。
- AutoCompactionMode: etcd 自動(dòng)壓縮功能的模式,可以是 periodic 或 revision。當(dāng)設(shè)置為 periodic 時(shí),將按時(shí)間間隔壓縮數(shù)據(jù);當(dāng)設(shè)置為 revision 時(shí),將按事務(wù)數(shù)壓縮數(shù)據(jù)。
客戶端訪問etcdserver
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
go func() {
if err == nil {
for {
_, err = cli.Put(context.Background(), "yandaojiumozhi", fmt.Sprintf("mibao-%d", rand.Intn(100)))
if err != nil {
// handle error
} else {
resp, err := cli.Get(context.Background(), "yandaojiumozhi")
if err != nil {
// handle error
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
}
time.Sleep(5 * time.Second)
}
}
}()
defer cli.Close()
上面的代碼簡(jiǎn)單測(cè)試了,通過(guò)localhost:2379隨機(jī)寫入yandaojiumozhi,然后讀取這個(gè)key。
演示結(jié)果
上面的代碼使得我們不需要額外部署與維護(hù)第三方etcd組件,便可以再啟動(dòng)我們后臺(tái)程序的同時(shí)通過(guò)embed啟動(dòng)etcdserver來(lái)實(shí)現(xiàn)存儲(chǔ)了。
embed啟動(dòng)etcdserver的邏輯
go.etcd.io/etcd/server/v3/etcdserver 包是 etcd 服務(wù)器的核心包,它包含了 etcd 服務(wù)器的所有核心邏輯。其中 EtcdServer 結(jié)構(gòu)體是 etcd 服務(wù)器的核心,它負(fù)責(zé)管理 etcd 服務(wù)器的所有組件、監(jiān)聽客戶端請(qǐng)求、處理事務(wù)和維護(hù) etcd 數(shù)據(jù)庫(kù)等核心任務(wù)。這個(gè)包主要負(fù)責(zé) etcd 服務(wù)器的啟動(dòng)、停止和管理。而go.etcd.io/etcd/server/v3/embed 包則負(fù)責(zé)將go.etcd.io/etcd/server/v3/etcdserver 封裝到一個(gè)可嵌入的包中,使得在應(yīng)用程序中嵌入 etcd 服務(wù)器變得更加容易。所以搞明白embed如何啟動(dòng)的etcdserver,我們便可以直接在我們的代碼里面啟動(dòng)etcdserver,這樣便可以有更大的靈活性來(lái)做一些功能。比如判斷是否是leader等。
embed啟動(dòng)etcdserver的流程圖如下所示,它的核心是在2379,2380啟動(dòng)監(jiān)聽器,然后配置config.ServerConfig,以及調(diào)用NewServer,最后Start它。
embed創(chuàng)建并啟動(dòng)etcdserver的流程
所以我們自己也可以寫一個(gè)embed,來(lái)創(chuàng)建并啟動(dòng)etcdserver,然后通過(guò)下面的方法來(lái)判斷是不是leader。通過(guò)isLeader的判斷,來(lái)完成分布式環(huán)境下面的,不同角色自己該干的事情。
本期就先介紹這些,下一期給大家演示下分布式環(huán)境下的,節(jié)點(diǎn)加入,選舉等相關(guān)操作。
func check(srv *etcdserver.EtcdServer, ctx context.Context) {
log.Info("start check LeaderChanged")
for {
select {
case <-ctx.Done():
log.Info("has Done")
return
case <-srv.LeaderChangedNotify():
{
log.Info("Leader changed")
/*這個(gè)isLeader可以判斷當(dāng)前節(jié)點(diǎn)是不是leader,如果是leader的話,可以做一些
leader可以做的業(yè)務(wù),比如它可以接受寫請(qǐng)求,其他的收到寫請(qǐng)求,都轉(zhuǎn)發(fā)leader
*/
isLeader := srv.Leader() == srv.ID()
......
}
}
}
}