一篇文章帶你學(xué)習(xí)etcd-wal模塊解析
Part1常見(jiàn)的數(shù)據(jù)庫(kù)日志
傳統(tǒng)數(shù)據(jù)庫(kù)的日志,例如 redo log(重做日志),記錄的是修改后的數(shù)據(jù)。其實(shí)就是 MySQL 里經(jīng)常說(shuō)到的 WAL 技術(shù),它的關(guān)鍵點(diǎn)就是先寫(xiě)日志,再寫(xiě)磁盤(pán)。
- redo log 是 InnoDB 引擎特有的;binlog 是 MySQL 的 Server 層實(shí)現(xiàn)的,所有引擎都可以使用。redo log 是物理日志,記錄的是“在某個(gè)數(shù)據(jù)頁(yè)上做了什么修改”;
- binlog 是邏輯日志,記錄的是這個(gè)語(yǔ)句的原始邏輯,比如“給 ID=2 這一行的 c 字段加 1 ”。redo log 是循環(huán)寫(xiě)的,空間固定會(huì)用完;
- binlog 是可以追加寫(xiě)入的。“追加寫(xiě)”是指 binlog 文件寫(xiě)到一定大小后會(huì)切換到下一個(gè),并不會(huì)覆蓋以前的日志。
redis使用AOF(Append Only File),這樣做的好處是會(huì)在執(zhí)行命令成功后保存,不需要提前驗(yàn)證命令是否正確。AOF會(huì)保存服務(wù)器執(zhí)行的所有寫(xiě)操作到日志文件中,在服務(wù)重啟以后,會(huì)執(zhí)行這些命令來(lái)恢復(fù)數(shù)據(jù)。而 AOF 里記錄的是 Redis 收到的每一條命令,這些命令是以文本形式保存的。
etcd會(huì)判斷命令是否合法,然后Leader 收到提案后,通過(guò) Raft 模塊的事件總線保存待發(fā)給 Follower 節(jié)點(diǎn)的消息和待持久化的日志條目,日志條目是封裝的entry。etcdserver 從 Raft 模塊獲取到以上消息和日志條目后,作為 Leader,它會(huì)將 put 提案消息廣播給集群各個(gè)節(jié)點(diǎn),同時(shí)需要把集群 Leader 任期號(hào)、投票信息、已提交索引、提案內(nèi)容持久化到一個(gè) WAL(Write Ahead Log)日志文件中,用于保證集群的一致性、可恢復(fù)性。
Part2wal源碼分析
etcd server在啟動(dòng)時(shí),會(huì)根據(jù)是否wal目錄來(lái)確定之前etcd是否創(chuàng)建過(guò)wal,如果沒(méi)有創(chuàng)建wal,etcd會(huì)嘗試調(diào)用wal.Create方法,創(chuàng)建wal。否則使用wal.Open及wal.ReadAll方法是reload之前的wal,邏輯在etcd/etcdserver/server.go的NewServer方法里,存在wal時(shí)會(huì)調(diào)用restartNode,下面分創(chuàng)建wal和加載wal兩種情況作介紹。
wal的關(guān)鍵對(duì)象介紹如下
wal日志結(jié)構(gòu).png
dir:wal文件保存的路徑
dirFile:dir打開(kāi)后的一個(gè)目錄fd對(duì)象
metadata:創(chuàng)建wal時(shí)傳入的字節(jié)序列,etcd里面主要是序列化的是節(jié)點(diǎn)id及集群id相關(guān)信息,后續(xù)每創(chuàng)建一個(gè)wal文件就會(huì)將其寫(xiě)到wal的首部。
state:wal在append過(guò)程中保存的hardState信息,每次raft傳出的hardState有變化都會(huì)被更新,并會(huì)及時(shí)刷盤(pán),在wal有切割時(shí)會(huì)在新的wal頭部保存最新的
hardState信息,etcd重啟后會(huì)讀取最后一次保存的hardState用來(lái)恢復(fù)宕機(jī)或機(jī)器重啟時(shí)storage中hardState狀態(tài)信息,hardState的結(jié)構(gòu)如下:
- type HardState struct {
- Term uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
- Vote uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
- Commit uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
- XXX_unrecognized []byte `json:"-"`
- }
start:記錄最后一次保存的snapshot的元數(shù)據(jù)信息,主要是snapshot中最后一條日志Entry的index和Term,walpb.Snapshot的結(jié)構(gòu)如:
- type Snapshot struct {
- Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
- Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
- XXX_unrecognized []byte `json:"-"`
- }
decoder:負(fù)責(zé)在讀取WAL日志文件時(shí),將protobuf反序列化成Record實(shí)例。
readClose:用于關(guān)閉decoder關(guān)聯(lián)的reader,關(guān)閉wal讀模式,通過(guò)是在readALL之后調(diào)用該函數(shù)實(shí)現(xiàn)的
enti:最后一次保存到wal中的日志Entry的index
encoder:負(fù)責(zé)將寫(xiě)入WAL日志文件的Record實(shí)例進(jìn)行序列化成protobuf。
size :創(chuàng)建臨時(shí)文件時(shí)預(yù)分配空間的大小,默認(rèn)是 64MB (由wal.SegmentSizeBytes指定,該值也是每個(gè)日志文件的大小)。
locks:當(dāng)前WAL實(shí)例管理的所有WAL日志文件對(duì)應(yīng)的句柄。
fp:filePipeline實(shí)例負(fù)責(zé)創(chuàng)建新的臨時(shí)文件。
WAL創(chuàng)建
先來(lái)看一下wal.Create()方法,該方法不僅會(huì)創(chuàng)建WAL實(shí)例,而是做了很多初始化工作,其大致步驟如下:
(1)創(chuàng)建臨時(shí)目錄,并在臨時(shí)目錄中創(chuàng)建編號(hào)為“0-0”的WAL日志文件,WAL日志文件名由兩部分組成,一部分是seq(單調(diào)遞增),另一部分是該日志文件中的第一條日志記錄的索引值。
(2)嘗試為該WAL日志文件預(yù)分配磁盤(pán)空間。
(3)向該WAL日志文件中寫(xiě)入一條crcType類(lèi)型的日志記錄、一條metadataType類(lèi)型的日志記錄及一條snapshotType類(lèi)型的日志記錄。
(4)創(chuàng)建WAL實(shí)例關(guān)聯(lián)的filePipeline實(shí)例。
(5)將臨時(shí)目錄重命名為WAL.dir字段指定的名稱(chēng)。這里之所以先使用臨時(shí)目錄完成初始化操作再將其重命名的方式,主要是為了讓整個(gè)初始化過(guò)程看上去是一個(gè)原子操作。這樣上層模塊只需要檢查wal的目錄是否存在。
wal.Create()方法的具體實(shí)現(xiàn)如下:
- // Create creates a WAL ready for appending records. The given metadata is
- // recorded at the head of each WAL file, and can be retrieved(檢索) with ReadAll.
- func Create(dirpath string, metadata []byte) (*WAL, error) {
- // keep temporary wal directory so WAL initialization appears atomic
- //先使用臨時(shí)目錄完成初始化操作再將其重命名的方式,主要是為了讓整個(gè)初始化過(guò)程看上去是一個(gè)原子操作。
- tmpdirpath := filepath.Clean(dirpath) + ".tmp"
- if fileutil.Exist(tmpdirpath) {
- if err := os.RemoveAll(tmpdirpath); err != nil {
- return nil, err
- }
- }
- if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
- return nil, err
- }
- // dir/filename ,filename從walName獲取 seq-index.wal
- p := filepath.Join(tmpdirpath, walName(0, 0))
- // 創(chuàng)建對(duì)文件上互斥鎖
- f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
- if err != nil {
- return nil, err
- }
- // 定位到文件末尾
- if _, err = f.Seek(0, io.SeekEnd); err != nil {
- return nil, err
- }
- // 預(yù)分配文件,大小為SegmentSizeBytes(64MB)
- if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
- return nil, err
- }
- // 新建WAL結(jié)構(gòu)
- w := &WAL{
- dir: dirpath,
- metadata: metadata,// metadata 可為nil
- }
- // 在這個(gè)wal文件上創(chuàng)建一個(gè)encoder
- w.encoder, err = newFileEncoder(f.File, 0)
- if err != nil {
- return nil, err
- }
- // 把這個(gè)上了互斥鎖的文件加入到locks數(shù)組中
- w.locks = append(w.locks, f)
- if err = w.saveCrc(0); err != nil {
- return nil, err
- }
- // 將metadataType類(lèi)型的record記錄在wal的header處
- if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
- return nil, err
- }
- // 保存空的snapshot
- if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
- return nil, err
- }
- // 之前以.tmp結(jié)尾的文件,初始化完成之后重命名
- if w, err = w.renameWal(tmpdirpath); err != nil {
- return nil, err
- }
- // directory was renamed; sync parent dir to persist rename
- pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
- if perr != nil {
- return nil, perr
- }
- // 將parent dir 進(jìn)行同步到磁盤(pán)
- if perr = fileutil.Fsync(pdir); perr != nil {
- return nil, perr
- }
- return w, nil
- }
WAL日志文件遵循一定的命名規(guī)則,由walName實(shí)現(xiàn),格式為"序號(hào)--raft日志索引.wal"。
- // 根據(jù)seq和index產(chǎn)生wal文件名
- func walName(seq, index uint64) string {
- return fmt.Sprintf("%016x-%016x.wal", seq, index)
- }
在創(chuàng)建的過(guò)程中,Create函數(shù)還向WAL日志中寫(xiě)入了兩條數(shù)據(jù),一條就是記錄metadata,一條是記錄snapshot,WAL中的數(shù)據(jù)都是以Record為單位保存的,結(jié)構(gòu)定義如下:
- // 存儲(chǔ)在wal穩(wěn)定存儲(chǔ)中的消息一共有兩種,這是第一種普通記錄的格式
- type Record struct {
- Type int64 `protobuf:"varint,1,opt,name=type" json:"type"`
- Crc uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
- Data []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
- XXX_unrecognized []byte `json:"-"`
- }
Record類(lèi)型
其中Type字段表示該Record的類(lèi)型,取值可以是以下幾種:
- const (
- metadataType int64 = iota + 1
- entryType
- stateType
- crcType
- snapshotType
- // warnSyncDuration is the amount of time allotted to an fsync before
- // logging a warning
- warnSyncDuration = time.Second
- )
對(duì)應(yīng)于raft中的Snapshot(應(yīng)用狀態(tài)機(jī)的Snapshot),WAL中也會(huì)記錄一些Snapshot的信息(但是它不會(huì)記錄完整的應(yīng)用狀態(tài)機(jī)的Snapshot數(shù)據(jù)),WAL中的Snapshot格式定義如下:
- // 存儲(chǔ)在wal中的第二種Record消息,snapshot
- type Snapshot struct {
- Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
- Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
- XXX_unrecognized []byte `json:"-"`
- }
在保存Snapshot的(注意這里的Snapshot是WAL里的Record類(lèi)型,不是raft中的應(yīng)用狀態(tài)機(jī)的Snapshot)SaveSnapshot函數(shù)中:
- // 持久化walpb.Snapshot
- func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
- // pb序列化,此時(shí)的e可為空的
- b := pbutil.MustMarshal(&e)
- w.mu.Lock()
- defer w.mu.Unlock()
- // 創(chuàng)建snapshotType類(lèi)型的record
- rec := &walpb.Record{Type: snapshotType, Data: b}
- // 持久化到wal中
- if err := w.encoder.encode(rec); err != nil {
- return err
- }
- // update enti only when snapshot is ahead of last index
- if w.enti < e.Index {
- // index of the last entry saved to the wal
- // e.Index來(lái)自應(yīng)用狀態(tài)機(jī)的Index
- w.enti = e.Index
- }
- // 同步刷新磁盤(pán)
- return w.sync()
- }
一條Record需要先把序列化后才能持久化,這個(gè)是通過(guò)encode函數(shù)完成的(encoder.go),代碼如下:
- // 將Record序列化后持久化到WAL文件
- func (e *encoder) encode(rec *walpb.Record) error {
- e.mu.Lock()
- defer e.mu.Unlock()
- e.crc.Write(rec.Data)
- // 生成數(shù)據(jù)的crc
- rec.Crc = e.crc.Sum32()
- var (
- data []byte
- err error
- n int
- )
- if rec.Size() > len(e.buf) {
- // 如果超過(guò)預(yù)分配的buf,就使用動(dòng)態(tài)分配
- data, err = rec.Marshal()
- if err != nil {
- return err
- }
- } else {
- // 否則就使用與分配的buf
- n, err = rec.MarshalTo(e.buf)
- if err != nil {
- return err
- }
- data = e.buf[:n]
- }
- lenField, padBytes := encodeFrameSize(len(data))
- // 先寫(xiě)recode編碼后的長(zhǎng)度
- if err = writeUint64(e.bw, lenField, e.uint64buf); err != nil {
- return err
- }
- if padBytes != 0 {
- // 如果有追加數(shù)據(jù)(對(duì)齊需求)
- data = append(data, make([]byte, padBytes)...)
- }
- // 寫(xiě)recode內(nèi)容
- _, err = e.bw.Write(data)
- return err
- }
從代碼可以看到,一個(gè)Record被序列化之后(這里為protobuf格式),會(huì)以一個(gè)Frame的格式持久化。Frame首先是一個(gè)長(zhǎng)度字段(encodeFrameSize完成,在encoder.go文件),64bit,56bit存數(shù)據(jù)。其中MSB表示這個(gè)Frame是否有padding字節(jié),接下來(lái)才是真正的序列化后的數(shù)據(jù)。一般一個(gè)page是4096字節(jié),對(duì)齊到8字節(jié),不會(huì)出現(xiàn)一個(gè)double被拆到兩個(gè)page的情況,在cache中,也不會(huì)被拆開(kāi):
- func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {
- lenField = uint64(dataBytes)
- // force 8 byte alignment so length never gets a torn write
- padBytes = (8 - (dataBytes % 8)) % 8
- if padBytes != 0 {
- // lenField的高56記錄padding的長(zhǎng)度
- lenField |= uint64(0x80|padBytes) << 56 // 最高位為1用于表示含有padding,方便在decode的時(shí)候判斷
- }
- return lenField, padBytes
- }
最終,下圖展示了包含了兩個(gè)WAL文件的示例圖。
filePipeline類(lèi)型
filePipeline采用“餓漢式”,即提前創(chuàng)建一些文件備用,這樣可以加快文件的創(chuàng)建速度。filePipeline它負(fù)責(zé)預(yù)創(chuàng)建日志文件并為日志文件預(yù)分配空間。在filePipeline中會(huì)啟動(dòng)一個(gè)獨(dú)立的后臺(tái)goroutine來(lái)創(chuàng)建“.tmp”結(jié)尾的臨時(shí)文件,當(dāng)進(jìn)行日志文件切換時(shí),直接將臨時(shí)文件進(jìn)行重命名即可使用。結(jié)構(gòu)體filePipeline中各個(gè)字段的含義如下。
dir(string類(lèi)型):存放臨時(shí)文件的目錄。
size (int64 類(lèi)型):創(chuàng)建臨時(shí)文件時(shí)預(yù)分配空間的大小,默認(rèn)是 64MB (由wal.SegmentSizeBytes指定,該值也是每個(gè)日志文件的大小)。
count(int類(lèi)型):當(dāng)前filePipeline實(shí)例創(chuàng)建的臨時(shí)文件數(shù)。
filec(chan*fileutil.LockedFile 類(lèi)型):新建的臨時(shí)文件句柄會(huì)通過(guò) filec 通道返回給WAL實(shí)例使用。
errc(chan error類(lèi)型):當(dāng)創(chuàng)建臨時(shí)文件出現(xiàn)異常時(shí),則將異常傳遞到errc通道中。
donec(chan struct{}類(lèi)型):當(dāng)filePipeline.Close()被調(diào)用時(shí)會(huì)關(guān)閉donec通道,從而通知filePipeline實(shí)例刪除最后一次創(chuàng)建的臨時(shí)文件。
在newFilePipeline()方法中,除了創(chuàng)建filePipeline實(shí)例,還會(huì)啟動(dòng)一個(gè)后臺(tái)goroutine來(lái)執(zhí)行filePipeline.run()方法,該后臺(tái)goroutine中會(huì)創(chuàng)建新的臨時(shí)文件并將其句柄傳遞到filec通道中。filePipeline.run()方法的具體實(shí)現(xiàn)如下:
- // filePipeline pipelines allocating disk space
- type filePipeline struct {
- // dir to put files
- dir string
- // size of files to make, in bytes
- size int64
- // count number of files generated
- count int
- filec chan *fileutil.LockedFile
- errc chan error
- donec chan struct{}
- }
- func newFilePipeline(dir string, fileSize int64) *filePipeline {
- fp := &filePipeline{
- dir: dir,
- size: fileSize,
- filec: make(chan *fileutil.LockedFile),
- errc: make(chan error, 1),
- donec: make(chan struct{}),
- }
- // 一直執(zhí)行預(yù)分配
- go fp.run()
- return fp
- }
- // Open returns a fresh file for writing. Rename the file before calling
- // Open again or there will be file collisions.
- func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
- select {
- case f = <-fp.filec: // 從filec通道中獲取文件描述符,并返回
- case err = <-fp.errc: // 如果創(chuàng)建失敗,從errc通道中獲取,并返回
- }
- return f, err
- }
- func (fp *filePipeline) Close() error {
- close(fp.donec)
- return <-fp.errc //出現(xiàn)錯(cuò)誤,關(guān)閉donec通道并向errc通到中發(fā)送錯(cuò)誤
- }
- func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
- // count % 2 so this file isn't the same as the one last published
- // 創(chuàng)建臨時(shí)文件的編號(hào)是0或者1。
- fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
- //創(chuàng)建臨時(shí)文件,注意文件的模式與權(quán)限。
- if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
- return nil, err
- }
- // 嘗試預(yù)分配,如果當(dāng)前文件系統(tǒng)不支持預(yù)分配空間,則不會(huì)報(bào)錯(cuò)。
- if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
- f.Close() //如果出現(xiàn)異常,則會(huì)關(guān)閉donec通道
- return nil, err
- }
- // 已經(jīng)分配的文件個(gè)數(shù)
- fp.count++
- return f, nil //返回創(chuàng)建的臨時(shí)文件
- }
- // goroutine
- func (fp *filePipeline) run() {
- defer close(fp.errc)
- for {
- // 調(diào)用alloc()執(zhí)行創(chuàng)建臨時(shí)文件
- f, err := fp.alloc()
- if err != nil {
- fp.errc <- err
- return
- }
- select {
- case fp.filec <- f: // 等待消費(fèi)方從這個(gè)channel中取出這個(gè)預(yù)創(chuàng)建的被鎖的文件
- case <-fp.donec: //關(guān)閉時(shí)觸發(fā),刪除最后一次創(chuàng)建的臨時(shí)文件
- os.Remove(f.Name())
- f.Close()
- return
- }
- }
- }
本文轉(zhuǎn)載自微信公眾號(hào)「 運(yùn)維開(kāi)發(fā)故事」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系 運(yùn)維開(kāi)發(fā)故事公眾號(hào)。