偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

在 .NET 中實(shí)現(xiàn)發(fā)件箱模式的實(shí)戰(zhàn)

開(kāi)發(fā) 數(shù)據(jù)庫(kù)
本文介紹了發(fā)件箱模式在.NET中的實(shí)現(xiàn),討論了該模式的利弊和擴(kuò)容機(jī)制,在本文的基礎(chǔ)上也能比較方便的實(shí)現(xiàn)基于其他語(yǔ)言的發(fā)件箱模式。

我們?cè)诜植际较到y(tǒng)中經(jīng)常面臨保持?jǐn)?shù)據(jù)庫(kù)和外部系統(tǒng)同步的挑戰(zhàn)。想象一下,將訂單保存在數(shù)據(jù)庫(kù)中,然后將消息發(fā)布到消息代理。任何一個(gè)操作失敗,系統(tǒng)都將處于不一致的狀態(tài)。

發(fā)件箱模式通過(guò)將消息發(fā)布視為數(shù)據(jù)庫(kù)事務(wù)的一部分來(lái)解決此問(wèn)題。我們并不直接發(fā)布消息,而是將消息保存到數(shù)據(jù)庫(kù)中的發(fā)件箱表中,以確保原子操作,然后通過(guò)單獨(dú)進(jìn)程可靠的發(fā)布消息。

本文將深入探討如何在 .NET 中實(shí)現(xiàn)這種模式。

為什么需要發(fā)件箱模式?

事務(wù)性發(fā)件箱模式修復(fù)了分布式系統(tǒng)中的一個(gè)常見(jiàn)問(wèn)題。當(dāng)我們需要同時(shí)做兩件事時(shí),就會(huì)出現(xiàn)這個(gè)問(wèn)題:保存數(shù)據(jù)并與外部組件通信。

考慮這樣的場(chǎng)景:發(fā)送訂單確認(rèn)郵件,通知其他系統(tǒng)有關(guān)新客戶注冊(cè)的信息,或在下訂單后更新庫(kù)存水平。每一種操作都涉及本地?cái)?shù)據(jù)變更以及外部數(shù)據(jù)通信或更新。

例如,想象某個(gè)微服務(wù)需要:

  • 在數(shù)據(jù)庫(kù)中保存新訂單
  • 告訴其他系統(tǒng)這個(gè)新訂單

如果其中某個(gè)步驟失敗,系統(tǒng)可能最終處于不一致?tīng)顟B(tài)。也許訂單被保存了,但是沒(méi)有其他人知道?;蛘呙總€(gè)人都認(rèn)為有新訂單,但數(shù)據(jù)庫(kù)里卻沒(méi)有。

下面是一個(gè)沒(méi)有發(fā)件箱模式的 CreateOrderCommandHandler:

public class CreateOrderCommandHandler(
    IOrderRepository orderRepository,
    IProductInventoryChecker inventoryChecker,
    IUnitOfWork unitOfWork,
    IEventBus eventBus) : IRequestHandler<CreateOrderCommand, OrderDto>
{
    public async Task<OrderDto> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        var order = new Order(request.CustomerId, request.ProductId, request.Quantity, inventoryChecker);

        await orderRepository.AddAsync(order);

        await unitOfWork.CommitAsync(cancellationToken);

        // 數(shù)據(jù)庫(kù)事務(wù)已完成

        await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));

        returnnew OrderDto { Id = order.Id, Total = order.Total };
    }
}

這段代碼有潛在的一致性問(wèn)題。在提交數(shù)據(jù)庫(kù)事務(wù)之后,有兩件事可能出錯(cuò):

  • 應(yīng)用程序可能在事務(wù)提交之后、事件發(fā)送之前崩潰。因此數(shù)據(jù)庫(kù)會(huì)創(chuàng)建訂單,但其他系統(tǒng)不會(huì)知道。
  • 當(dāng)我們嘗試發(fā)送事件時(shí),事件總線可能已關(guān)閉或無(wú)法訪問(wèn)。這將導(dǎo)致在沒(méi)有通知其他系統(tǒng)的情況下創(chuàng)建訂單。

事務(wù)性發(fā)件箱模式通過(guò)確保將數(shù)據(jù)庫(kù)更新和事件發(fā)布視為單個(gè)原子操作來(lái)幫助解決此問(wèn)題。

流程圖說(shuō)明了發(fā)件箱模式如何解決一致性挑戰(zhàn)。我們沒(méi)有嘗試分別保存數(shù)據(jù)和發(fā)送消息,而是將訂單和發(fā)件箱消息保存在單個(gè)數(shù)據(jù)庫(kù)事務(wù)中。這是一個(gè)全部成功或全部失敗的操作,該操作不能以不一致的狀態(tài)結(jié)束。

單獨(dú)的發(fā)件箱進(jìn)程處理實(shí)際的消息發(fā)送。它持續(xù)檢查發(fā)件箱表中未發(fā)送的消息,并將其發(fā)布到消息隊(duì)列中。進(jìn)程在成功發(fā)布消息后將消息標(biāo)記為已發(fā)送,從而避免重復(fù)發(fā)送。

需要注意,發(fā)件箱模式提供了至少一次的交付。發(fā)件箱消息將至少發(fā)送一次,但也可以多次發(fā)送,用于重試。這意味著必須實(shí)現(xiàn)冪等的消息消費(fèi)。

發(fā)件箱模式實(shí)現(xiàn)

首先,創(chuàng)建發(fā)件箱表,將在其中存儲(chǔ)消息:

CREATE TABLE outbox_messages (
    idUUID PRIMARY KEY,
    typeVARCHAR(255) NOTNULL,
    content JSONB NOTNULL,
    occurred_on_utc TIMESTAMPWITHTIME ZONE NOTNULL,
    processed_on_utc TIMESTAMPWITHTIME ZONE NULL,
    errorTEXTNULL
);

-- 因?yàn)閷?huì)經(jīng)常查詢未處理的消息,因此可以考慮添加索引
-- 它將數(shù)據(jù)行按照我們查詢需要的正確順序排序。

CREATEINDEXIFNOTEXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc ISNULL;

我用 PostgreSQL 作為示例數(shù)據(jù)庫(kù)。注意 content 列類型為 jsonb。如果將來(lái)需要,可以對(duì) JSON 數(shù)據(jù)進(jìn)行索引和查詢。

現(xiàn)在,我們創(chuàng)建一個(gè)表示發(fā)件箱條目的類:

public sealed class OutboxMessage
{
    public Guid Id { get; init; }
    public string Type { get; init; }
    public string Content { get; init; }
    public DateTime OccurredOnUtc { get; init; }
    public DateTime? ProcessedOnUtc { get; init; }
    public string? Error { get; init; }
}

下面將消息添加到發(fā)件箱:

public async Task AddToOutbox<T>(T message, NpgsqlDataSource dataSource)
{
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        OccurredOnUtc = DateTime.UtcNow,
        Type = typeof(T).FullName, // 通過(guò)這種方式實(shí)現(xiàn)反序列化
        Content = JsonSerializer.Serialize(message)
    };

    awaitusingvar connection = await dataSource.OpenConnectionAsync();
    await connection.ExecuteAsync(
        @"""
        INSERT INTO outbox_messages (id, occurred_on_utc, type, content)
        VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb)
        """,
        outboxMessage);
}

一種優(yōu)雅的實(shí)現(xiàn)方法是使用域事件來(lái)表示通知。當(dāng)域中發(fā)生重大事件時(shí),將觸發(fā)域事件。在完成事務(wù)之前,可以獲取所有事件并存儲(chǔ)為發(fā)件箱消息。我們可以通過(guò)工作單元或EF Core攔截器執(zhí)行此操作。

發(fā)件箱進(jìn)程

另一個(gè)組件是發(fā)件箱進(jìn)程,可以是物理上獨(dú)立的進(jìn)程,也可以是同一進(jìn)程中的后臺(tái)工作線程。

我用 Quartz 來(lái)調(diào)度處理發(fā)件箱的后臺(tái)作業(yè),這是一個(gè)健壯的庫(kù),對(duì)調(diào)度循環(huán)作業(yè)提供了出色的支持。

我們來(lái)實(shí)現(xiàn) OutboxProcessorJob:

[DisallowConcurrentExecution]
public class OutboxProcessorJob(
    NpgsqlDataSource dataSource,
    IPublishEndpoint publishEndpoint,
    Assembly integrationEventsAssembly) : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        awaitusingvar connection = await dataSource.OpenConnectionAsync();
        awaitusingvar transaction = await connection.BeginTransactionAsync();

        // You can make the limit a parameter, to control the batch size.
        // We can also select just the id, type, and content columns.
        var messages = await connection.QueryAsync<OutboxMessage>(
            @"""
            SELECT id AS Id, type AS Type, content AS Content
            FROM outbox_messages
            WHERE processed_on_utc IS NULL
            ORDER BY occurred_on_utc LIMIT 100
            """,
            transaction: transaction);

        foreach (var message in messages)
        {
            try
            {
                var messageType = integrationEventsAssembly.GetType(message.Type);
                var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);

                // We should introduce retries here to improve reliability.
                await publishEndpoint.Publish(deserializedMessage);

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, message.Id },
                    transaction: transaction);
            }
            catch (Exception ex)
            {
                // We can also introduce error logging here.

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc, error = @Error
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },
                    transaction: transaction);
            }
        }

        await transaction.CommitAsync();
    }
}

這種方法使用輪詢定期從數(shù)據(jù)庫(kù)獲取未處理的消息。因?yàn)樾枰l繁查詢未處理消息,因此輪詢會(huì)增加數(shù)據(jù)庫(kù)負(fù)載。

處理發(fā)件箱消息的另一種方法是使用事務(wù)日志跟蹤,可以通過(guò) Postgres邏輯復(fù)制來(lái)實(shí)現(xiàn)。數(shù)據(jù)庫(kù)把更改從預(yù)寫(xiě)日志(Write-Ahead Log, WAL)流式傳輸?shù)綉?yīng)用程序,然后處理這些消息并發(fā)布到消息代理。通過(guò)這種方式可以實(shí)現(xiàn)基于推送的發(fā)件箱處理進(jìn)程。

權(quán)衡利弊

發(fā)件箱模式雖然有效,但引入了額外復(fù)雜性和數(shù)據(jù)庫(kù)寫(xiě)入。在高吞吐量系統(tǒng)中,很重要的一點(diǎn)是需要監(jiān)控性能以確保其不會(huì)成為瓶頸。

建議在發(fā)件箱處理進(jìn)程中實(shí)現(xiàn)重試機(jī)制,以提高可靠性??紤]對(duì)瞬態(tài)故障使用指數(shù)回退,對(duì)持久性問(wèn)題使用斷路器,以防止系統(tǒng)在中斷期間過(guò)載。

非常重要的一點(diǎn)是需要實(shí)現(xiàn)消息的冪等消費(fèi)。網(wǎng)絡(luò)問(wèn)題或處理器重啟可能導(dǎo)致多次傳遞同一消息,因此使用者必須安全的處理重復(fù)消息。

隨著時(shí)間推移,發(fā)件箱表可能會(huì)顯著增長(zhǎng),從而影響數(shù)據(jù)庫(kù)性能。盡早實(shí)現(xiàn)存檔策略是很重要的一點(diǎn),可以考慮將處理過(guò)的消息移動(dòng)到冷存儲(chǔ)或在一段時(shí)間后刪除。

擴(kuò)展發(fā)件箱處理進(jìn)程

隨著系統(tǒng)增長(zhǎng),可能單個(gè)發(fā)件箱處理進(jìn)程無(wú)法跟上消息數(shù)量的增長(zhǎng),從而導(dǎo)致發(fā)生錯(cuò)誤以及增加處理延遲。

一種直接的方法是增加發(fā)件箱處理作業(yè)的頻率,考慮每隔幾秒鐘運(yùn)行一次,可以顯著減少消息處理中的延遲。

另一種有效的策略是在獲取未處理消息時(shí)增加批處理大小。通過(guò)在每次運(yùn)行中處理更多消息,可以提高吞吐量。但是,要小心不要使批處理太大,以免導(dǎo)致長(zhǎng)時(shí)間運(yùn)行的事務(wù)。

對(duì)于大容量系統(tǒng),發(fā)件箱的并行處理可能非常有效。實(shí)現(xiàn)鎖定機(jī)制以聲明消息批次,從而允許多個(gè)處理進(jìn)程同時(shí)工作而不發(fā)生沖突。可以 SELECT…FOR UPDATE SKIP LOCKED 聲明一批消息。這種方法可以顯著提高處理能力。

總結(jié)

發(fā)件箱模式是維護(hù)分布式系統(tǒng)數(shù)據(jù)一致性的強(qiáng)大工具。通過(guò)將數(shù)據(jù)庫(kù)操作與消息發(fā)布分離,發(fā)件箱模式可確保系統(tǒng)即使在出現(xiàn)故障時(shí)也保持可靠。

記住保持消費(fèi)者冪等,實(shí)現(xiàn)適當(dāng)?shù)臄U(kuò)容策略,并管理好發(fā)件箱表的增長(zhǎng)。

雖然增加了一些復(fù)雜性,但保證消息傳遞的好處使其成為許多場(chǎng)景中有價(jià)值的模式。

責(zé)任編輯:趙寧寧 來(lái)源: DeepNoMind
相關(guān)推薦

2023-01-07 10:17:06

微服務(wù)架構(gòu)模式

2025-05-26 09:10:00

微服務(wù)系統(tǒng)發(fā)件箱模式

2012-05-24 13:39:11

Python

2024-09-29 09:58:57

2024-07-22 14:34:20

簡(jiǎn)單工廠模式C#

2009-08-10 09:19:47

.NET反應(yīng)性框架

2009-03-13 09:48:33

ASP.NETAjaxJQuery

2010-01-21 09:08:53

.NET設(shè)計(jì)模式

2011-08-30 14:18:00

2019-07-02 15:21:39

緩存NET單線程

2009-02-27 16:22:34

AjaxProAjax.NET

2009-07-30 18:45:05

ASP.NET中防止頁(yè)

2009-07-27 18:13:19

ASP.NET工廠模式

2009-06-26 10:48:45

職責(zé)鏈模式.NET

2024-09-30 09:48:41

RabbitMQ消息中間件

2009-06-25 15:54:18

設(shè)計(jì)模式EJB

2009-07-30 13:45:40

ASP.NET開(kāi)發(fā)模式MVC模式

2018-02-10 09:44:19

2009-07-30 14:03:04

ASP.NET中的se

2010-02-05 08:32:32

ASP.NET MVC
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)