如何基于Netty實現即時消息下發(fā)
想象一個場景,你與女友在網上聊天,她問了一句:你愛我嗎?然后很忐忑地等你回答??傻攘撕靡欢螘r間,你才收到她的消息,趕緊回復了一句:愛,愛你一萬年。又過了好久,你女友才收到你的回復。這時,你說的是什么已經不重要了,準備回去跪鍵盤吧。可以說這樣的用戶體驗非常的糟糕。
采用輪詢拉取消息就會出現上面的場景,而如果采用長連接的方式,服務器可以與客戶端建立一條實時的連接,服務器有新消息可以直接推送給客戶端,不需要等待客戶端請求,這樣既保證了實時性,整個系統(tǒng)的抗壓能力也優(yōu)于大量輪詢的方式。
什么是長連接通信
那么,什么是長連接呢?我們都知道短連接是什么,比如我們熟悉的 HTTP 協(xié)議,就是使用短連接的方式來請求數據的,它先是建立連接,然后進行數據傳輸,最后關閉連接。而且只能由客戶端主動發(fā)起請求,數據傳輸之后,連接就關閉了,服務端無法主動給客戶端發(fā)送數據。
而長連接是和短連接相對的,它的過程是:建立連接—>數據傳輸...(保持連接)……數據傳輸—>關閉連接??蛻舳伺c服務端建立連接之后,客戶端和服務端保持住連接不斷開,就可以一直在這個連接上傳輸數據,直到一方主動關閉連接。
圖片
如何建立長連接通信
那怎么建立長連接通信呢?我們常見的網絡服務例如 Tomcat、Apache 等主要都是面向短連接的,對長連接支持不是很好。而且長連接需要服務端長期保持連接,如果有大量的連接同時在線,服務端的壓力會非常大,所以,就需要一套高性能的網絡框架來支撐。幸運的是,有 Netty 這樣的異步網絡框架來幫助我們管理連接。
你可能多少了解過 Netty,它是基于事件驅動的,易開發(fā)、易維護、高性能,完全滿足長連接通信的需求。我們使用 Netty 實現我們的服務端,當然也可以實現客戶端,但是我們的客戶端一般會根據不同的平臺采用不同的實現方案。
有服務端,客戶端之后,我們還不可以進行通信,因為缺少一個通信協(xié)議。我們知道,進行短連接通信的時候采用的是 HTTP 協(xié)議,而這次我們要采用 MQTT,一個物聯(lián)網的標準信息傳輸協(xié)議。它是一個十分輕量級的發(fā)布/訂閱模型協(xié)議,占用網絡帶寬極小,因為它的固定消息頭只占 2 字節(jié),已經被廣泛應用于電信、汽車、工業(yè)制造等領域。
服務端、協(xié)議、客戶端,我們都已經知道采用的方案了,來看下整體系統(tǒng)結構:
圖片
現在,我們一起動手實現這樣一個消息下發(fā)服務端吧。我們只需要引入 Netty 的 jar 包即可,代碼如下:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.54.Final</version>
</dependency>
啟動一個 Netty 服務,如同我們正常啟動 Java 程序一樣:
public static void main(String[] args) {
int port = 1883;
if (args.length >= 1) {
port = Integer.parseInt(args[1]);
}
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(MqttEncoder.INSTANCE);
pipeline.addLast(new MqttDecoder());
//處理MQTT消息
pipeline.addLast(MyMqttHandlers.INSTANCE);
}
});
//啟動服務
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("MQTT server start success,port=" + port);
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
通過這段代碼我們啟動了 Netty 長連接服務,服務端口是 1883,客戶端可以通過這個端口連接到服務端,Netty 本身已經實現了 TCP 連接的建立、管理以及 MQTT 協(xié)議的編碼和解碼等,我們只需要按照需求實現自己的業(yè)務邏輯即可。上述代碼中,我們只需要實現 MyMqttHandlers.INSTANCE,來完成我們對客戶端連接的認證、Topic 訂閱、消息發(fā)布、心跳檢測等等。
上面的 MyMqttHandlers 類,是我們自定義的 Netty Handler,用來處理 MQTT 業(yè)務數據,它繼承自 Netty 的適配器 SimpleChannelInboundHandler,僅需覆寫我們關心的方法 channelRead0,根據不同的 MQTT 報文類型做處理。
.....
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) {
switch (msg.fixedHeader().messageType()) {
case CONNECT:
connect(ctx, (MqttConnectMessage) msg);
break;
case SUBSCRIBE:
subscribe(ctx, (MqttSubscribeMessage) msg);
break;
case PINGREQ:
pingReq(ctx);
break;
//...處理其他報文
default:
}
}
....
通信報文處理
怎么處理這些報文呢?MQTT 采用的是發(fā)布訂閱模式的消息通信協(xié)議,通過交換預定義的 MQTT 控制報文來通信。這里簡單介紹下 MQTT 協(xié)議的內容,因為在我們進行編碼的時候需要解析消息內容、回復 ACK 消息、發(fā)布消息,了解消息結構,可以更好地編碼。
MQTT 控制報文由固定報頭、可變報頭、有效載荷三部分組成,具體格式如下表:
根據 MQTT 3.1.1 規(guī)定,固定報頭的控制報文類型共有 14 種,我們這次主要使用 CONNECT(連接服務端)、SUBSCRIBE(訂閱主題)、PUBLISH(發(fā)布消息)、PINGRESP(心跳響應)這四種報文以及對應的 ACK 報文。
CONNECT 報文如何處理?客戶端到服務端的網絡連接建立后,客戶端發(fā)送給服務端的第一個報文必須是 CONNECT 報文,這個報文傳輸設備標識、用戶標識、密碼等信息,通過這個報文,服務端需判斷要不要和客戶端連接,常用的方法就是鑒權。如果校驗失敗,就可以在 ACK 報文中設置狀態(tài)碼 CONNECTION_REFUSED_xxx;檢驗成功,則設置為 CONNECTION_ACCEPTED。鑒權成功之后,我們就可以把該設備的信息入庫保存,實際場景中,我們把設備的實時狀態(tài)維護在 Redis 中,保證高的吞吐量。
代碼如下:
private void connect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
String clientIdentifier = msg.payload().clientIdentifier();
String userName = msg.payload().userName();
String password = new String(msg.payload().passwordInBytes());
//此處可以鑒權
System.out.println(clientIdentifier + " " + userName + " " + password);
//此處保存用戶和連接之間的關系
ChannelManager.saveChannelMapping(clientIdentifier, ctx.channel());
MqttFixedHeader connAckFixedHeaderRes = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
//連接成功設置為MqttConnectReturnCode.CONNECTION_ACCEPTED,失敗可以返回其他狀態(tài)碼
MqttConnAckVariableHeader connAckVariableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
MqttConnAckMessage ackMessage = new MqttConnAckMessage(connAckFixedHeaderRes, connAckVariableHeader);
ctx.channel().writeAndFlush(ackMessage);
}
SUBSCRIBE 報文一般作為 CONNECT 之后的下一個報文,客戶端上報它需要的 Topic,服務端可以根據客戶端的訂閱情況,針對性地推送消息,這個可以是廣播的 Topic(所有用戶都可以收到同一個消息的副本),也可以是點對點的(只有一個用戶收到此消息)。同時,服務端需要存儲 Topic 到 Channel 的關系。代碼如下:
private void subscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) {
List<MqttTopicSubscription> topics = msg.payload().topicSubscriptions();
//存儲客戶端訂閱的主題
ChannelManager.saveTopics(ctx.channel(),
topics.stream().map(MqttTopicSubscription::topicName).collect(Collectors.toList()));
System.out.println("訂閱成功:" + topics);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(msg.variableHeader().messageId(), null);
MqttSubAckPayload payload = new MqttSubAckPayload();
MqttSubAckMessage ackMessage = new MqttSubAckMessage(header, variableHeader, payload);
ctx.writeAndFlush(ackMessage);
}
PUBLISH 報文是我們最終的目標報文,服務端需要根據客戶端訂閱的 Topic 發(fā)送這個報文,由于在處理訂閱消息時,已經保存了 Topic 和 Channel 的映射,所以推送消息就簡單了,只需要找到 Topic 下所有的 Channel,就可以直接寫消息到 Channel 中即可,代碼如下:
List<Channel> channels = ChannelManager.listChannels(topic);
channels.forEach(channel -> {
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, 0);
ByteBuf payload = Unpooled.copiedBuffer(messageData, StandardCharsets.UTF_8);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(fixedHeader, variableHeader, payload);
channel.writeAndFlush(mqttPublishMessage);
});
為何要處理 PINGREQ 報文?鏈路上如果長時間沒有數據傳輸,可能會被運營商把鏈路回收了,所以設備需要在?;钇陂g內至少發(fā)送一個報文,如果沒有實際的數據需要傳輸,那么較小的 PINGREQ 就是最佳選擇。連接保活時間的取值范圍一般為 30 秒~1200 秒。這個可以根據實際情況,不斷調整這個值,我的選擇是是 60 秒,如果網絡環(huán)境好,可以設置 500 秒以上。
實際在線上運行的時候,我發(fā)現有些客戶端就是一直無法連接上,這時可以再結合短輪詢做個備用方案,當多次嘗試之后,無法連接上 MQTT 服務,可以暫時啟動短輪詢,保證用戶可以收到消息。
好了,到目前我們已經處理完核心功能了,其它類型的控制報文和處理流程也類似,這里就不再贅述了,總體報文交換流程如下圖:
圖片
長連接通信測試
我們來試一下效果吧,首先我們需要模擬一個客戶端,同樣的,也可以使用 Netty 實現一個客戶端,主要流程和服務端差不多,有一點需要注意的是,客戶端需要定時發(fā)送心跳到服務端,以保證鏈路不會因為長時間空閑被系統(tǒng)斷開。
測試流程如下:
- 啟動服務端,端口在 1883
- 啟動客戶端,連接到服務端 127.0.0.1:1883
- 客戶端訂閱 Topic,名稱為 demo
- 服務端每隔 1 秒向 demo 發(fā)送一個當前時間的消息
看下運行效果:
圖片
總結
以上就是我今天的分享,通過 Netty 和 MQTT,我們可以實現一個高性能的消息下發(fā)系統(tǒng),當然,我今天講的是最基本的功能實現,當連接數超過一臺機器的上限時,就需要設計一個可擴展的架構。我把整體的知識點匯總成一張思維導圖,供你參考。
圖片