支持億級連接并開源的分布式MQTT消息服務器分享
今天給各位分享一款開源的分布式MQTT消息服務器EMQX,此消息服務器幾乎是物聯(lián)網(wǎng)系統(tǒng)的標配同時也適合做即時通知和推送服務場景,在作者之前參與的項目中主要用于做物聯(lián)網(wǎng)系統(tǒng)邊緣設備信息采集、以及交易所行情數(shù)據(jù)推送使用,下面是EMQX 相關介紹。
什么是 EMQX
EMQX 是一款開源的大規(guī)模分布式 MQTT 消息服務器,功能豐富,專為物聯(lián)網(wǎng)和實時通信應用而設計。EMQX 5.0 單集群支持 MQTT 并發(fā)連接數(shù)高達 1 億條,單服務器的傳輸與處理吞吐量可達每秒百萬級 MQTT 消息,同時保證毫秒級的低時延。
EMQX 支持多種協(xié)議,包括 MQTT (3.1、3.1.1 和 5.0)、HTTP、QUIC 和 WebSocket 等,保證各種網(wǎng)絡環(huán)境和硬件設備的可訪問性。EMQX 還提供了全面的 SSL/TLS 功能支持,比如雙向認證以及多種身份驗證機制,為物聯(lián)網(wǎng)設備和應用程序提供可靠和高效的通信基礎設施。
圖片
內置基于 SQL 的規(guī)則引擎,EMQX 可以實時提取、過濾、豐富和轉換物聯(lián)網(wǎng)數(shù)據(jù)。此外,EMQX 采用了無主分布式架構,以確保高可用性和水平擴展性,并提供操作友好的用戶體驗和出色的可觀測性。
EMQX 提供了開源版和商業(yè)版兩種方式,用戶可以基于自己需求進行選擇。
官網(wǎng)地址:https://www.emqx.io
github 地址:https://github.com/emqx/emqx
為什么說專為物聯(lián)網(wǎng)和實時通信設計?
物聯(lián)網(wǎng)方面
以下是幾個理由說明為什么MQTT適合物聯(lián)網(wǎng):
- 輕量級和低帶寬消耗:MQTT協(xié)議設計簡單輕量,消息頭部開銷小,傳輸數(shù)據(jù)量少,使其非常適合在低帶寬、不穩(wěn)定的網(wǎng)絡環(huán)境下使用。這對于許多物聯(lián)網(wǎng)設備來說非常重要,因為它們通常具有資源受限的特點,如有限的處理能力、內存和電池壽命。
- 可靠性和持久性:MQTT支持可靠的消息傳遞,并且具有消息持久性。設備可以發(fā)布消息并確保消息可靠地傳遞到服務器,即使在網(wǎng)絡連接中斷后,也可以在重新連接后接收未傳遞的消息。這對于物聯(lián)網(wǎng)應用來說非常重要,因為設備可能會經歷網(wǎng)絡不穩(wěn)定、斷開和重新連接等情況。
- 異步通信和發(fā)布-訂閱模式:MQTT使用發(fā)布-訂閱模式,設備可以通過訂閱特定主題來接收感興趣的消息,而無需直接與其他設備進行點對點通信。
- 支持廣播和多播:MQTT可以通過使用通配符和主題過濾器,實現(xiàn)消息的廣播和多播。這意味著一個設備可以發(fā)布一條消息,并且多個訂閱者可以接收到該消息,從而實現(xiàn)了一對多和多對多的通信模式。
- 支持安全性和認證:MQTT協(xié)議提供了各種安全機制,包括傳輸層安全性(TLS/SSL)和身份驗證機制,以確保數(shù)據(jù)的保密性和完整性。這對于物聯(lián)網(wǎng)應用來說至關重要,因為許多物聯(lián)網(wǎng)設備處理的是敏感數(shù)據(jù)。
實時通信設方面
- 即時通訊(Instant Messaging):EMQ X可以用作即時通訊系統(tǒng)的后端,支持實時的消息傳遞和即時聊天功能。它可以處理大量的并發(fā)連接和消息交換,保證實時性和可靠性。
- 在線游戲(Online Gaming):在線游戲通常需要實時的玩家互動和消息傳遞。EMQ X可以作為游戲服務器的消息中間件,處理游戲玩家之間的實時通信和事件傳遞,支持實時游戲場景的需求。
- 即時通知和推送服務:EMQ X可以用于構建實時通知和推送服務,例如本人之前基于EMQX做過交易所的行情數(shù)推送,實時新聞推送、社交網(wǎng)絡通知等。
- 實時監(jiān)控和數(shù)據(jù)分發(fā):EMQ X適用于實時監(jiān)控和數(shù)據(jù)分發(fā)應用,例如物流監(jiān)控、設備狀態(tài)監(jiān)測、實時數(shù)據(jù)分析等。它可以接收和分發(fā)實時數(shù)據(jù)流,支持實時事件處理和數(shù)據(jù)流轉換。
- 即時位置共享:EMQ X可以用于構建實時位置共享應用,例如實時定位服務、共享出行等。它可以處理實時位置數(shù)據(jù)的接收和分發(fā),支持實時位置更新和共享。
分布式集群設計原理
MQX 本身支持分布式集群架構,能夠在保證高可用性、容錯性和可擴展性的同時,處理大量的客戶端和消息。通過使用 EMQX 集群,您可以在一個或多個節(jié)點發(fā)生故障時仍然保持集群運行,從而享受到容錯和高可用性的好處。
以下是一個四個節(jié)點組成的EMQ集群,每個節(jié)點都運行一個 EMQX 實例,并且與集群中的其他節(jié)點通信,共享客戶端連接、訂閱、發(fā)布消息等信息。這允許集群在節(jié)點之間自動分配負載并在節(jié)點出現(xiàn)故障時提供高可用性
圖片
在集群架構下,我們可以隨著業(yè)務的增長向集群添加新節(jié)點,從而提供可擴展性。這樣可以處理越來越多的客戶端和消息,而不必擔心單個代理的限制。
消息轉發(fā)設計
EMQX 分布式集群的基本功能是轉發(fā)和發(fā)布消息到訂閱者,如下圖所示。
圖片
為了實現(xiàn)這一目標,EMQX 在 嵌入式數(shù)據(jù)庫 Mria 中維護著與之相關的幾個數(shù)據(jù)表:
- 訂閱表
- 路由表
- 主題樹
訂閱表:主題-訂閱者
EMQX 會維護一個訂閱表,用于存儲主題->訂閱者之間的映射關系,從而確保能將傳入消息正確路由到對應的客戶端。該數(shù)據(jù)只存在于訂閱者所在的 EMQX 節(jié)點上,類似的結構如下:
bash
node1:
topic1 -> client1, client2
topic2 -> client3
node2:
topic1 -> client4
路由表:Topic-Node
路由表記錄了 主題->節(jié)點 之間的映射,它存儲每個節(jié)點上客戶端訂閱的主題列表,并用于將消息路由到對應的節(jié)點。該數(shù)據(jù)會在同一集群中的所有節(jié)點復制一份,類似結構如下:
bash
topic1 -> node1, node2
topic2 -> node3
topic3 -> node2, node4
主題樹:主題匹配通配符
主題樹是一種分層數(shù)據(jù)結構,它存儲有關主題層次結構的信息,并用于消息與訂閱客戶端的匹配。
主題樹會在同一集群中的所有節(jié)點復制一份,下面是一個 主題-訂閱關系 的例子:
Client | Node | Subscribed topic |
client1 | node1 | t/+/x, t/+/y |
client2 | node2 | t/# |
client3 | node3 | t/+/x, t/a |
當所有的訂閱完成后,EMQX 會維護以下主題樹和路由表。
圖片
消息分發(fā)流程
當一個 MQTT 客戶端發(fā)布消息時,它所在的節(jié)點會查找路由表,并根據(jù)消息主題將消息轉發(fā)到對應的節(jié)點(可能是多個節(jié)點)。
然后,接收到消息的節(jié)點會查找本地訂閱表,并將消息發(fā)送至對應的訂閱者。
例如,當客戶端 1 發(fā)布一條消息到主題 t/a 時,消息在節(jié)點之間的路由和分發(fā)如下:
- 客戶端 1 向節(jié)點 1 發(fā)布一條主題為 t/a 的消息;
- 節(jié)點 1 查詢主題樹,了解到 t/a 與現(xiàn)有主題 t/a 和 t/# 相匹配。
- 節(jié)點 1 查詢路由表,并得知:
節(jié)點 2 上有客戶端訂閱了 t/# 主題;
節(jié)點 3 上有客戶端訂閱了 t/a 主題;因此節(jié)點 1 會將消息同時轉發(fā)給節(jié)點 2 和節(jié)點 3。
- 節(jié)點 2 收到轉發(fā)的t/a消息后,通過查詢本地訂閱表,將消息分發(fā)給訂閱了 t/# 的客戶端。
- 節(jié)點 3 收到轉發(fā)的 t/a 消息后,通過查詢本地訂閱表,將消息分發(fā)給訂閱了 t/a 的客戶端。
- 消息發(fā)布完成。
連接數(shù)測試
5.0支持并發(fā)連接數(shù)高達 1 億條測試報告:https://www.emqx.com/zh/blog/reaching-100m-mqtt-connections-with-emqx-5-0
快速體驗
安裝
容器化部署是體驗 EMQX 的最快方式,因此本節(jié)將以容器化部署為例,在命令行工具中輸入如下命令,下載并運行最新版 EMQX。
docker pull emqx/emqx:5.5.1
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.5.
圖片
通過瀏覽器訪問 http://localhost:18083/(localhost 可替換為您的實際 IP 地址)以訪問 EMQX Dashboard 管理控制臺,進行設備連接與相關指標監(jiān)控管理,默認用戶名及密碼:admin/public。
圖片
登錄成功之后如下圖
圖片
示例編寫
圖片
下面我們使用Java 語言,寫一個示例,發(fā)送消息至主題mytopic ,訂閱端分布為Java后端程序和JS訂閱
Maven依賴項
創(chuàng)建工程并添加Maven依賴項,這里依賴的paho是 mqtt 的一個工具類
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
創(chuàng)建發(fā)送消息代碼
package cn.g2link.seg.base.mqtt.test;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
public class MqttPublishExample {
public static void main(String[] args) {
//emq 的 tcp監(jiān)聽端口
String broker = "tcp://localhost:1883";
String clientId = "mqtt_client1";
//發(fā)送的主題
String topic = "mytopic";
//消息體
String message = "Hello, MQTT!";
try {
MqttClient mqttClient = new MqttClient(broker, clientId);
mqttClient.connect();
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttTopic.publish(mqttMessage);
mqttClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
創(chuàng)建訂閱消息代碼
package cn.g2link.seg.base.mqtt.test;
import org.eclipse.paho.client.mqttv3.*;
public class MqttSubscribeExample {
public static void main(String[] args) {
//emq 的 tcp監(jiān)聽端口
String broker = "tcp://localhost:1883";
String clientId = "mqtt_subsribe_client1";
//監(jiān)聽的主題
String topic = "mytopic";
try {
MqttClient mqttClient = new MqttClient(broker, clientId);
mqttClient.connect();
System.out.println("connect success" );
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost!");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
System.out.println("Received message: " + payload);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Not used in this example
}
});
mqttClient.subscribe(topic);
System.out.println(String.format("topic:%s subscribe success ", topic));
// Keep the program running to receive messages
while (true) {
// Do nothing
}
} catch (MqttException e) {
e.printStackTrace();
}
}
}
我們通過mqtt.min.js,來連接EMQX暴露的 webscoket 為8083端口,同時訂閱mytopic主題
<html>
<head>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
// 將在全局初始化一個 mqtt 變量
console.log(mqtt)
// 創(chuàng)建一個 MQTT 客戶端實例
var client = mqtt.connect('mqtt://localhost:8083/mqtt', {
clientId: 'web-mqtt-client' // 替換為您的客戶端ID
});
// 連接成功時的回調函數(shù)
client.on('connect', function () {
console.log('已連接到 MQTT 服務器');
// 訂閱主題
client.subscribe('mytopic'); // 替換為您要訂閱的主題
});
// 接收到消息時的回調函數(shù)
client.on('message', function (topic, message) {
console.log('收到消息:', message.toString());
// 在這里處理收到的消息,可以根據(jù)需要進行相應的邏輯操作
});
// 連接斷開時的回調函數(shù)
client.on('close', function () {
console.log('與 MQTT 服務器的連接已斷開');
});
// 連接錯誤時的回調函數(shù)
client.on('error', function (error) {
console.log('連接發(fā)生錯誤:', error);
});
</script>
</head>
<body>
</body>
</html>
監(jiān)控消息(可選)
在主題監(jiān)控頁面添加mytopic,這一步主要為了觀察發(fā)送和消費的次數(shù)
圖片
示例驗證
訂閱端啟動
點擊MqttSubscribeExample的 main 方法啟動訂閱
圖片
圖片
啟動成功以后,會在EQMX 控制臺,顯示客戶端連接信息,如下圖顯示了兩個訂閱端
圖片
消息發(fā)送
啟動MqttPublishExample的 main 方法,進行消息發(fā)送,發(fā)送后訂閱端會收到以下消息
Java 后端
圖片
瀏覽器端
圖片
主題監(jiān)控
查看EQMX 控制臺的主題監(jiān)控,會看到當前topic 流入和流出條數(shù)
圖片
總結
以上只是簡單介紹了什么是 EMQX 以及它的應用場景介紹,要想更多了解EMQX細節(jié),可以訪問官方進行了解。