事件流與事件溯源
事件流和事件溯源是事件驅(qū)動架構(gòu)中兩個相關(guān)但不同的概念。
事件流是持續(xù)捕獲和存儲系統(tǒng)中發(fā)生的事件的過程。這些事件可以實時處理和分析,也可以存儲以供后續(xù)分析。事件流通常用于需要實時處理大量數(shù)據(jù)的系統(tǒng),如金融交易系統(tǒng)或社交媒體平臺。
以下是使用流行的Kafka消息系統(tǒng)在Go中進行事件流處理的簡單示例:
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
)
func main() {
// 設(shè)置Kafka生產(chǎn)者以將事件發(fā)送到主題
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
})
// 發(fā)送一些事件到主題
writer.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("key1"),
Value: []byte("value1"),
},
kafka.Message{
Key: []byte("key2"),
Value: []byte("value2"),
},
)
// 設(shè)置Kafka消費者以從主題讀取事件
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
})
// 從主題讀取事件
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("Received message: key=%s, value=%s\n", string(msg.Key), string(msg.Value))
}
}
而事件溯源是一種構(gòu)建系統(tǒng)的模式,將應(yīng)用程序狀態(tài)的所有變化存儲為事件序列。這些事件然后可以用于在任何時間點重建應(yīng)用程序的狀態(tài)。事件溯源通常用于需要可審計性、可追溯性或合規(guī)性的系統(tǒng),如金融系統(tǒng)或醫(yī)療系統(tǒng)。
以下是在Go中使用內(nèi)存事件存儲進行事件溯源的簡單示例:
package main
import (
"fmt"
)
type Event struct {
Type string
Data interface{}
}
type EventStore struct {
events []Event
}
func (store *EventStore) Append(event Event) {
store.events = append(store.events, event)
}
func (store *EventStore) GetEvents() []Event {
return store.events
}
type Account struct {
id string
balance int
store *EventStore
}
func NewAccount(id string, store *EventStore) *Account {
return &Account{
id: id,
balance: 0,
store: store,
}
}
func (account *Account) Deposit(amount int) {
event := Event{
Type: "deposit",
Data: amount,
}
account.store.Append(event)
account.balance += amount
}
func (account *Account) Withdraw(amount int) {
if account.balance >= amount {
event := Event{
Type: "withdraw",
Data: amount,
}
account.store.Append(event)
account.balance -= amount
}
}
func (account *Account) GetBalance() int {
return account.balance
}
func main() {
store := &EventStore{}
account := NewAccount("123", store)
account.Deposit(100)
account.Withdraw(50)
account.Deposit(25)
events := store.GetEvents()
for _, event := range events {
switch event.Type {
case "deposit":
amount := event.Data.(int)
fmt.Printf("Deposited %d\n", amount)
case "withdraw":
amount := event.Data.(int)
fmt.Printf("Withdrew %d\n", amount)
}
}
fmt.Printf("Final balance: %d\n", account.GetBalance())
}
事件溯源是通過將每個對聚合的修改記錄為事件并將其追加到連續(xù)流中的一種方法。要重建聚合的最終狀態(tài),需要按順序讀取這些事件,然后將其應(yīng)用于聚合。這與在創(chuàng)建、讀取、更新和刪除(CRUD)系統(tǒng)中執(zhí)行的即時修改形成對比。在CRUD系統(tǒng)中,對記錄狀態(tài)的任何更改都存儲在數(shù)據(jù)庫中,實質(zhì)上覆蓋了同
一聚合的先前版本。
一旦價格變化已保存到Products表中,只更新了價格本身,而行的其余部分保持不變。然而,如圖5.1所示,這種方法導致了先前價格和更改背后的上下文的丟失。
為了保留不僅新價格還包括關(guān)鍵元數(shù)據(jù)(如調(diào)整原因)的信息,將更改記錄為Events表中的事件。先前的價格在先前事件中保持不變,以便在需要時檢索。
為了實現(xiàn)有效的事件溯源,建議使用提供強大一致性保證并使用樂觀并發(fā)控制的事件存儲。在實踐中,這意味著當多個修改同時發(fā)生時,只有初始修改才能附加到流中。隨后的修改可能需要重試或可能會失敗。