偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

在 .NET 中實現(xiàn)發(fā)件箱模式的實戰(zhàn)

開發(fā) 數(shù)據(jù)庫
本文介紹了發(fā)件箱模式在.NET中的實現(xiàn),討論了該模式的利弊和擴容機制,在本文的基礎(chǔ)上也能比較方便的實現(xiàn)基于其他語言的發(fā)件箱模式。

我們在分布式系統(tǒng)中經(jīng)常面臨保持數(shù)據(jù)庫和外部系統(tǒng)同步的挑戰(zhàn)。想象一下,將訂單保存在數(shù)據(jù)庫中,然后將消息發(fā)布到消息代理。任何一個操作失敗,系統(tǒng)都將處于不一致的狀態(tài)。

發(fā)件箱模式通過將消息發(fā)布視為數(shù)據(jù)庫事務的一部分來解決此問題。我們并不直接發(fā)布消息,而是將消息保存到數(shù)據(jù)庫中的發(fā)件箱表中,以確保原子操作,然后通過單獨進程可靠的發(fā)布消息。

本文將深入探討如何在 .NET 中實現(xiàn)這種模式。

為什么需要發(fā)件箱模式?

事務性發(fā)件箱模式修復了分布式系統(tǒng)中的一個常見問題。當我們需要同時做兩件事時,就會出現(xiàn)這個問題:保存數(shù)據(jù)并與外部組件通信。

考慮這樣的場景:發(fā)送訂單確認郵件,通知其他系統(tǒng)有關(guān)新客戶注冊的信息,或在下訂單后更新庫存水平。每一種操作都涉及本地數(shù)據(jù)變更以及外部數(shù)據(jù)通信或更新。

例如,想象某個微服務需要:

  • 在數(shù)據(jù)庫中保存新訂單
  • 告訴其他系統(tǒng)這個新訂單

如果其中某個步驟失敗,系統(tǒng)可能最終處于不一致狀態(tài)。也許訂單被保存了,但是沒有其他人知道。或者每個人都認為有新訂單,但數(shù)據(jù)庫里卻沒有。

下面是一個沒有發(fā)件箱模式的 CreateOrderCommandHandler:

public class CreateOrderCommandHandler(
    IOrderRepository orderRepository,
    IProductInventoryChecker inventoryChecker,
    IUnitOfWork unitOfWork,
    IEventBus eventBus) : IRequestHandler<CreateOrderCommand, OrderDto>
{
    public async Task<OrderDto> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        var order = new Order(request.CustomerId, request.ProductId, request.Quantity, inventoryChecker);

        await orderRepository.AddAsync(order);

        await unitOfWork.CommitAsync(cancellationToken);

        // 數(shù)據(jù)庫事務已完成

        await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));

        returnnew OrderDto { Id = order.Id, Total = order.Total };
    }
}

這段代碼有潛在的一致性問題。在提交數(shù)據(jù)庫事務之后,有兩件事可能出錯:

  • 應用程序可能在事務提交之后、事件發(fā)送之前崩潰。因此數(shù)據(jù)庫會創(chuàng)建訂單,但其他系統(tǒng)不會知道。
  • 當我們嘗試發(fā)送事件時,事件總線可能已關(guān)閉或無法訪問。這將導致在沒有通知其他系統(tǒng)的情況下創(chuàng)建訂單。

事務性發(fā)件箱模式通過確保將數(shù)據(jù)庫更新和事件發(fā)布視為單個原子操作來幫助解決此問題。

流程圖說明了發(fā)件箱模式如何解決一致性挑戰(zhàn)。我們沒有嘗試分別保存數(shù)據(jù)和發(fā)送消息,而是將訂單和發(fā)件箱消息保存在單個數(shù)據(jù)庫事務中。這是一個全部成功或全部失敗的操作,該操作不能以不一致的狀態(tài)結(jié)束。

單獨的發(fā)件箱進程處理實際的消息發(fā)送。它持續(xù)檢查發(fā)件箱表中未發(fā)送的消息,并將其發(fā)布到消息隊列中。進程在成功發(fā)布消息后將消息標記為已發(fā)送,從而避免重復發(fā)送。

需要注意,發(fā)件箱模式提供了至少一次的交付。發(fā)件箱消息將至少發(fā)送一次,但也可以多次發(fā)送,用于重試。這意味著必須實現(xiàn)冪等的消息消費。

發(fā)件箱模式實現(xiàn)

首先,創(chuàng)建發(fā)件箱表,將在其中存儲消息:

CREATE TABLE outbox_messages (
    idUUID PRIMARY KEY,
    typeVARCHAR(255) NOTNULL,
    content JSONB NOTNULL,
    occurred_on_utc TIMESTAMPWITHTIME ZONE NOTNULL,
    processed_on_utc TIMESTAMPWITHTIME ZONE NULL,
    errorTEXTNULL
);

-- 因為將會經(jīng)常查詢未處理的消息,因此可以考慮添加索引
-- 它將數(shù)據(jù)行按照我們查詢需要的正確順序排序。

CREATEINDEXIFNOTEXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc ISNULL;

我用 PostgreSQL 作為示例數(shù)據(jù)庫。注意 content 列類型為 jsonb。如果將來需要,可以對 JSON 數(shù)據(jù)進行索引和查詢。

現(xiàn)在,我們創(chuàng)建一個表示發(fā)件箱條目的類:

public sealed class OutboxMessage
{
    public Guid Id { get; init; }
    public string Type { get; init; }
    public string Content { get; init; }
    public DateTime OccurredOnUtc { get; init; }
    public DateTime? ProcessedOnUtc { get; init; }
    public string? Error { get; init; }
}

下面將消息添加到發(fā)件箱:

public async Task AddToOutbox<T>(T message, NpgsqlDataSource dataSource)
{
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        OccurredOnUtc = DateTime.UtcNow,
        Type = typeof(T).FullName, // 通過這種方式實現(xiàn)反序列化
        Content = JsonSerializer.Serialize(message)
    };

    awaitusingvar connection = await dataSource.OpenConnectionAsync();
    await connection.ExecuteAsync(
        @"""
        INSERT INTO outbox_messages (id, occurred_on_utc, type, content)
        VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb)
        """,
        outboxMessage);
}

一種優(yōu)雅的實現(xiàn)方法是使用域事件來表示通知。當域中發(fā)生重大事件時,將觸發(fā)域事件。在完成事務之前,可以獲取所有事件并存儲為發(fā)件箱消息。我們可以通過工作單元或EF Core攔截器執(zhí)行此操作。

發(fā)件箱進程

另一個組件是發(fā)件箱進程,可以是物理上獨立的進程,也可以是同一進程中的后臺工作線程。

我用 Quartz 來調(diào)度處理發(fā)件箱的后臺作業(yè),這是一個健壯的庫,對調(diào)度循環(huán)作業(yè)提供了出色的支持。

我們來實現(xiàn) OutboxProcessorJob:

[DisallowConcurrentExecution]
public class OutboxProcessorJob(
    NpgsqlDataSource dataSource,
    IPublishEndpoint publishEndpoint,
    Assembly integrationEventsAssembly) : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        awaitusingvar connection = await dataSource.OpenConnectionAsync();
        awaitusingvar transaction = await connection.BeginTransactionAsync();

        // You can make the limit a parameter, to control the batch size.
        // We can also select just the id, type, and content columns.
        var messages = await connection.QueryAsync<OutboxMessage>(
            @"""
            SELECT id AS Id, type AS Type, content AS Content
            FROM outbox_messages
            WHERE processed_on_utc IS NULL
            ORDER BY occurred_on_utc LIMIT 100
            """,
            transaction: transaction);

        foreach (var message in messages)
        {
            try
            {
                var messageType = integrationEventsAssembly.GetType(message.Type);
                var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);

                // We should introduce retries here to improve reliability.
                await publishEndpoint.Publish(deserializedMessage);

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, message.Id },
                    transaction: transaction);
            }
            catch (Exception ex)
            {
                // We can also introduce error logging here.

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc, error = @Error
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },
                    transaction: transaction);
            }
        }

        await transaction.CommitAsync();
    }
}

這種方法使用輪詢定期從數(shù)據(jù)庫獲取未處理的消息。因為需要頻繁查詢未處理消息,因此輪詢會增加數(shù)據(jù)庫負載。

處理發(fā)件箱消息的另一種方法是使用事務日志跟蹤,可以通過 Postgres邏輯復制來實現(xiàn)。數(shù)據(jù)庫把更改從預寫日志(Write-Ahead Log, WAL)流式傳輸?shù)綉贸绦?,然后處理這些消息并發(fā)布到消息代理。通過這種方式可以實現(xiàn)基于推送的發(fā)件箱處理進程。

權(quán)衡利弊

發(fā)件箱模式雖然有效,但引入了額外復雜性和數(shù)據(jù)庫寫入。在高吞吐量系統(tǒng)中,很重要的一點是需要監(jiān)控性能以確保其不會成為瓶頸。

建議在發(fā)件箱處理進程中實現(xiàn)重試機制,以提高可靠性。考慮對瞬態(tài)故障使用指數(shù)回退,對持久性問題使用斷路器,以防止系統(tǒng)在中斷期間過載。

非常重要的一點是需要實現(xiàn)消息的冪等消費。網(wǎng)絡(luò)問題或處理器重啟可能導致多次傳遞同一消息,因此使用者必須安全的處理重復消息。

隨著時間推移,發(fā)件箱表可能會顯著增長,從而影響數(shù)據(jù)庫性能。盡早實現(xiàn)存檔策略是很重要的一點,可以考慮將處理過的消息移動到冷存儲或在一段時間后刪除。

擴展發(fā)件箱處理進程

隨著系統(tǒng)增長,可能單個發(fā)件箱處理進程無法跟上消息數(shù)量的增長,從而導致發(fā)生錯誤以及增加處理延遲。

一種直接的方法是增加發(fā)件箱處理作業(yè)的頻率,考慮每隔幾秒鐘運行一次,可以顯著減少消息處理中的延遲。

另一種有效的策略是在獲取未處理消息時增加批處理大小。通過在每次運行中處理更多消息,可以提高吞吐量。但是,要小心不要使批處理太大,以免導致長時間運行的事務。

對于大容量系統(tǒng),發(fā)件箱的并行處理可能非常有效。實現(xiàn)鎖定機制以聲明消息批次,從而允許多個處理進程同時工作而不發(fā)生沖突??梢?nbsp;SELECT…FOR UPDATE SKIP LOCKED 聲明一批消息。這種方法可以顯著提高處理能力。

總結(jié)

發(fā)件箱模式是維護分布式系統(tǒng)數(shù)據(jù)一致性的強大工具。通過將數(shù)據(jù)庫操作與消息發(fā)布分離,發(fā)件箱模式可確保系統(tǒng)即使在出現(xiàn)故障時也保持可靠。

記住保持消費者冪等,實現(xiàn)適當?shù)臄U容策略,并管理好發(fā)件箱表的增長。

雖然增加了一些復雜性,但保證消息傳遞的好處使其成為許多場景中有價值的模式。

責任編輯:趙寧寧 來源: DeepNoMind
相關(guān)推薦

2023-01-07 10:17:06

微服務架構(gòu)模式

2025-05-26 09:10:00

微服務系統(tǒng)發(fā)件箱模式

2012-05-24 13:39:11

Python

2024-09-29 09:58:57

2024-07-22 14:34:20

簡單工廠模式C#

2009-08-10 09:19:47

.NET反應性框架

2009-03-13 09:48:33

ASP.NETAjaxJQuery

2010-01-21 09:08:53

.NET設(shè)計模式

2011-08-30 14:18:00

2019-07-02 15:21:39

緩存NET單線程

2009-02-27 16:22:34

AjaxProAjax.NET

2009-07-30 18:45:05

ASP.NET中防止頁

2009-07-27 18:13:19

ASP.NET工廠模式

2024-09-30 09:48:41

RabbitMQ消息中間件

2009-06-26 10:48:45

職責鏈模式.NET

2018-02-10 09:44:19

2009-07-30 13:45:40

ASP.NET開發(fā)模式MVC模式

2009-06-25 15:54:18

設(shè)計模式EJB

2021-09-13 07:00:01

C# .NET 緩存

2009-07-30 14:03:04

ASP.NET中的se
點贊
收藏

51CTO技術(shù)棧公眾號