干貨|SpringBoot JMS(ActiveMQ)API實踐應用詳解
前言
Active是一種開源的,實現(xiàn)了JMS1.1規(guī)范的,面向消息(MOM)的中間件,為應用程序提供高效的、可擴展的、穩(wěn)定的和安全的企業(yè)級消息通信。AC-tiveMQ使用Apache提供的授權,任何人都可以對其實現(xiàn)代碼進行修改。
ActiveMQ的設計目標是提供標準的,面向消息的,能夠跨越多語言和多系統(tǒng)的應用集成消息通信中間件。ActiveMQ實現(xiàn)了JMS標準并提供了很多附加的特性。本文將帶大家詳細介紹ActiveMQ的API的使用。
1. JMS的概念?
「什么是JMS呢:」
- JMS---------JAVA Message Service
 - JAVA的消息服務,是sun公司提供的接口,只是一個規(guī)范,這個規(guī)范就類似于JDBC是一樣的,使用的時候是需要當前規(guī)范的實現(xiàn)產(chǎn)品的。
 
「JMS能干什么呢:」
- 能夠?qū)⑿畔l(fā)布到目的地
 - 可以從目的地來消費這個消息
 
2、兩種通信模型
「隊列的通信概念:」
- 特點:當我們同一個隊列有多個消費者的時候,多個消費者的數(shù)據(jù)之和才是原來隊列中的所有數(shù)據(jù)
 - 隊列的通信模型最大的適用場景:流量的消峰,高并發(fā)的處理
 
「主題的通信模型:」
- 特點:當我們隊列有多個消費者的時候,那么這多個消費者消費到的數(shù)據(jù)是一樣的
 - 主題消費者通信模型的適用場景:微服務下服務之間的異步通信
 
3. MQ的實現(xiàn)產(chǎn)品
「實現(xiàn)產(chǎn)品:」
- ActiveMQ
 - RabbitMQ
 - RockerMQ
 - Kafka(這個設計的初衷是做分布式的日志的,后來因為日志有嚴格的順序問題,這個時候人們就用Kafka來做消息隊列了)
 
4、JMS中常見的名詞
「常見的名詞:」
- ActiveMQConnectionFactory:這個是創(chuàng)建連接的工廠
 - ConnectionFactory:連接的工廠
 - Connection:連接JAVA對MQ的一個連接
 - Destination:目的地
 - 生產(chǎn)者(Producer)
 - 消費者(Consumer)
 - Session:會話(每一次對MQ的操作都稱為一次會話)
 - Queue:隊列
 - Topic:主題
 
5、什么是消息隊列
「消息隊列簡單的說就是用來存放臨時數(shù)據(jù)的地方:」
- 生產(chǎn)者----------->存儲介質(zhì)上
 - 消費者----------->存儲介質(zhì)上
 
「消息隊列類似于快遞公司:」
- 你可以將東西交給快遞公司
 - 目標人也可以從快遞公司去取東西
 
6. ActiveMQ是什么
「含義:」
- ActiveMQ就是一個JMS的實現(xiàn)產(chǎn)品,它能夠?qū)崿F(xiàn)JMS下的所有功能
 
7、ActiveMQ能干什么
「主要作用:」
- 流量消峰處理
 - 微服務下模塊的異步通信
 - 處理高并發(fā)下的訂單
 - 處理第三方平臺的高并發(fā)
 - 協(xié)助消息表可以完成分布式事務的最終一致性
 
8、ActiveMQ的安裝
「ActiveMQ的安裝和配置:」
- 1、官網(wǎng)下載Linux版的ActiveMQ(最新版本為5.13.4)
 - https://activemq.apache.org/download.html
 - 2、解壓安裝
 - tar -zxvf apache-activemq-5.13.4-bin.tar.gz
 - 3、配置(這里采用默認配置,無需修改)
 - vim /usr/lical/activemq-1/conf/activemq.xml
 - 4、啟動
 - cd /usr/local/activemq-1/bin
 - ./activemq start
 - 5、打開管理界面(管理界面可以查看并管理所有隊列及消息)
 - http://192.168.1.100:8161
 - 啟動成功后,可以瀏覽 http://localhost:8161/admin/
 - 默認用戶名、密碼:admin/admin
 - 管理界面是用jetty做容器的,如果想修改管理界面的端口,可以編輯../conf/jetty.xml,找到下面這一段:
 - <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
 - <!-- the default port number for the web console -->
 - <property name="host" value="0.0.0.0"/>
 - <property name="port" value="8161"/>
 - </bean>
 - 用戶名/密碼是在 ../conf/jetty-realm.properties 里,比如要增加一個管理員jimmy/123456,可參考下面修改:
 - 1
 - 2
 - 3admin: admin, admin
 - jimmy: 123456, admin
 - user: user, user
 - 注:管理界面有一個小坑,ActiveMQ 5.13.2與jdk1.8兼容性有點問題,如果使用jdk1.8,管理界面進入Queues標簽頁時,偶爾會報錯,但是并不影響消息正常收發(fā),只是無法從界面上查看隊列情況,如果出現(xiàn)該問題,可將jdk版本降至1.7,同時最好清空data目錄下的所有數(shù)據(jù),再重啟activemq即可。
 
9. ActiveMQ的API的使用
「AcatveMQ的API使用:」
- 隊列的使用(生產(chǎn)者)
 
- package com.qy.mq.queue;
 - import org.apache.activemq.ActiveMQConnectionFactory;
 - import org.apache.activemq.Message;
 - import javax.jms.*;
 - /**
 - * @Auther: qianyu
 - * @Date: 2020/11/04 14:12
 - * @Description:生產(chǎn)者
 - */
 - public class Producer {
 - //準備發(fā)布的這個地址
 - private static final String PATH="tcp://10.7.182.87:61616";
 - //ActiveMQ下的用戶名
 - private static final String userName="admin";
 - //ActiveMQ下的密碼
 - private static final String password="admin";
 - public static void main(String[] args) throws JMSException {
 - //第一步:創(chuàng)建連接的工廠
 - ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
 - //通過這個工廠獲取連接
 - Connection connection = activeMQConnectionFactory.createConnection();
 - //第三步:打開這個連接
 - connection.start();
 - //第四步:創(chuàng)建操作MQ的這個會話
 - /**
 - * 第一個參數(shù):是否使用事務
 - * 第二個參數(shù):客戶端的應答模式
 - * 第一種:自動應答
 - * 第二種:客戶端手動應答
 - */
 - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 - //需要發(fā)送消息的目的地(queue操作的是隊列)
 - Destination destination=session.createQueue("wqqq");
 - //生產(chǎn)者來生產(chǎn)這個消息
 - //要有生產(chǎn)者
 - MessageProducer messageProducer = session.createProducer(destination);
 - //發(fā)送很多的消息到消息隊列中去
 - // for (int i=0;i<100;i++){
 - //需要準備發(fā)送的消息
 - // TextMessage textMessage = session.createTextMessage("我是淺羽:"+i);
 - //研究消息的類型
 - /* BytesMessage bytesMessage = session.createBytesMessage();
 - bytesMessage.setByteProperty("www",new Byte("123"));
 - //接下來就可以發(fā)送消息了
 - messageProducer.send(bytesMessage);*/
 - //創(chuàng)建map類型的message
 - /*MapMessage mapMessage = session.createMapMessage();
 - mapMessage.setInt("www1",123);
 - messageProducer.send(mapMessage);*/
 - ObjectMessage objectMessage = session.createObjectMessage(new User(1, "qianyu", "123"));
 - messageProducer.send(objectMessage);
 - // }
 - }
 - }
 
- 隊列的使用(消費者)
 
- package com.qy.mq.queue;
 - import org.apache.activemq.ActiveMQConnectionFactory;
 - import javax.jms.*;
 - import java.io.Serializable;
 - /**
 - * @Auther: qianyu
 - * @Date: 2020/11/04 14:13
 - * @Description:消費者
 - */
 - public class Consumer {
 - //準備發(fā)布的這個地址
 - private static final String PATH="tcp://10.7.182.87:61616";
 - //ActiveMQ下的用戶名
 - private static final String userName="admin";
 - //ActiveMQ下的密碼
 - private static final String password="admin";
 - public static void main(String[] args) throws JMSException {
 - //第一步:創(chuàng)建連接的工廠
 - ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
 - //通過這個工廠獲取連接
 - Connection connection = activeMQConnectionFactory.createConnection();
 - //第三步:打開這個連接
 - connection.start();
 - //第四步:創(chuàng)建操作MQ的這個會話
 - /**
 - * 第一個參數(shù):是否使用事務
 - * 第二個參數(shù):客戶端的應答模式
 - * 第一種:自動應答
 - * 第二種:客戶端手動應答
 - */
 - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 - //需要發(fā)送消息的目的地(queue操作的是隊列)
 - Destination destination=session.createQueue("wqqq");
 - //創(chuàng)建我們的消費者了
 - MessageConsumer messageConsumer = session.createConsumer(destination);
 - //接下來就可以接收我們的消息了
 - //Message message = messageConsumer.receive();
 - //接收消息并指定這個超時的時間
 - // Message message = messageConsumer.receive(5000);
 - //接收消息沒有就不等待了 直接over了 不接收了
 - // Message message = messageConsumer.receiveNoWait();
 - //給定當前的路徑設置監(jiān)聽器
 - messageConsumer.setMessageListener(new MessageListener() {
 - @Override
 - public void onMessage(Message message) {
 - /* BytesMessage bytesMessage= (BytesMessage) message;
 - try {
 - System.out.println("獲取到的數(shù)據(jù)是:"+bytesMessage.getByteProperty("www"));
 - } catch (JMSException e) {
 - e.printStackTrace();
 - }*/
 - /*MapMessage mapMessage= (MapMessage) message;
 - try {
 - System.out.println("獲取到的數(shù)據(jù)是:"+mapMessage.getInt("www1"));
 - } catch (JMSException e) {
 - e.printStackTrace();
 - }*/
 - //測試對象類型的消息的發(fā)送和接收
 - ObjectMessage objectMessage= (ObjectMessage) message;
 - try {
 - User user = (User) objectMessage.getObject();
 - System.out.println("接收到的數(shù)據(jù)是:"+user);
 - } catch (JMSException e) {
 - e.printStackTrace();
 - }
 - /* //我們知道是一個字符串類型的消息
 - TextMessage textMessage= (TextMessage) message;
 - //接下來就可以打印這個消息了
 - try {
 - System.out.println("消費者1---接收到的消息是:"+textMessage.getText());
 - } catch (JMSException e) {
 - e.printStackTrace();
 - }*/
 - try {
 - //這句話就表示的是客戶端來手動的進行應答
 - message.acknowledge();
 - } catch (JMSException e) {
 - e.printStackTrace();
 - }
 - }
 - });
 - }
 - }
 
- 主題模型的生產(chǎn)者
 
- package com.qy.mq.topic;
 - import org.apache.activemq.ActiveMQConnectionFactory;
 - import javax.jms.*;
 - /**
 - * @Auther: qianyu
 - * @Date: 2020/11/04 15:17
 - * @Description:
 - */
 - public class Producer {
 - //準備發(fā)布的這個地址
 - private static final String PATH="tcp://10.7.182.87:61616";
 - //ActiveMQ下的用戶名
 - private static final String userName="admin";
 - //ActiveMQ下的密碼
 - private static final String password="admin";
 - public static void main(String[] args) throws JMSException {
 - //第一步:創(chuàng)建連接的工廠
 - ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
 - //通過這個工廠獲取連接
 - Connection connection = activeMQConnectionFactory.createConnection();
 - //第三步:打開這個連接
 - connection.start();
 - //第四步:創(chuàng)建操作MQ的這個會話
 - /**
 - * 第一個參數(shù):是否使用事務
 - * 第二個參數(shù):客戶端的應答模式
 - * 第一種:自動應答
 - * 第二種:客戶端手動應答
 - */
 - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 - //需要發(fā)送消息的目的地(下面創(chuàng)建的就應該是主題模型的地址)
 - Destination destination=session.createTopic("topic222");
 - //生產(chǎn)者來生產(chǎn)這個消息
 - //要有生產(chǎn)者
 - MessageProducer messageProducer = session.createProducer(destination);
 - //發(fā)送很多的消息到消息隊列中去
 - for (int i=0;i<100;i++){
 - //需要準備發(fā)送的消息
 - TextMessage textMessage = session.createTextMessage("我是淺羽:"+i);
 - //接下來就可以發(fā)送消息了
 - messageProducer.send(textMessage);
 - }
 - }
 - }
 
- 主題模型的消費者
 
- package com.qy.mq.topic;
 - import org.apache.activemq.ActiveMQConnectionFactory;
 - import javax.jms.*;
 - /**
 - * @Auther: qianyu
 - * @Date: 2020/11/04 15:19
 - * @Description:
 - */
 - public class Consumer {
 - //準備發(fā)布的這個地址
 - private static final String PATH="tcp://10.7.182.87:61616";
 - //ActiveMQ下的用戶名
 - private static final String userName="admin";
 - //ActiveMQ下的密碼
 - private static final String password="admin";
 - public static void main(String[] args) throws JMSException {
 - //第一步:創(chuàng)建連接的工廠
 - ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
 - //通過這個工廠獲取連接
 - Connection connection = activeMQConnectionFactory.createConnection();
 - //第三步:打開這個連接
 - connection.start();
 - //第四步:創(chuàng)建操作MQ的這個會話
 - /**
 - * 第一個參數(shù):是否使用事務
 - * 第二個參數(shù):客戶端的應答模式
 - * 第一種:自動應答
 - * 第二種:客戶端手動應答
 - */
 - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 - //需要發(fā)送消息的目的地(queue操作的是隊列)
 - Destination destination=session.createTopic("topic222");
 - //創(chuàng)建我們的消費者了
 - MessageConsumer messageConsumer = session.createConsumer(destination);
 - //接下來就可以接收我們的消息了
 - //給定當前的路徑設置監(jiān)聽器
 - messageConsumer.setMessageListener(new MessageListener() {
 - @Override
 - public void onMessage(Message message) {
 - //我們知道是一個字符串類型的消息
 - TextMessage textMessage= (TextMessage) message;
 - //接下來就可以打印這個消息了
 - try {
 - System.out.println("消費者1---接收到的消息是:"+textMessage.getText());
 - } catch (JMSException e) {
 - e.printStackTrace();
 - }
 - try {
 - message.acknowledge();
 - } catch (JMSException e) {
 - e.printStackTrace();
 - }
 - }
 - });
 - }
 - }
 
本篇關于ActiveMQ的介紹就先到這里結束了,后續(xù)會出更多關于ActiveMQ系列更多文章,謝謝大家支持!















 
 
 











 
 
 
 