動(dòng)手實(shí)現(xiàn)一個(gè)Localcache-實(shí)現(xiàn)篇
前言
哈嘍,大家好,我是asong,經(jīng)過了前面兩篇的介紹,我們已經(jīng)基本了解該如何設(shè)計(jì)一個(gè)本地緩存了,本文就是這個(gè)系列的終結(jié)篇,自己動(dòng)手實(shí)現(xiàn)一個(gè)本地緩存,接下來且聽我細(xì)細(xì)道來!!!
本文代碼已經(jīng)上傳到github:https://github.com/asong2020/go-localcache
現(xiàn)在這一版本算是一個(gè)1.0,后續(xù)會(huì)繼續(xù)進(jìn)行優(yōu)化和迭代。
第一步:抽象接口
第一步很重要,以面向接口編程為原則,我們先抽象出來要暴露給用戶的方法,給用戶提供簡(jiǎn)單易懂的方法,因此我抽象出來的結(jié)果如下:
- // ICache abstract interface
- type ICache interface {
- // Set value use default expire time. default does not expire.
- Set(key string, value []byte) error
- // Get value if find it. if value already expire will delete.
- Get(key string) ([]byte, error)
- // SetWithTime set value with expire time
- SetWithTime(key string, value []byte, expired time.Duration) error
- // Delete manual removes the key
- Delete(key string) error
- // Len computes number of entries in cache
- Len() int
- // Capacity returns amount of bytes store in the cache.
- Capacity() int
- // Close is used to signal a shutdown of the cache when you are done with it.
- // This allows the cleaning goroutines to exit and ensures references are not
- // kept to the cache preventing GC of the entire cache.
- Close() error
- // Stats returns cache's statistics
- Stats() Stats
- // GetKeyHit returns key hit
- GetKeyHit(key string) int64
- }
- Set(key string, value []byte):使用該方法存儲(chǔ)的數(shù)據(jù)使用默認(rèn)的過期時(shí)間,如果清除過期的異步任務(wù)沒有enable,那么就永不過期,否則默認(rèn)過期時(shí)間為10min。
- Get(key string) ([]byte, error):根據(jù)key獲取對(duì)象內(nèi)容,如果數(shù)據(jù)過期了會(huì)在這一步刪除。
- SetWithTime(key string, value []byte, expired time.Duration):存儲(chǔ)對(duì)象是使用自定義過期時(shí)間
- Delete(key string) error:根據(jù)key刪除對(duì)應(yīng)的緩存數(shù)據(jù)
- Len() int:獲取緩存的對(duì)象數(shù)量
- Capacity() int:獲取當(dāng)前緩存的容量
- Close() error:關(guān)閉緩存
- Stats() Stats:緩存監(jiān)控?cái)?shù)據(jù)
- GetKeyHit(key string) int64:獲取key的命中率數(shù)據(jù)
第二步:定義緩存對(duì)象
第一步我們抽象好了接口,下面就要定義一個(gè)緩存對(duì)象實(shí)例實(shí)現(xiàn)接口,先看定義結(jié)構(gòu):
- type cache struct {
- // hashFunc represents used hash func
- hashFunc HashFunc
- // bucketCount represents the number of segments within a cache instance. value must be a power of two.
- bucketCount uint64
- // bucketMask is bitwise AND applied to the hashVal to find the segment id.
- bucketMask uint64
- // segment is shard
- segments []*segment
- // segment lock
- locks []sync.RWMutex
- // close cache
- close chan struct{}
- }
- hashFunc:分片要用的哈希函數(shù),用戶可以自行定義,實(shí)現(xiàn)HashFunc接口即可,默認(rèn)使用fnv算法。
- bucketCount:分片的數(shù)量,一定要是偶數(shù),默認(rèn)分片數(shù)為256。
- bucketMask:因?yàn)榉制瑪?shù)是偶數(shù),所以可以分片時(shí)可以使用位運(yùn)算代替取余提升性能效率,hashValue % bucketCount == hashValue & bucketCount - 1。
- segments:分片對(duì)象,每個(gè)分片的對(duì)象結(jié)構(gòu)我們?cè)诤竺娼榻B。
- locks:每個(gè)分片的讀寫鎖
- close:關(guān)閉緩存對(duì)象時(shí)通知其他goroutine暫停
接下來我們來寫cache對(duì)象的構(gòu)造函數(shù):
- // NewCache constructor cache instance
- func NewCache(opts ...Opt) (ICache, error) {
- options := &options{
- hashFunc: NewDefaultHashFunc(),
- bucketCount: defaultBucketCount,
- maxBytes: defaultMaxBytes,
- cleanTime: defaultCleanTIme,
- statsEnabled: defaultStatsEnabled,
- cleanupEnabled: defaultCleanupEnabled,
- }
- for _, each := range opts{
- each(options)
- }
- if !isPowerOfTwo(options.bucketCount){
- return nil, errShardCount
- }
- if options.maxBytes <= 0 {
- return nil, ErrBytes
- }
- segments := make([]*segment, options.bucketCount)
- locks := make([]sync.RWMutex, options.bucketCount)
- maxSegmentBytes := (options.maxBytes + options.bucketCount - 1) / options.bucketCount
- for index := range segments{
- segments[index] = newSegment(maxSegmentBytes, options.statsEnabled)
- }
- c := &cache{
- hashFunc: options.hashFunc,
- bucketCount: options.bucketCount,
- bucketMask: options.bucketCount - 1,
- segments: segments,
- locks: locks,
- close: make(chan struct{}),
- }
- if options.cleanupEnabled {
- go c.cleanup(options.cleanTime)
- }
- return c, nil
- }
這里為了更好的擴(kuò)展,我們使用Options編程模式,我們的構(gòu)造函數(shù)主要做三件事:
- 前置參數(shù)檢查,對(duì)于外部傳入的參數(shù),我們還是要做基本的校驗(yàn)
- 分片對(duì)象初始化
- 構(gòu)造緩存對(duì)象
這里構(gòu)造緩存對(duì)象時(shí)我們要先計(jì)算每個(gè)分片的容量,默認(rèn)整個(gè)本地緩存256M的數(shù)據(jù),然后在平均分到每一片區(qū)內(nèi),用戶可以自行選擇要緩存的數(shù)據(jù)大小。
第三步:定義分片結(jié)構(gòu)
每個(gè)分片結(jié)構(gòu)如下:
- type segment struct {
- hashmap map[uint64]uint32
- entries buffer.IBuffer
- clock clock
- evictList *list.List
- stats IStats
- }
- hashmp:存儲(chǔ)key所對(duì)應(yīng)的存儲(chǔ)索引
- entries:存儲(chǔ)key/value的底層結(jié)構(gòu),我們?cè)诘谒牟降臅r(shí)候介紹,也是代碼的核心部分。
- clock:定義時(shí)間方法
- evicList:這里我們使用一個(gè)隊(duì)列來記錄old索引,當(dāng)容量不足時(shí)進(jìn)行刪除(臨時(shí)解決方案,當(dāng)前存儲(chǔ)結(jié)構(gòu)不適合使用LRU淘汰算法)
- stats:緩存的監(jiān)控?cái)?shù)據(jù)。
接下來我們?cè)賮砜匆幌旅總€(gè)分片的構(gòu)造函數(shù):
- func newSegment(bytes uint64, statsEnabled bool) *segment {
- if bytes == 0 {
- panic(fmt.Errorf("bytes cannot be zero"))
- }
- if bytes >= maxSegmentSize{
- panic(fmt.Errorf("too big bytes=%d; should be smaller than %d", bytes, maxSegmentSize))
- }
- capacity := (bytes + segmentSize - 1) / segmentSize
- entries := buffer.NewBuffer(int(capacity))
- entries.Reset()
- return &segment{
- entries: entries,
- hashmap: make(map[uint64]uint32),
- clock: &systemClock{},
- evictList: list.New(),
- stats: newStats(statsEnabled),
- }
- }
這里主要注意一點(diǎn):
我們要根據(jù)每個(gè)片區(qū)的緩存數(shù)據(jù)大小來計(jì)算出容量,與上文的緩存對(duì)象初始化步驟對(duì)應(yīng)上了。
第四步:定義緩存結(jié)構(gòu)
緩存對(duì)象現(xiàn)在也構(gòu)造好了,接下來就是本地緩存的核心:定義緩存結(jié)構(gòu)。
bigcache、fastcache、freecache都使用字節(jié)數(shù)組代替map存儲(chǔ)緩存數(shù)據(jù),從而減少GC壓力,所以我們也可以借鑒其思想繼續(xù)保持使用字節(jié)數(shù)組,這里我們使用二維字節(jié)切片存儲(chǔ)緩存數(shù)據(jù)key/value;畫個(gè)圖表示一下:
使用二維數(shù)組存儲(chǔ)數(shù)據(jù)的相比于bigcache的優(yōu)勢(shì)在于可以直接根據(jù)索引刪除對(duì)應(yīng)的數(shù)據(jù),雖然也會(huì)有蟲洞的問題,但是我們可以記錄下來蟲洞的索引,不斷填充。
每個(gè)緩存的封裝結(jié)構(gòu)如下:
基本思想已經(jīng)明確,接下來看一下我們對(duì)存儲(chǔ)層的封裝:
- type Buffer struct {
- array [][]byte
- capacity int
- index int
- // maxCount = capacity - 1
- count int
- // availableSpace If any objects are removed after the buffer is full, the idle index is logged.
- // Avoid array "wormhole"
- availableSpace map[int]struct{}
- // placeholder record the index that buffer has stored.
- placeholder map[int]struct{}
- }
- array [][]byte:存儲(chǔ)緩存對(duì)象的二維切片
- capacity:緩存結(jié)構(gòu)的最大容量
- index:索引,記錄緩存所在的位置的索引
- count:記錄緩存數(shù)量
- availableSpace:記錄"蟲洞",當(dāng)緩存對(duì)象被刪除時(shí)記錄下空閑位置的索引,方便后面容量滿了后使用"蟲洞"
- placeholder:記錄緩存對(duì)象的索引,迭代清除過期緩存可以用上。
向buffer寫入數(shù)據(jù)的流程(不貼代碼了):
第五步:完善向緩存寫入數(shù)據(jù)方法
上面我們定義好了所有需要的結(jié)構(gòu),接下來就是填充我們的寫入緩存方法就可以了:
- func (c *cache) Set(key string, value []byte) error {
- hashKey := c.hashFunc.Sum64(key)
- bucketIndex := hashKey&c.bucketMask
- c.locks[bucketIndex].Lock()
- defer c.locks[bucketIndex].Unlock()
- err := c.segments[bucketIndex].set(key, hashKey, value, defaultExpireTime)
- return err
- }
- func (s *segment) set(key string, hashKey uint64, value []byte, expireTime time.Duration) error {
- if expireTime <= 0{
- return ErrExpireTimeInvalid
- }
- expireAt := uint64(s.clock.Epoch(expireTime))
- if previousIndex, ok := s.hashmap[hashKey]; ok {
- if err := s.entries.Remove(int(previousIndex)); err != nil{
- return err
- }
- delete(s.hashmap, hashKey)
- }
- entry := wrapEntry(expireAt, key, hashKey, value)
- for {
- index, err := s.entries.Push(entry)
- if err == nil {
- s.hashmap[hashKey] = uint32(index)
- s.evictList.PushFront(index)
- return nil
- }
- ele := s.evictList.Back()
- if err := s.entries.Remove(ele.Value.(int)); err != nil{
- return err
- }
- s.evictList.Remove(ele)
- }
- }
流程分析如下:
根據(jù)key計(jì)算哈希值,然后根據(jù)分片數(shù)獲取對(duì)應(yīng)分片位置
如果當(dāng)前緩存中存在相同的key,則先刪除,在重新插入,會(huì)刷新過期時(shí)間
封裝存儲(chǔ)結(jié)構(gòu),根據(jù)過期時(shí)間戳、key長(zhǎng)度、哈希大小、緩存對(duì)象進(jìn)行封裝
將數(shù)據(jù)存入緩存,如果緩存失敗,移除最老的數(shù)據(jù)后再次重試
第六步:完善從緩存讀取數(shù)據(jù)方法
第一步根據(jù)key計(jì)算哈希值,再根據(jù)分片數(shù)獲取對(duì)應(yīng)的分片位置:
- func (c *cache) Get(key string) ([]byte, error) {
- hashKey := c.hashFunc.Sum64(key)
- bucketIndex := hashKey&c.bucketMask
- c.locks[bucketIndex].RLock()
- defer c.locks[hashKey&c.bucketMask].RUnlock()
- entry, err := c.segments[bucketIndex].get(key, hashKey)
- if err != nil{
- return nil, err
- }
- return entry,nil
- }
第二步執(zhí)行分片方法獲取緩存數(shù)據(jù):
- 先根據(jù)哈希值判斷key是否存在于緩存中,不存返回key沒有找到
- 從緩存中讀取數(shù)據(jù)得到緩存中的key判斷是否發(fā)生哈希沖突
- 判斷緩存對(duì)象是否過期,過期刪除緩存數(shù)據(jù)(可以根據(jù)業(yè)務(wù)優(yōu)化需要是否返回當(dāng)前過期數(shù)據(jù))
- 在每個(gè)記錄緩存監(jiān)控?cái)?shù)據(jù)
- func (s *segment) getWarpEntry(key string, hashKey uint64) ([]byte,error) {
- index, ok := s.hashmap[hashKey]
- if !ok {
- s.stats.miss()
- return nil, ErrEntryNotFound
- }
- entry, err := s.entries.Get(int(index))
- if err != nil{
- s.stats.miss()
- return nil, err
- }
- if entry == nil{
- s.stats.miss()
- return nil, ErrEntryNotFound
- }
- if entryKey := readKeyFromEntry(entry); key != entryKey {
- s.stats.collision()
- return nil, ErrEntryNotFound
- }
- return entry, nil
- }
- func (s *segment) get(key string, hashKey uint64) ([]byte, error) {
- currentTimestamp := s.clock.TimeStamp()
- entry, err := s.getWarpEntry(key, hashKey)
- if err != nil{
- return nil, err
- }
- res := readEntry(entry)
- expireAt := int64(readExpireAtFromEntry(entry))
- if currentTimestamp - expireAt >= 0{
- _ = s.entries.Remove(int(s.hashmap[hashKey]))
- delete(s.hashmap, hashKey)
- return nil, ErrEntryNotFound
- }
- s.stats.hit(key)
- return res, nil
- }
第七步:來個(gè)測(cè)試用例體驗(yàn)一下
先來個(gè)簡(jiǎn)單的測(cè)試用例測(cè)試一下:
- func (h *cacheTestSuite) TestSetAndGet() {
- cache, err := NewCache()
- assert.Equal(h.T(), nil, err)
- key := "asong"
- value := []byte("公眾號(hào):Golang夢(mèng)工廠")
- err = cache.Set(key, value)
- assert.Equal(h.T(), nil, err)
- res, err := cache.Get(key)
- assert.Equal(h.T(), nil, err)
- assert.Equal(h.T(), value, res)
- h.T().Logf("get value is %s", string(res))
- }
運(yùn)行結(jié)果:
- === RUN TestCacheTestSuite
- === RUN TestCacheTestSuite/TestSetAndGet
- cache_test.go:33: get value is 公眾號(hào):Golang夢(mèng)工廠
- --- PASS: TestCacheTestSuite (0.00s)
- --- PASS: TestCacheTestSuite/TestSetAndGet (0.00s)
- PASS
大功告成,基本功能通了,剩下就是跑基準(zhǔn)測(cè)試、優(yōu)化、迭代了(不在文章贅述了,可以關(guān)注github倉(cāng)庫最新動(dòng)態(tài))。
參考文章
- https://github.com/allegro/bigcache
- https://github.com/VictoriaMetrics/fastcache
- https://github.com/coocood/freecache
- https://github.com/patrickmn/go-cache
總結(jié)
實(shí)現(xiàn)篇到這里就結(jié)束了,但是這個(gè)項(xiàng)目的編碼仍未結(jié)束,我會(huì)繼續(xù)以此版本為基礎(chǔ)不斷迭代優(yōu)化,該本地緩存的優(yōu)點(diǎn):
- 實(shí)現(xiàn)簡(jiǎn)單、提供給用戶的方法簡(jiǎn)單易懂
- 使用二維切片作為存儲(chǔ)結(jié)構(gòu),避免了不能刪除底層數(shù)據(jù)的缺點(diǎn),也在一定程度上避免了"蟲洞"問題。
- 測(cè)試用例齊全,適合作為小白的入門項(xiàng)目
待優(yōu)化點(diǎn):
- 沒有使用高效的緩存淘汰算法,可能會(huì)導(dǎo)致熱點(diǎn)數(shù)據(jù)被頻繁刪除
- 定時(shí)刪除過期數(shù)據(jù)會(huì)導(dǎo)致鎖持有時(shí)間過長(zhǎng),需要優(yōu)化
- 關(guān)閉緩存實(shí)例需要優(yōu)化處理方式
- 根據(jù)業(yè)務(wù)場(chǎng)景進(jìn)行優(yōu)化(特定業(yè)務(wù)場(chǎng)景)
迭代點(diǎn):
- 添加異步加載緩存功能
- ...... (思考中)
本文代碼已經(jīng)上傳到github:https://github.com/asong2020/go-localcache
好啦,本文到這里就結(jié)束了,我是asong,我們下期見。


































