使用 Go 與 Redis Streams 構(gòu)建可靠的事件驅(qū)動(dòng)系統(tǒng)
事件驅(qū)動(dòng)架構(gòu)在現(xiàn)代軟件系統(tǒng)中十分常見,它讓各組件能夠異步通信。傳統(tǒng)實(shí)現(xiàn)通常借助 Kafka、Google Pub/Sub 或 Amazon SQS 等消息中間件,但在某些場(chǎng)景下,我們可能想用更輕量又足夠可靠的方案進(jìn)行學(xué)習(xí)或滿足定制需求。
本文演示如何基于 Golang + Redis Streams 搭建一個(gè)高可靠性的事件驅(qū)動(dòng)系統(tǒng)。

為什么事件驅(qū)動(dòng)系統(tǒng)需要“可靠性”
在很多業(yè)務(wù)里,丟失事件是不可接受的。以告警系統(tǒng)為例,若漏掉一次關(guān)鍵告警,可能導(dǎo)致宕機(jī)、數(shù)據(jù)泄露或交易失敗。因而系統(tǒng)必須滿足:
- 持久化(Durability):事件在被處理前必須保存下來;
- 確認(rèn)與重試(Ack & Retry):消費(fèi)失敗不能導(dǎo)致事件丟失;
- 可擴(kuò)展(Scalability):支持多生產(chǎn)者、多消費(fèi)者并發(fā)處理。
為什么選 Redis Streams 而非 Pub/Sub?
Redis 原生 Pub/Sub 只做即時(shí)推送,訂閱者離線時(shí)消息直接丟棄;而 Redis Streams 提供:
- 消息持久化;
- Consumer Group,便于水平擴(kuò)展;
- 消息確認(rèn)及重放;
- 高效處理大規(guī)模事件。
系統(tǒng)架構(gòu)
- 事件生產(chǎn)者:產(chǎn)生事件并寫入 Redis Stream;
- Redis Streams:中央事件存儲(chǔ)與消息分發(fā);
- 事件消費(fèi)者:讀取、處理并確認(rèn)事件。
Golang 實(shí)現(xiàn)
(1) 啟動(dòng) Redis
redis-server(2) 安裝 Go 客戶端
go get github.com/redis/go-redis/v9(3) 事件生產(chǎn)者
package main
import (
"context"
"fmt"
"log"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func main() {
client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
event := map[string]interface{}{"message": "Critical alert! Server down."}
_, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "alerts",
Values: event,
}).Result()
if err != nil {
log.Fatalf("發(fā)布事件失敗: %v", err)
}
fmt.Println("事件發(fā)布成功")
}(4) 事件消費(fèi)者
package main
import (
"context"
"fmt"
"log"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func main() {
client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
for {
res, err := client.XRead(ctx, &redis.XReadArgs{
Streams: []string{"alerts", "$"}, // "$" 表示從最新位置開始
Count: 1,
Block: 0, // 阻塞等待
}).Result()
if err != nil {
log.Fatalf("讀取事件失敗: %v", err)
}
for _, stream := range res {
for _, msg := range stream.Messages {
fmt.Printf("處理事件: %v\n", msg.Values)
}
}
}
}走向生產(chǎn)的強(qiáng)化點(diǎn)
盡管這只是一個(gè)簡(jiǎn)單的演示,真正用于生產(chǎn)的版本還應(yīng)包含以下功能:
- 錯(cuò)誤處理與重試:在失敗時(shí)實(shí)現(xiàn)指數(shù)退避重試機(jī)制;
- 消費(fèi)者組:將負(fù)載分配給多個(gè)消費(fèi)者以實(shí)現(xiàn)并行處理;
- 監(jiān)控與日志:持續(xù)追蹤事件處理的各項(xiàng)指標(biāo);
- 持久化與備份:?jiǎn)⒂么疟P持久化,防止數(shù)據(jù)丟失并支持備份。
結(jié)語
借助 Redis Streams + Golang,我們可以構(gòu)建一個(gè)具備持久化、確認(rèn)機(jī)制和水平擴(kuò)展能力的輕量事件驅(qū)動(dòng)系統(tǒng),非常適合學(xué)習(xí)及小型高可用場(chǎng)景。

































