在 .NET 中實現(xiàn)發(fā)件箱模式的實戰(zhàn)
我們在分布式系統(tǒng)中經(jīng)常面臨保持數(shù)據(jù)庫和外部系統(tǒng)同步的挑戰(zhàn)。想象一下,將訂單保存在數(shù)據(jù)庫中,然后將消息發(fā)布到消息代理。任何一個操作失敗,系統(tǒng)都將處于不一致的狀態(tài)。
發(fā)件箱模式通過將消息發(fā)布視為數(shù)據(jù)庫事務的一部分來解決此問題。我們并不直接發(fā)布消息,而是將消息保存到數(shù)據(jù)庫中的發(fā)件箱表中,以確保原子操作,然后通過單獨進程可靠的發(fā)布消息。
本文將深入探討如何在 .NET 中實現(xiàn)這種模式。

為什么需要發(fā)件箱模式?
事務性發(fā)件箱模式修復了分布式系統(tǒng)中的一個常見問題。當我們需要同時做兩件事時,就會出現(xiàn)這個問題:保存數(shù)據(jù)并與外部組件通信。
考慮這樣的場景:發(fā)送訂單確認郵件,通知其他系統(tǒng)有關(guān)新客戶注冊的信息,或在下訂單后更新庫存水平。每一種操作都涉及本地數(shù)據(jù)變更以及外部數(shù)據(jù)通信或更新。
例如,想象某個微服務需要:
- 在數(shù)據(jù)庫中保存新訂單
- 告訴其他系統(tǒng)這個新訂單
如果其中某個步驟失敗,系統(tǒng)可能最終處于不一致狀態(tài)。也許訂單被保存了,但是沒有其他人知道。或者每個人都認為有新訂單,但數(shù)據(jù)庫里卻沒有。
下面是一個沒有發(fā)件箱模式的 CreateOrderCommandHandler:
public class CreateOrderCommandHandler(
IOrderRepository orderRepository,
IProductInventoryChecker inventoryChecker,
IUnitOfWork unitOfWork,
IEventBus eventBus) : IRequestHandler<CreateOrderCommand, OrderDto>
{
public async Task<OrderDto> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
{
var order = new Order(request.CustomerId, request.ProductId, request.Quantity, inventoryChecker);
await orderRepository.AddAsync(order);
await unitOfWork.CommitAsync(cancellationToken);
// 數(shù)據(jù)庫事務已完成
await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));
returnnew OrderDto { Id = order.Id, Total = order.Total };
}
}這段代碼有潛在的一致性問題。在提交數(shù)據(jù)庫事務之后,有兩件事可能出錯:
- 應用程序可能在事務提交之后、事件發(fā)送之前崩潰。因此數(shù)據(jù)庫會創(chuàng)建訂單,但其他系統(tǒng)不會知道。
- 當我們嘗試發(fā)送事件時,事件總線可能已關(guān)閉或無法訪問。這將導致在沒有通知其他系統(tǒng)的情況下創(chuàng)建訂單。
事務性發(fā)件箱模式通過確保將數(shù)據(jù)庫更新和事件發(fā)布視為單個原子操作來幫助解決此問題。

流程圖說明了發(fā)件箱模式如何解決一致性挑戰(zhàn)。我們沒有嘗試分別保存數(shù)據(jù)和發(fā)送消息,而是將訂單和發(fā)件箱消息保存在單個數(shù)據(jù)庫事務中。這是一個全部成功或全部失敗的操作,該操作不能以不一致的狀態(tài)結(jié)束。
單獨的發(fā)件箱進程處理實際的消息發(fā)送。它持續(xù)檢查發(fā)件箱表中未發(fā)送的消息,并將其發(fā)布到消息隊列中。進程在成功發(fā)布消息后將消息標記為已發(fā)送,從而避免重復發(fā)送。
需要注意,發(fā)件箱模式提供了至少一次的交付。發(fā)件箱消息將至少發(fā)送一次,但也可以多次發(fā)送,用于重試。這意味著必須實現(xiàn)冪等的消息消費。
發(fā)件箱模式實現(xiàn)
首先,創(chuàng)建發(fā)件箱表,將在其中存儲消息:
CREATE TABLE outbox_messages (
idUUID PRIMARY KEY,
typeVARCHAR(255) NOTNULL,
content JSONB NOTNULL,
occurred_on_utc TIMESTAMPWITHTIME ZONE NOTNULL,
processed_on_utc TIMESTAMPWITHTIME ZONE NULL,
errorTEXTNULL
);
-- 因為將會經(jīng)常查詢未處理的消息,因此可以考慮添加索引
-- 它將數(shù)據(jù)行按照我們查詢需要的正確順序排序。
CREATEINDEXIFNOTEXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc ISNULL;我用 PostgreSQL 作為示例數(shù)據(jù)庫。注意 content 列類型為 jsonb。如果將來需要,可以對 JSON 數(shù)據(jù)進行索引和查詢。
現(xiàn)在,我們創(chuàng)建一個表示發(fā)件箱條目的類:
public sealed class OutboxMessage
{
public Guid Id { get; init; }
public string Type { get; init; }
public string Content { get; init; }
public DateTime OccurredOnUtc { get; init; }
public DateTime? ProcessedOnUtc { get; init; }
public string? Error { get; init; }
}下面將消息添加到發(fā)件箱:
public async Task AddToOutbox<T>(T message, NpgsqlDataSource dataSource)
{
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
OccurredOnUtc = DateTime.UtcNow,
Type = typeof(T).FullName, // 通過這種方式實現(xiàn)反序列化
Content = JsonSerializer.Serialize(message)
};
awaitusingvar connection = await dataSource.OpenConnectionAsync();
await connection.ExecuteAsync(
@"""
INSERT INTO outbox_messages (id, occurred_on_utc, type, content)
VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb)
""",
outboxMessage);
}一種優(yōu)雅的實現(xiàn)方法是使用域事件來表示通知。當域中發(fā)生重大事件時,將觸發(fā)域事件。在完成事務之前,可以獲取所有事件并存儲為發(fā)件箱消息。我們可以通過工作單元或EF Core攔截器執(zhí)行此操作。
發(fā)件箱進程
另一個組件是發(fā)件箱進程,可以是物理上獨立的進程,也可以是同一進程中的后臺工作線程。
我用 Quartz 來調(diào)度處理發(fā)件箱的后臺作業(yè),這是一個健壯的庫,對調(diào)度循環(huán)作業(yè)提供了出色的支持。
我們來實現(xiàn) OutboxProcessorJob:
[DisallowConcurrentExecution]
public class OutboxProcessorJob(
NpgsqlDataSource dataSource,
IPublishEndpoint publishEndpoint,
Assembly integrationEventsAssembly) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
awaitusingvar connection = await dataSource.OpenConnectionAsync();
awaitusingvar transaction = await connection.BeginTransactionAsync();
// You can make the limit a parameter, to control the batch size.
// We can also select just the id, type, and content columns.
var messages = await connection.QueryAsync<OutboxMessage>(
@"""
SELECT id AS Id, type AS Type, content AS Content
FROM outbox_messages
WHERE processed_on_utc IS NULL
ORDER BY occurred_on_utc LIMIT 100
""",
transaction: transaction);
foreach (var message in messages)
{
try
{
var messageType = integrationEventsAssembly.GetType(message.Type);
var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
// We should introduce retries here to improve reliability.
await publishEndpoint.Publish(deserializedMessage);
await connection.ExecuteAsync(
@"""
UPDATE outbox_messages
SET processed_on_utc = @ProcessedOnUtc
WHERE id = @Id
""",
new { ProcessedOnUtc = DateTime.UtcNow, message.Id },
transaction: transaction);
}
catch (Exception ex)
{
// We can also introduce error logging here.
await connection.ExecuteAsync(
@"""
UPDATE outbox_messages
SET processed_on_utc = @ProcessedOnUtc, error = @Error
WHERE id = @Id
""",
new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },
transaction: transaction);
}
}
await transaction.CommitAsync();
}
}這種方法使用輪詢定期從數(shù)據(jù)庫獲取未處理的消息。因為需要頻繁查詢未處理消息,因此輪詢會增加數(shù)據(jù)庫負載。
處理發(fā)件箱消息的另一種方法是使用事務日志跟蹤,可以通過 Postgres邏輯復制來實現(xiàn)。數(shù)據(jù)庫把更改從預寫日志(Write-Ahead Log, WAL)流式傳輸?shù)綉贸绦?,然后處理這些消息并發(fā)布到消息代理。通過這種方式可以實現(xiàn)基于推送的發(fā)件箱處理進程。
權(quán)衡利弊
發(fā)件箱模式雖然有效,但引入了額外復雜性和數(shù)據(jù)庫寫入。在高吞吐量系統(tǒng)中,很重要的一點是需要監(jiān)控性能以確保其不會成為瓶頸。
建議在發(fā)件箱處理進程中實現(xiàn)重試機制,以提高可靠性。考慮對瞬態(tài)故障使用指數(shù)回退,對持久性問題使用斷路器,以防止系統(tǒng)在中斷期間過載。
非常重要的一點是需要實現(xiàn)消息的冪等消費。網(wǎng)絡(luò)問題或處理器重啟可能導致多次傳遞同一消息,因此使用者必須安全的處理重復消息。
隨著時間推移,發(fā)件箱表可能會顯著增長,從而影響數(shù)據(jù)庫性能。盡早實現(xiàn)存檔策略是很重要的一點,可以考慮將處理過的消息移動到冷存儲或在一段時間后刪除。
擴展發(fā)件箱處理進程
隨著系統(tǒng)增長,可能單個發(fā)件箱處理進程無法跟上消息數(shù)量的增長,從而導致發(fā)生錯誤以及增加處理延遲。
一種直接的方法是增加發(fā)件箱處理作業(yè)的頻率,考慮每隔幾秒鐘運行一次,可以顯著減少消息處理中的延遲。
另一種有效的策略是在獲取未處理消息時增加批處理大小。通過在每次運行中處理更多消息,可以提高吞吐量。但是,要小心不要使批處理太大,以免導致長時間運行的事務。
對于大容量系統(tǒng),發(fā)件箱的并行處理可能非常有效。實現(xiàn)鎖定機制以聲明消息批次,從而允許多個處理進程同時工作而不發(fā)生沖突??梢?nbsp;SELECT…FOR UPDATE SKIP LOCKED 聲明一批消息。這種方法可以顯著提高處理能力。
總結(jié)
發(fā)件箱模式是維護分布式系統(tǒng)數(shù)據(jù)一致性的強大工具。通過將數(shù)據(jù)庫操作與消息發(fā)布分離,發(fā)件箱模式可確保系統(tǒng)即使在出現(xiàn)故障時也保持可靠。
記住保持消費者冪等,實現(xiàn)適當?shù)臄U容策略,并管理好發(fā)件箱表的增長。
雖然增加了一些復雜性,但保證消息傳遞的好處使其成為許多場景中有價值的模式。






















