在 .NET 中實(shí)現(xiàn)發(fā)件箱模式的實(shí)戰(zhàn)
我們?cè)诜植际较到y(tǒng)中經(jīng)常面臨保持?jǐn)?shù)據(jù)庫(kù)和外部系統(tǒng)同步的挑戰(zhàn)。想象一下,將訂單保存在數(shù)據(jù)庫(kù)中,然后將消息發(fā)布到消息代理。任何一個(gè)操作失敗,系統(tǒng)都將處于不一致的狀態(tài)。
發(fā)件箱模式通過(guò)將消息發(fā)布視為數(shù)據(jù)庫(kù)事務(wù)的一部分來(lái)解決此問(wèn)題。我們并不直接發(fā)布消息,而是將消息保存到數(shù)據(jù)庫(kù)中的發(fā)件箱表中,以確保原子操作,然后通過(guò)單獨(dú)進(jìn)程可靠的發(fā)布消息。
本文將深入探討如何在 .NET 中實(shí)現(xiàn)這種模式。
為什么需要發(fā)件箱模式?
事務(wù)性發(fā)件箱模式修復(fù)了分布式系統(tǒng)中的一個(gè)常見(jiàn)問(wèn)題。當(dāng)我們需要同時(shí)做兩件事時(shí),就會(huì)出現(xiàn)這個(gè)問(wèn)題:保存數(shù)據(jù)并與外部組件通信。
考慮這樣的場(chǎng)景:發(fā)送訂單確認(rèn)郵件,通知其他系統(tǒng)有關(guān)新客戶注冊(cè)的信息,或在下訂單后更新庫(kù)存水平。每一種操作都涉及本地?cái)?shù)據(jù)變更以及外部數(shù)據(jù)通信或更新。
例如,想象某個(gè)微服務(wù)需要:
- 在數(shù)據(jù)庫(kù)中保存新訂單
- 告訴其他系統(tǒng)這個(gè)新訂單
如果其中某個(gè)步驟失敗,系統(tǒng)可能最終處于不一致?tīng)顟B(tài)。也許訂單被保存了,但是沒(méi)有其他人知道?;蛘呙總€(gè)人都認(rèn)為有新訂單,但數(shù)據(jù)庫(kù)里卻沒(méi)有。
下面是一個(gè)沒(méi)有發(fā)件箱模式的 CreateOrderCommandHandler:
public class CreateOrderCommandHandler(
IOrderRepository orderRepository,
IProductInventoryChecker inventoryChecker,
IUnitOfWork unitOfWork,
IEventBus eventBus) : IRequestHandler<CreateOrderCommand, OrderDto>
{
public async Task<OrderDto> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
{
var order = new Order(request.CustomerId, request.ProductId, request.Quantity, inventoryChecker);
await orderRepository.AddAsync(order);
await unitOfWork.CommitAsync(cancellationToken);
// 數(shù)據(jù)庫(kù)事務(wù)已完成
await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));
returnnew OrderDto { Id = order.Id, Total = order.Total };
}
}
這段代碼有潛在的一致性問(wèn)題。在提交數(shù)據(jù)庫(kù)事務(wù)之后,有兩件事可能出錯(cuò):
- 應(yīng)用程序可能在事務(wù)提交之后、事件發(fā)送之前崩潰。因此數(shù)據(jù)庫(kù)會(huì)創(chuàng)建訂單,但其他系統(tǒng)不會(huì)知道。
- 當(dāng)我們嘗試發(fā)送事件時(shí),事件總線可能已關(guān)閉或無(wú)法訪問(wèn)。這將導(dǎo)致在沒(méi)有通知其他系統(tǒng)的情況下創(chuàng)建訂單。
事務(wù)性發(fā)件箱模式通過(guò)確保將數(shù)據(jù)庫(kù)更新和事件發(fā)布視為單個(gè)原子操作來(lái)幫助解決此問(wèn)題。
流程圖說(shuō)明了發(fā)件箱模式如何解決一致性挑戰(zhàn)。我們沒(méi)有嘗試分別保存數(shù)據(jù)和發(fā)送消息,而是將訂單和發(fā)件箱消息保存在單個(gè)數(shù)據(jù)庫(kù)事務(wù)中。這是一個(gè)全部成功或全部失敗的操作,該操作不能以不一致的狀態(tài)結(jié)束。
單獨(dú)的發(fā)件箱進(jìn)程處理實(shí)際的消息發(fā)送。它持續(xù)檢查發(fā)件箱表中未發(fā)送的消息,并將其發(fā)布到消息隊(duì)列中。進(jìn)程在成功發(fā)布消息后將消息標(biāo)記為已發(fā)送,從而避免重復(fù)發(fā)送。
需要注意,發(fā)件箱模式提供了至少一次的交付。發(fā)件箱消息將至少發(fā)送一次,但也可以多次發(fā)送,用于重試。這意味著必須實(shí)現(xiàn)冪等的消息消費(fèi)。
發(fā)件箱模式實(shí)現(xiàn)
首先,創(chuàng)建發(fā)件箱表,將在其中存儲(chǔ)消息:
CREATE TABLE outbox_messages (
idUUID PRIMARY KEY,
typeVARCHAR(255) NOTNULL,
content JSONB NOTNULL,
occurred_on_utc TIMESTAMPWITHTIME ZONE NOTNULL,
processed_on_utc TIMESTAMPWITHTIME ZONE NULL,
errorTEXTNULL
);
-- 因?yàn)閷?huì)經(jīng)常查詢未處理的消息,因此可以考慮添加索引
-- 它將數(shù)據(jù)行按照我們查詢需要的正確順序排序。
CREATEINDEXIFNOTEXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc ISNULL;
我用 PostgreSQL 作為示例數(shù)據(jù)庫(kù)。注意 content 列類型為 jsonb。如果將來(lái)需要,可以對(duì) JSON 數(shù)據(jù)進(jìn)行索引和查詢。
現(xiàn)在,我們創(chuàng)建一個(gè)表示發(fā)件箱條目的類:
public sealed class OutboxMessage
{
public Guid Id { get; init; }
public string Type { get; init; }
public string Content { get; init; }
public DateTime OccurredOnUtc { get; init; }
public DateTime? ProcessedOnUtc { get; init; }
public string? Error { get; init; }
}
下面將消息添加到發(fā)件箱:
public async Task AddToOutbox<T>(T message, NpgsqlDataSource dataSource)
{
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
OccurredOnUtc = DateTime.UtcNow,
Type = typeof(T).FullName, // 通過(guò)這種方式實(shí)現(xiàn)反序列化
Content = JsonSerializer.Serialize(message)
};
awaitusingvar connection = await dataSource.OpenConnectionAsync();
await connection.ExecuteAsync(
@"""
INSERT INTO outbox_messages (id, occurred_on_utc, type, content)
VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb)
""",
outboxMessage);
}
一種優(yōu)雅的實(shí)現(xiàn)方法是使用域事件來(lái)表示通知。當(dāng)域中發(fā)生重大事件時(shí),將觸發(fā)域事件。在完成事務(wù)之前,可以獲取所有事件并存儲(chǔ)為發(fā)件箱消息。我們可以通過(guò)工作單元或EF Core攔截器執(zhí)行此操作。
發(fā)件箱進(jìn)程
另一個(gè)組件是發(fā)件箱進(jìn)程,可以是物理上獨(dú)立的進(jìn)程,也可以是同一進(jìn)程中的后臺(tái)工作線程。
我用 Quartz 來(lái)調(diào)度處理發(fā)件箱的后臺(tái)作業(yè),這是一個(gè)健壯的庫(kù),對(duì)調(diào)度循環(huán)作業(yè)提供了出色的支持。
我們來(lái)實(shí)現(xiàn) OutboxProcessorJob:
[DisallowConcurrentExecution]
public class OutboxProcessorJob(
NpgsqlDataSource dataSource,
IPublishEndpoint publishEndpoint,
Assembly integrationEventsAssembly) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
awaitusingvar connection = await dataSource.OpenConnectionAsync();
awaitusingvar transaction = await connection.BeginTransactionAsync();
// You can make the limit a parameter, to control the batch size.
// We can also select just the id, type, and content columns.
var messages = await connection.QueryAsync<OutboxMessage>(
@"""
SELECT id AS Id, type AS Type, content AS Content
FROM outbox_messages
WHERE processed_on_utc IS NULL
ORDER BY occurred_on_utc LIMIT 100
""",
transaction: transaction);
foreach (var message in messages)
{
try
{
var messageType = integrationEventsAssembly.GetType(message.Type);
var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
// We should introduce retries here to improve reliability.
await publishEndpoint.Publish(deserializedMessage);
await connection.ExecuteAsync(
@"""
UPDATE outbox_messages
SET processed_on_utc = @ProcessedOnUtc
WHERE id = @Id
""",
new { ProcessedOnUtc = DateTime.UtcNow, message.Id },
transaction: transaction);
}
catch (Exception ex)
{
// We can also introduce error logging here.
await connection.ExecuteAsync(
@"""
UPDATE outbox_messages
SET processed_on_utc = @ProcessedOnUtc, error = @Error
WHERE id = @Id
""",
new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },
transaction: transaction);
}
}
await transaction.CommitAsync();
}
}
這種方法使用輪詢定期從數(shù)據(jù)庫(kù)獲取未處理的消息。因?yàn)樾枰l繁查詢未處理消息,因此輪詢會(huì)增加數(shù)據(jù)庫(kù)負(fù)載。
處理發(fā)件箱消息的另一種方法是使用事務(wù)日志跟蹤,可以通過(guò) Postgres邏輯復(fù)制來(lái)實(shí)現(xiàn)。數(shù)據(jù)庫(kù)把更改從預(yù)寫(xiě)日志(Write-Ahead Log, WAL)流式傳輸?shù)綉?yīng)用程序,然后處理這些消息并發(fā)布到消息代理。通過(guò)這種方式可以實(shí)現(xiàn)基于推送的發(fā)件箱處理進(jìn)程。
權(quán)衡利弊
發(fā)件箱模式雖然有效,但引入了額外復(fù)雜性和數(shù)據(jù)庫(kù)寫(xiě)入。在高吞吐量系統(tǒng)中,很重要的一點(diǎn)是需要監(jiān)控性能以確保其不會(huì)成為瓶頸。
建議在發(fā)件箱處理進(jìn)程中實(shí)現(xiàn)重試機(jī)制,以提高可靠性??紤]對(duì)瞬態(tài)故障使用指數(shù)回退,對(duì)持久性問(wèn)題使用斷路器,以防止系統(tǒng)在中斷期間過(guò)載。
非常重要的一點(diǎn)是需要實(shí)現(xiàn)消息的冪等消費(fèi)。網(wǎng)絡(luò)問(wèn)題或處理器重啟可能導(dǎo)致多次傳遞同一消息,因此使用者必須安全的處理重復(fù)消息。
隨著時(shí)間推移,發(fā)件箱表可能會(huì)顯著增長(zhǎng),從而影響數(shù)據(jù)庫(kù)性能。盡早實(shí)現(xiàn)存檔策略是很重要的一點(diǎn),可以考慮將處理過(guò)的消息移動(dòng)到冷存儲(chǔ)或在一段時(shí)間后刪除。
擴(kuò)展發(fā)件箱處理進(jìn)程
隨著系統(tǒng)增長(zhǎng),可能單個(gè)發(fā)件箱處理進(jìn)程無(wú)法跟上消息數(shù)量的增長(zhǎng),從而導(dǎo)致發(fā)生錯(cuò)誤以及增加處理延遲。
一種直接的方法是增加發(fā)件箱處理作業(yè)的頻率,考慮每隔幾秒鐘運(yùn)行一次,可以顯著減少消息處理中的延遲。
另一種有效的策略是在獲取未處理消息時(shí)增加批處理大小。通過(guò)在每次運(yùn)行中處理更多消息,可以提高吞吐量。但是,要小心不要使批處理太大,以免導(dǎo)致長(zhǎng)時(shí)間運(yùn)行的事務(wù)。
對(duì)于大容量系統(tǒng),發(fā)件箱的并行處理可能非常有效。實(shí)現(xiàn)鎖定機(jī)制以聲明消息批次,從而允許多個(gè)處理進(jìn)程同時(shí)工作而不發(fā)生沖突。可以 SELECT…FOR UPDATE SKIP LOCKED 聲明一批消息。這種方法可以顯著提高處理能力。
總結(jié)
發(fā)件箱模式是維護(hù)分布式系統(tǒng)數(shù)據(jù)一致性的強(qiáng)大工具。通過(guò)將數(shù)據(jù)庫(kù)操作與消息發(fā)布分離,發(fā)件箱模式可確保系統(tǒng)即使在出現(xiàn)故障時(shí)也保持可靠。
記住保持消費(fèi)者冪等,實(shí)現(xiàn)適當(dāng)?shù)臄U(kuò)容策略,并管理好發(fā)件箱表的增長(zhǎng)。
雖然增加了一些復(fù)雜性,但保證消息傳遞的好處使其成為許多場(chǎng)景中有價(jià)值的模式。