偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

DDD 架構,MQ 應該放那一層使用?

開發(fā) 前端
因為我們本章所講解的內容是把 RocketMQ 放入 DDD 架構中進行使用,那么也就引申出領域事件定義。所以我們先來了解下,什么是領域事件。

本文的宗旨在于通過簡單干凈實踐的方式教會讀者,使用 Docker 配置 RocketMQ 并在基于 DDD 分層結構的 SpringBoot 工程中使用 RocketMQ 技術。因為大部分 MQ 的發(fā)送都是基于特定業(yè)務場景的,所以本章節(jié)也是基于 《MyBatis 使用教程和插件開發(fā)》 章節(jié)的擴展。

本章也會包括關于 MQ 消息的發(fā)送和接收應該處于 DDD 的哪一層的實踐講解和使用。

本文涉及的工程:

  • xfg-dev-tech-rocketmq:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-rocketmq
  • RocketMQ Docker 安裝:rocketmq-docker-compose-mac-amd-arm.yml
  • 導入測試庫表 road-map.sql

一、案例背景

首先我們要知道,MQ 消息的作用是用于;解耦過長的業(yè)務流程和應對流量沖擊的消峰。如;用戶下單支付完成后,拿到支付消息推動后續(xù)的發(fā)貨流程。也可以是我們基于 《MyBatis 使用教程和插件開發(fā)》 中的案例場景,給雇員提升級別和薪資的時候,也發(fā)送一條MQ消息,用于發(fā)送郵件通知給用戶。

圖片圖片

  • 從薪資調整到郵件發(fā)送,這里是2個業(yè)務流程,通過 MQ 消息的方式進行連接。
  • 其實MQ消息的使用場景特別多,原來你可能使用多線程的一些操作,現(xiàn)在就擴展為多實例的操作了。發(fā)送 MQ 消息出來,讓應用的各個實例接收并進行消費。

二、領域事件

因為我們本章所講解的內容是把 RocketMQ 放入 DDD 架構中進行使用,那么也就引申出領域事件定義。所以我們先來了解下,什么是領域事件。

領域事件,可以說是解耦微服務設計的關鍵。領域事件也是領域模型中非常重要的一部分內容,用于標示當前領域模型中發(fā)生的事件行為。一個領域事件會推進業(yè)務流程的進一步操作,在實現(xiàn)業(yè)務解耦的同時,也推動了整個業(yè)務的閉環(huán)。

圖片圖片

  • 首先,我們需要在領域模型層,添加一塊 event 區(qū)域。它的存在是為了定義出于當前領域下所需的事件消息信息。信息的類型可以是model 下的實體對象、聚合對象。
  • 之后,消息的發(fā)送是放在基礎設置層。本身基礎設置層就是依賴倒置于模型層,所以在模型層所定義的 event 對象,可以很方便的在基礎設置層使用。而且大部分開發(fā)的時候,MQ消息的發(fā)送與數(shù)據(jù)庫操作都是關聯(lián)的,采用的方式是,做完數(shù)據(jù)落庫后,推送MQ消息。所以定義在倉儲中實現(xiàn),會更加得心應手、水到渠成。
  • 最后,就是 MQ 的消息,MQ 的消費可以是自身服務所發(fā)出的消息,也可以是外部其他微服務的消息。就在小傅哥所整體講述的這套簡明教程中 DDD 部分的觸發(fā)器層。

三、環(huán)境安裝

本案例涉及了數(shù)據(jù)庫和RocketMQ的使用,都已經(jīng)在工程中提供了安裝腳本,可以按需執(zhí)行。

圖片圖片

這里主要介紹 RocketMQ 的安裝;

1. 執(zhí)行 compose yml

文件:docs/rocketmq/rocketmq-docker-compose-mac-amd-arm.yml - 關于安裝小傅哥提供了不同的鏡像,包括Mac、Mac M1、Windows 可以按需選擇使用。

version: '3'
services:
  # https://hub.docker.com/r/xuchengen/rocketmq
  # 注意修改項;
  # 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1
  # 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口
  rocketmq:
    image: livinphp/rocketmq:5.1.0
    container_name: rocketmq
    ports:
      - 9009:9009
      - 9876:9876
      - 10909:10909
      - 10911:10911
      - 10912:10912
    volumes:
      - ./data:/home/app/data
    environment:
      TZ: "Asia/Shanghai"
      NAMESRV_ADDR: "rocketmq:9876"
  • 在 IDEA 中打開 rocketmq-docker-compose-mac-amd-arm.yml 你會看到一個綠色的按鈕在左側側邊欄,點擊即可安裝。或者你也可以使用命令安裝:# /usr/local/bin/docker-compose -f /docs/dev-ops/environment/environment-docker-compose.yml up -d - 比較適合在云服務器上執(zhí)行。
  • 首次安裝可能使用不了,一個原因是 brokerIP1 未配置IP,另外一個是默認的 8080 端口占用??梢园凑杖缦滦「蹈缯f的方式修改。

2. 修改默認配合

  1. 打開 data/rocketmq/conf/broker.conf 添加一條 brokerIP1=127.0.0.1 在結尾
# 集群名稱
brokerClusterName = DefaultCluster
# BROKER 名稱
brokerName = broker-a
# 0 表示 Master, > 0 表示 Slave
brokerId = 0
# 刪除文件時間點,默認凌晨 4 點
deleteWhen = 04
# 文件保留時間,默認 48 小時
fileReservedTime = 48
# BROKER 角色 ASYNC_MASTER為異步主節(jié)點,SYNC_MASTER為同步主節(jié)點,SLAVE為從節(jié)點
brokerRole = ASYNC_MASTER
# 刷新數(shù)據(jù)到磁盤的方式,ASYNC_FLUSH 刷新
flushDiskType = ASYNC_FLUSH
# 存儲路徑
storePathRootDir = /home/app/data/rocketmq/store
# IP地址
brokerIP1 = 127.0.0.1
  1. 打開 ``data/console/config/application.properties修改server.port=9009` 端口。
server.address=0.0.0.0
server.port=9009
  • 修改配置后,重啟服務。

3. RockMQ登錄與配置

3.1 登錄

RocketMQ 此鏡像,會在安裝后在控制臺打印登錄賬號信息,你可以查看使用。

圖片圖片

圖片圖片

登錄:http://localhost:9009/

3.2 創(chuàng)建Topic

圖片圖片

  • 也可以使用命令創(chuàng)建:docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq

3.3 創(chuàng)建消費者組

圖片圖片

  • 也可以使用命令創(chuàng)建:docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group

四、工程實現(xiàn)

1. 工程結構

圖片圖片

  • MQ 的使用無論是 RocketMQ 還是 Kafka 等,都很簡單。但在使用之前,要考慮好怎么在架構中合理的使用。如果最初沒有定義好這些,那么胡亂的任何地方都能發(fā)送和接收MQ,最后的工程將非常難以維護。
  • 所以這里整個MQ的生產(chǎn)和消費,是按照整個 DDD 領域事件結構進行設計。分為在 domain 使用基礎層生產(chǎn)消息,再有 trigger 層接收消息。

2. 配置文件

引入POM

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.4</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

添加配置

# RocketMQ 配置
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: xfg-group
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消費最大值
    pull-batch-size: 10
  producer:
    # 發(fā)送同一類消息的設置為同一個group,保證唯一
    group: xfg-group
    # 發(fā)送消息超時時間,默認3000
    sendMessageTimeout: 10000
    # 發(fā)送消息失敗重試次數(shù),默認2
    retryTimesWhenSendFailed: 2
    # 異步消息重試此處,默認2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大長度,默認1024 * 1024 * 4(默認4M)
    maxMessageSize: 4096
    # 壓縮消息閾值,默認4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在內部發(fā)送失敗時重試另一個broker,默認false
    retryNextServer: false

3. 定義領域事件

源碼:cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent

圖片圖片

@EqualsAndHashCode(callSuper = true)
@Data
public class SalaryAdjustEvent extends BaseEvent<AdjustSalaryApplyOrderAggregate> {

    public static String TOPIC = "xfg-mq";

    public static SalaryAdjustEvent create(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
        SalaryAdjustEvent event = new SalaryAdjustEvent();
        event.setId(RandomStringUtils.randomNumeric(11));
        event.setTimestamp(new Date());
        event.setData(adjustSalaryApplyOrderAggregate);
        return event;
    }

}
  • 每個領域的消息,都有領域自己定義。發(fā)送的時候再交給基礎設施層來發(fā)送。

4. 消息發(fā)送

源碼:cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher

圖片圖片

@Component
@Slf4j
public class EventPublisher {

    @Setter(onMethod_ = @Autowired)
    private RocketMQTemplate rocketmqTemplate;

    /**
     * 普通消息
     *
     * @param topic   主題
     * @param message 消息
     */
    public void publish(String topic, BaseEvent<?> message) {
        try {
            String mqMessage = JSON.toJSONString(message);
            log.info("發(fā)送MQ消息 topic:{} message:{}", topic, mqMessage);
            rocketmqTemplate.convertAndSend(topic, mqMessage);
        } catch (Exception e) {
            log.error("發(fā)送MQ消息失敗 topic:{} message:{}", topic, JSON.toJSONString(message), e);
            // 大部分MQ發(fā)送失敗后,會需要任務補償
        }
    }

    /**
     * 延遲消息
     *
     * @param topic          主題
     * @param message        消息
     * @param delayTimeLevel 延遲時長
     */
    public void publishDelivery(String topic, BaseEvent<?> message, int delayTimeLevel) {
        try {
            String mqMessage = JSON.toJSONString(message);
            log.info("發(fā)送MQ延遲消息 topic:{} message:{}", topic, mqMessage);
            rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 1000, delayTimeLevel);
        } catch (Exception e) {
            log.error("發(fā)送MQ延遲消息失敗 topic:{} message:{}", topic, JSON.toJSONString(message), e);
            // 大部分MQ發(fā)送失敗后,會需要任務補償
        }
    }

}
  • 在基礎設施層提供 event 事件的處理,也就是 MQ 消息的發(fā)送。

源碼:cn.bugstack.xfg.dev.tech.infrastructure.repository.SalaryAdjustRepository

@Resource
private EventPublisher eventPublisher;
    
@Override
@Transactional(rollbackFor = Exception.class, timeout = 350, propagation = Propagation.REQUIRED, isolation = Isolation.DEFAULT)
public String adjustSalary(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
   
  // ... 省略部分代碼 

    eventPublisher.publish(SalaryAdjustEvent.TOPIC, SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate));
    return orderId;
}

在 SalaryAdjustRepository 倉儲的實現(xiàn)中,做完業(yè)務流程開始發(fā)送 MQ 消息。這里有2點要注意;

  1. 消息發(fā)送,不要寫在數(shù)據(jù)庫事務中。因為事務一直占用數(shù)據(jù)庫連接,需要快速釋放。
  2. 對于一些強MQ要求的場景,需要在發(fā)送MQ前,寫入一條數(shù)據(jù)庫 Task 記錄,發(fā)送消息后更新 Task 狀態(tài)為成功。如果長時間未更新數(shù)據(jù)庫狀態(tài)或者為失敗的,則需要由任務補償進行處理。

5. 消費消息

源碼:cn.bugstack.xfg.dev.tech.trigger.mq.SalaryAdjustMQListener

圖片圖片

@Component
@Slf4j
@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group")
public class SalaryAdjustMQListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("接收到MQ消息 {}", s);
    }

}
  • 消費消息,配置消費者組合消費的主題,之后就可以接收到消息了。接收以后你可以做自己的業(yè)務,如果拋出異常,消息會進行重新接收處理。

六、測試驗證

1. 單獨發(fā)送消息測試

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMQTest {

    @Setter(onMethod_ = @Autowired)
    private RocketMQTemplate rocketmqTemplate;

    @Test
    public void test() throws InterruptedException {
        while (true) {
            rocketmqTemplate.convertAndSend("xfg-mq", "我是測試消息");
            Thread.sleep(3000);
        }
    }

}
  • 這里方便你來發(fā)送消息,驗證流程。

2. 業(yè)務流程消息驗證

@Test
public void test_execSalaryAdjust() throws InterruptedException {
    AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate = AdjustSalaryApplyOrderAggregate.builder()
            .employeeNumber("10000001")
            .orderId("100908977676003")
            .employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build())
            .employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder()
                    .adjustTotalAmount(new BigDecimal(100))
                    .adjustBaseAmount(new BigDecimal(80))
                    .adjustMeritAmount(new BigDecimal(20)).build())
            .build();
    String orderId = salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate);
    log.info("調薪測試 req: {} res: {}", JSON.toJSONString(adjustSalaryApplyOrderAggregate), orderId);
    Thread.sleep(Integer.MAX_VALUE);
}
23-07-29.15:40:52.307 [main            ] INFO  HikariDataSource       - HikariPool-1 - Start completed.
23-07-29.15:40:52.445 [main            ] INFO  EventPublisher         - 發(fā)送MQ消息 topic:xfg-mq message:{"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
23-07-29.15:40:52.517 [main            ] INFO  ISalaryAdjustApplyServiceTest - 調薪測試 req: {"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"} res: 100908977676004
23-07-29.15:40:52.520 [ConsumeMessageThread_1] INFO  SalaryAdjustMQListener - 接收到MQ消息 {"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
  • 當執(zhí)行一次加薪調整后,就會接收到MQ消息了。
責任編輯:武曉燕 來源: bugstack蟲洞棧
相關推薦

2023-11-24 07:16:10

DDD微服務

2020-09-07 06:38:54

HA高可用協(xié)議

2024-05-21 09:26:54

微服務DDD建模架構

2019-01-18 16:39:08

系統(tǒng)層中間件層應用層

2022-01-10 13:01:32

指針Struct內存

2021-10-29 21:26:39

前端引擎層類型

2025-01-15 08:46:55

2025-02-05 09:46:13

OracleDBA投資

2023-02-15 13:50:58

DDD戰(zhàn)略設計

2022-01-11 20:43:16

TCPIP模型

2021-10-26 16:20:34

比特幣區(qū)塊鏈加密貨幣

2025-01-16 10:38:31

2009-06-10 09:58:14

程序員職場層次

2021-03-18 13:20:52

Linux MintLinuxLinux發(fā)行版

2011-04-19 13:53:41

三層架構

2024-04-11 10:01:29

2010-11-10 10:39:19

2024-06-20 13:22:13

C++11C++模板

2023-08-06 23:31:36

架構系統(tǒng)RPC

2019-08-26 14:53:32

數(shù)據(jù)中心運維管理宕機
點贊
收藏

51CTO技術棧公眾號