SpringBoot整合RabbitMQ實現(xiàn)郵件異步發(fā)送
本篇文章將介紹另一種高可用的服務架構,以便實現(xiàn)郵件 100% 被投遞成功。類似的短信推送等服務,實現(xiàn)邏輯也大體類似。
01、先來一張流程圖
圖片
本文內(nèi)容主要圍繞這個流程圖展開,利用 RabbitMQ 消息隊列來實現(xiàn)郵件 100% 被投遞,內(nèi)容涵蓋了 RabbitMQ 很多知識點,如:
- 生產(chǎn)者和消費者模型
 - 消息發(fā)送機制
 - 消費確認機制
 - 消息的重新投遞
 - 消息消費失敗的處理方案
 
02、實現(xiàn)思路
- 1.準備一臺電腦,并安裝 RabbitMQ 服務
 - 2.開放 QQ 郵箱或者其它郵箱授權碼,用于發(fā)送郵件
 - 3.創(chuàng)建郵件發(fā)送項目并編寫代碼
 - 4.發(fā)送郵件測試
 - 5.消息消費失敗的處理介紹
 
03、環(huán)境準備
3.1、安裝 RabbitMQ 服務
安裝 RabbitMQ 服務,這一步比較簡單,可以訪問下面的官方地址,下載軟件包并依次按照步驟進行安裝即可。
https://rabbitmq.org.cn/docs/download安裝成功之后,登陸 RabbitMQ 控制臺,可以看到類似于如下界面。
圖片
3.1.1、創(chuàng)建交換器
點擊“Exchanges”菜單,進入“交換器”管理界面。
圖片
進入之后,點擊最下方“Add a new exchange”按鈕,創(chuàng)建一個類型為topic,名稱叫mail.exchange的交換器,并提交。
圖片
3.1.2、創(chuàng)建消息隊列
接著,點擊“Queues”菜單,進入消息隊列管理界面。
圖片
同樣的,點擊最下方“Add a new queue”按鈕,創(chuàng)建一個名稱叫mq.mail.ack的消息隊列,并提交。
圖片
保存之后,在列表中可以看到剛剛創(chuàng)建的消息隊列,然后點擊進入詳情。
圖片
在詳情中,將當前消息隊列與上文創(chuàng)建的交換器進行綁定,便于后續(xù)通過交換器來發(fā)送消息到隊列,操作如下。
圖片
對于topic類型的交換器,通常不直接與消息隊列進行交互,而是通過一個路由鍵,將消息路由到目標消息隊列,這樣設計的目的是讓消息投遞更加靈活。路由鍵,可以簡單理解為類似于路由器,對數(shù)據(jù)進行路由分發(fā)處理。
3.2、配置郵箱發(fā)送服務器
為了實現(xiàn)郵件自動發(fā)送功能,我們還需要準備一個郵箱發(fā)送服務器,這一步在之前的文章中已經(jīng)詳細的介紹過,在此,我們簡單的再介紹一下。
以 QQ 郵箱為例,登陸進去之后,在設置里面開啟 POP3/SMTP 服務,并獲取授權碼記錄下來。
圖片
圖片
該授權碼,就是下文配置文件中spring.mail.password需要的密碼!
04、方案實踐
4.1、構建項目
在 IDEA 下創(chuàng)建一個名稱為smail的 Spring Boot 項目,pom文件中加入amqp和mail相關依賴包,示例如下:
<!--mail 支持-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!--amqp 支持-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>4.2、添加相關配置
在application.properties中添加 rabbitmq、郵箱相關配置,示例如下:
# 配置郵件發(fā)送主機地址
spring.mail.host=smtp.exmail.qq.com
# 配置郵件發(fā)送服務端口號
spring.mail.port=465
# 配置郵件發(fā)送服務協(xié)議
spring.mail.protocol=smtp
# 配置郵件發(fā)送者用戶名或者賬戶
spring.mail.username=xxxx
# 配置郵件發(fā)送者密碼或者授權碼
spring.mail.password=xxxx
# 配置郵件默認編碼
spring.mail.default-encoding=UTF-8
# 配置smtp相關屬性
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.ssl.enable=true
spring.mail.properties.mail.smtp.ssl.required=true
#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=test
spring.rabbitmq.password=test
# 開啟confirms回調(diào) P -> Exchange
spring.rabbitmq.publisher-cnotallow=true
# 開啟returnedMessage回調(diào) Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 設置手動確認(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100其中,spring.mail.username和spring.mail.password指的就是上文中創(chuàng)建的郵箱賬號和授權碼,將其配置進去即可。
4.3、編寫 RabbitMQ 配置類
編寫一個 RabbitMQ 配置類,用于監(jiān)聽消息的發(fā)送情況,示例如下。
@Configuration
public class RabbitConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfig.class);
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 設置消息轉(zhuǎn)換器為json格式
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        // 消息是否成功發(fā)送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                LOGGER.info("消息發(fā)送到Exchange成功,{}", correlationData);
            } else {
                LOGGER.error("消息發(fā)送到Exchange失敗, {}, cause: {}", correlationData, cause);
            }
        });
        // 觸發(fā)setReturnCallback回調(diào)必須設置mandatory=true, 否則Exchange沒有找到Queue就會丟棄掉消息, 而不會觸發(fā)回調(diào)
        rabbitTemplate.setMandatory(true);
        // 消息是否從Exchange路由到Queue, 注意: 這是一個失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會回調(diào)這個方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            LOGGER.error("消息從Exchange路由到Queue失敗: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });
        return rabbitTemplate;
    }
}4.4、編寫生產(chǎn)者服務
在 Spring Boot 中,我們可以利用RabbitTemplate工具,將數(shù)據(jù)通過交換器發(fā)送到目標消息隊列,示例如下。
@Service
public class ProduceService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 發(fā)送消息
     * @param mail
     * @return
     */
    public boolean sendByAck(Mail mail) {
        // 創(chuàng)建uuid
        String msgId = UUID.randomUUID().toString().replaceAll("-", "");
        mail.setMsgId(msgId);
        // 發(fā)送消息到mq服務器中(附帶消息ID)
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend("mail.exchange", "route.mail.ack", MessageHelper.objToMsg(mail), correlationData);
        return true;
    }
}4.5、編寫消費者服務
在 Spring Boot 中,我們可以利用@RabbitListener注解,監(jiān)聽指定的消息隊列,如果隊列中有消息會第一時間收到回調(diào),示例如下。
@Component
public class ConsumerService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerService.class);
    @Autowired
    private SendMailService sendMailService;
    /**
     * 監(jiān)聽消息隊列,手動確認模式,必須手動調(diào)用ack或者nack方法
     * 配置參數(shù):spring.rabbitmq.listener.simple.acknowledge-mode=manual
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = {"mq.mail.ack"})
    public void consumeFromAck(Message message, Channel channel) throws IOException {
        LOGGER.info("收到消息:{}", message.toString());
        //將消息轉(zhuǎn)化為對象
        Mail mail = MessageHelper.msgToObj(message, Mail.class);
        // 手動確認模式
        long tag = message.getMessageProperties().getDeliveryTag();
        boolean success = sendMailService.send(mail);
        if (success) {
            // 消費成功,消息會被刪除
            channel.basicAck(tag, false);
        } else {
            // 消費失敗,重新返回隊列
            channel.basicNack(tag, false, true);
        }
    }
}4.6、編寫郵件發(fā)送服務
正如之前的文章中所介紹的,在 Spring Boot 中,我們可以利用JavaMailSender工具來實現(xiàn)郵件的自動推送,示例如下。
@Service
public class SendMailService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SendMailService.class);
    @Value("${spring.mail.username}")
    private String from;
    @Autowired
    private JavaMailSender mailSender;
    /**
     * 發(fā)送簡單郵件
     *
     * @param mail
     */
    public boolean send(Mail mail) {
        String to = mail.getTo();// 目標郵箱
        String title = mail.getTitle();// 郵件標題
        String content = mail.getContent();// 郵件正文
        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(from);
        message.setTo(to);
        message.setSubject(title);
        message.setText(content);
        try {
            mailSender.send(message);
            LOGGER.info("郵件發(fā)送成功");
            return true;
        } catch (MailException e) {
            LOGGER.error("郵件發(fā)送失敗, to: {}, title: {}", to, title, e);
            return false;
        }
    }
}4.7、編寫 controller 接口
接著,編寫一個 controller 接口,將郵件發(fā)送服務暴露出去,示例如下:
@RestController
public class MailController {
    @Autowired
    private ProduceService produceService;
    @PostMapping("send")
    public String sendMail(Mail mail) {
        boolean result = produceService.sendByAck(mail);
        return result ? "success": "fail";
    }
}4.8、服務測試
最后,啟動 SpringBoot 服務,用 postman 來測試一下。
圖片
查看控制臺信息。
圖片
查詢接受者郵件信息。
圖片
可以清楚的看到,郵件發(fā)送成功!
當大批量的發(fā)送郵件,也不用擔心,因為整個郵件的發(fā)送都是異步的,不會阻塞主流程的運行。
05、消費失敗的處理方案
雖然以上的方案非常可靠,可以保證發(fā)出的消息 100% 被消費,但是其實也有弊端。
試想一下,按照上面的處理邏輯,假設其中有一條消息,因為某種原因一直發(fā)送失敗,會出現(xiàn)什么樣的情況?
此時,這條消息會重新返回隊列,然后一直重試,會導致其它的消息可能會無法被消費。
針對這種情況,最簡單粗暴的辦法就是,當重試失敗之后將消息丟棄,不會阻礙其它的消息被正常處理,不過會丟失數(shù)據(jù)。
那么如何正確的處理消息消費失敗的問題呢?
可以借助數(shù)據(jù)庫來記錄消費失敗的數(shù)據(jù),針對系統(tǒng)無法成功處理的消息,人工進行干預。
實踐過程如下!
5.1、創(chuàng)建一張消息日志表
首先,在數(shù)據(jù)庫中創(chuàng)建一張消息日志表,用于跟蹤消息數(shù)據(jù)的狀態(tài),示例如下:
CREATE TABLE `msg_log` (
  `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一標識',
  `exchange` varchar(100) NOT NULL DEFAULT '' COMMENT '交換機',
  `route_key` varchar(100) NOT NULL DEFAULT '' COMMENT '路由鍵',
  `queue_name` varchar(100) NOT NULL DEFAULT '' COMMENT '隊列名稱',
  `msg` text COMMENT '消息體, json格式化',
  `result` varchar(255) DEFAULT NULL COMMENT '處理結果',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '狀態(tài),0:等待消費,1:消費成功,2:消費失敗,9:重試失敗',
  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重試次數(shù)',
  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重試時間',
  `create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建時間',
  `update_time` datetime DEFAULT NULL COMMENT '更新時間',
  PRIMARY KEY (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='mq消息日志';5.2、改寫生產(chǎn)者邏輯
在生產(chǎn)者服務類中,先將消息數(shù)據(jù)寫入數(shù)據(jù)庫,再向 rabbitMQ 服務中發(fā)消息,示例如下:
@Service
public class ProduceService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MsgLogService msgLogService;
    /**
     * 發(fā)送消息
     * @param mail
     * @return
     */
    public boolean sendByAuto(Mail mail) {
        String msgId = UUID.randomUUID().toString().replaceAll("-", "");
        mail.setMsgId(msgId);
        // 1.存儲要消費的數(shù)據(jù)
        msgLogService.save("mail.exchange", "route.mail.auto", "mq.mail.auto", msgId, mail);
        // 2.發(fā)送消息到mq服務器中(附帶消息ID)
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend("mail.exchange", "route.mail.auto", MessageHelper.objToMsg(mail), correlationData);
        return true;
    }
}5.3、改寫消費者邏輯
在消費者服務類中,收到消息之后,不管處理成功還是失敗,都只會修改數(shù)據(jù)庫中的消息狀態(tài),并且消息處理失敗時,不再重新返回隊列。
@Component
public class ConsumerService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerService.class);
    @Autowired
    private SendMailService sendMailService;
    @Autowired
    private MsgLogService msgLogService;
    /**
     * 監(jiān)聽消息隊列,自動確認模式,無需調(diào)用ack或者nack方法,當程序執(zhí)行時才刪除消息
     * 配置參數(shù):spring.rabbitmq.listener.simple.acknowledge-mode=auto
     * @param message
     */
    @RabbitListener(queues = {"mq.mail.auto"})
    public void consumeFromAuto(Message message) {
        LOGGER.info("收到消息:{}", message.toString());
        // 獲取消息ID
        Mail mail = MessageHelper.msgToObj(message, Mail.class);
        // 消息冪等性處理,如果已經(jīng)處理成功,無需重復消費
        MsgLog queryObj = msgLogService.selectByMsgId(mail.getMsgId());
        if(Objects.nonNull(queryObj) && Constant.SUCCESS.equals(queryObj.getStatus())){
            return;
        }
        // 發(fā)送郵件
        boolean success = sendMailService.send(mail);
        if(success){
            msgLogService.updateStatus(mail.getMsgId(), Constant.SUCCESS, "郵件發(fā)送成功");
        } else {
            msgLogService.updateStatus(mail.getMsgId(), Constant.FAIL, "郵件發(fā)送失敗");
        }
    }
}因為此處采用自動確認模式,因此還需要修改application.properties中的配置參數(shù),內(nèi)容如下:
# 設置自動確認(默認此模式)
spring.rabbitmq.listener.simple.acknowledge-mode=auto5.4、編寫定時任務對失敗消息進行補償投遞
當消息消費失敗時,會自動記錄到數(shù)據(jù)庫。
實際上,不可能每條數(shù)據(jù)都需要我們進行干預,有的可能重試一次就好了,因此可以編寫一個定時任務,將消費失敗的數(shù)據(jù)篩選出來,重新放入到消息隊列中,只有當消費次數(shù)達到設置的最大值,此時進入人工干預階段,可以節(jié)省不少的工作。
示例如下:
@Component
public class ScheduledTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTask.class);
    /**
     * 最大投遞次數(shù)
     */
    private static final int MAX_TRY_COUNT = 3;
    @Autowired
    private MsgLogService msgLogService;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 每30s拉取消費失敗的消息, 重新投遞
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void retry() {
        LOGGER.info("開始執(zhí)行重新投遞消費失敗的消息!");
        // 查詢需要重新投遞的消息
        List<MsgLog> msgLogs = msgLogService.selectFailMsg();
        for (MsgLog msgLog : msgLogs) {
            if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
                msgLogService.updateStatus(msgLog.getMsgId(), Constant.RETRY_FAIL, msgLog.getResult());
                LOGGER.info("超過最大重試次數(shù), msgId: {}", msgLog.getMsgId());
                break;
            }
            // 重新投遞消息
            CorrelationData correlationData = new CorrelationData(msgLog.getMsgId());
            rabbitTemplate.convertAndSend("", msgLog.getQueueName(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);
            // 更新下次重試時間
            msgLogService.updateNextTryTime(msgLog.getMsgId(), msgLog.getTryCount());
        }
    }
}最后別忘了,在Application類上添加@EnableScheduling,以便讓定時調(diào)度生效,示例如下:
@EnableScheduling
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class,args);
    }
}利用定時任務,對投遞失敗的消息進行補償投遞,基本可以保證消息 100% 消費成功!
06、小結
本文主要以實現(xiàn)郵件自動推送這個業(yè)務場景為例,通過 Springboot 整合 rabbitMQ 技術來實現(xiàn)高可用的效果。
當然,解決這個業(yè)務需求的技術方案還有很多,例如 Springboot 整合 rocketMQ 也可以實現(xiàn)這個效果,不管怎么變,底層的實現(xiàn)思路基本都一樣。
希望本篇的知識總結,對大家有所幫助。
最后,代碼都經(jīng)過自測,想要獲取項目源碼的同學,可以點擊如下地址獲取。
示例代碼地址:
https://gitee.com/pzblogs/spring-boot-example-demo














 
 
 














 
 
 
 