Netty的常用編解碼器與使用
本文轉(zhuǎn)載自微信公眾號(hào)「源碼學(xué)徒」,作者皇甫嗷嗷叫。轉(zhuǎn)載本文請(qǐng)聯(lián)系源碼學(xué)徒公眾號(hào)。
我們本章節(jié)將了解基本的編解碼器以及自定義編解碼器的使用,在了解之前,我們先看一段代碼:
一、開發(fā)服務(wù)端
1.開發(fā)服務(wù)端的Handler
- /**
 - * *********************************************************************
 - * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
 - * *********************************************************************
 - *
 - * @author huangfu
 - * @date 2021/5/6 21:22
 - */
 - public class CodecServerHandler extends ChannelInboundHandlerAdapter {
 - @Override
 - public void channelActive(ChannelHandlerContext ctx) throws Exception {
 - //開啟一個(gè)定時(shí)任務(wù)
 - ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
 - ByteBufAllocator aDefault = ByteBufAllocator.DEFAULT;
 - ByteBuf byteBuf = aDefault.directBuffer();
 - //向客戶端寫一句話
 - byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!".getBytes(StandardCharsets.UTF_8));
 - ctx.writeAndFlush(byteBuf);
 - }, 10, 10, TimeUnit.MILLISECONDS);
 - }
 - @Override
 - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 - cause.printStackTrace();
 - super.exceptionCaught(ctx, cause);
 - }
 - }
 
2. 開發(fā)服務(wù)端的Server
- /**
 - * *********************************************************************
 - * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
 - * *********************************************************************
 - *
 - * @author huangfu
 - * @date 2021/5/6 21:20
 - */
 - public class CodecServer {
 - public static void main(String[] args) throws InterruptedException {
 - EventLoopGroup boss = new NioEventLoopGroup(1);
 - EventLoopGroup worker = new NioEventLoopGroup();
 - try {
 - ServerBootstrap serverBootstrap = new ServerBootstrap();
 - serverBootstrap.group(boss, worker)
 - .channel(NioServerSocketChannel.class)
 - .localAddress(8989)
 - .childHandler(new ChannelInitializer<SocketChannel>() {
 - @Override
 - protected void initChannel(SocketChannel ch) throws Exception {
 - ch.pipeline().addLast("codecHandler", new CodecHandler());
 - }
 - });
 - ChannelFuture channelFuture = serverBootstrap.bind().sync();
 - channelFuture.channel().closeFuture().sync();
 - } finally {
 - boss.shutdownGracefully();
 - worker.shutdownGracefully();
 - }
 - }
 - }
 
二、開發(fā)客戶端
1.開發(fā)客戶端的Handler
- /**
 - * *********************************************************************
 - * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
 - * *********************************************************************
 - *
 - * @author huangfu
 - * @date 2021/5/6 21:31
 - */
 - public class CodecClientHandler extends ChannelInboundHandlerAdapter {
 - @Override
 - public void channelActive(ChannelHandlerContext ctx) throws Exception {
 - System.out.println("連接成功");
 - }
 - @Override
 - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 - ByteBuf byteBuf = (ByteBuf) msg;
 - System.out.println(byteBuf.toString(StandardCharsets.UTF_8));
 - super.channelRead(ctx, msg);
 - }
 - }
 
2.開發(fā)客戶端
- /**
 - * *********************************************************************
 - * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
 - * *********************************************************************
 - *
 - * @author huangfu
 - * @date 2021/5/6 21:29
 - */
 - public class CodecClient {
 - public static void main(String[] args) throws InterruptedException {
 - EventLoopGroup worker = new NioEventLoopGroup();
 - try {
 - Bootstrap bootstrap = new Bootstrap();
 - bootstrap.group(worker)
 - .remoteAddress(new InetSocketAddress("127.0.0.1",8989))
 - .channel(NioSocketChannel.class)
 - .handler(new ChannelInitializer<SocketChannel>() {
 - @Override
 - protected void initChannel(SocketChannel ch) throws Exception {
 - ch.pipeline().addLast("codecClientHandler",new CodecClientHandler());
 - }
 - });
 - ChannelFuture channelFuture = bootstrap.connect().sync();
 - channelFuture.channel().closeFuture().sync();
 - }finally {
 - worker.shutdownGracefully();
 - }
 - }
 - }
 
三、結(jié)果演示
上述的代碼相信大家都極其熟悉,就是開發(fā)一個(gè)服務(wù)端和客戶端,當(dāng)客戶端連接到服務(wù)端之后,服務(wù)端每隔10毫秒向客戶端輸出一句話,客戶端收到之后打印出來(lái)!
預(yù)期結(jié)果:
實(shí)際結(jié)果:
我們發(fā)現(xiàn),真正跑起來(lái),卻并沒(méi)有按照我們預(yù)期那樣逐行打印,而是好幾行連在一起打印,而且有些字符還出現(xiàn)了亂碼,這是為什么呢?
了解過(guò)網(wǎng)絡(luò)傳輸?shù)耐瑢W(xué)大概都明白,Socket其實(shí)也是TCP的一種,底層通過(guò)流的方式傳輸,由服務(wù)端發(fā)送的數(shù)據(jù)到客戶端,客戶端的Netty需要重新拼裝為一個(gè)完整的包:
- 當(dāng)傳輸?shù)臄?shù)據(jù)量過(guò)大的時(shí)候,Netty就 分多從拼裝,這就造成了亂碼的現(xiàn)象! 這種現(xiàn)象,術(shù)語(yǔ)叫做半包
 - 當(dāng)Netty讀取的時(shí)候,一次讀取了兩個(gè)數(shù)據(jù)包,那就會(huì)自動(dòng)將兩個(gè)數(shù)據(jù)包合為一個(gè)數(shù)據(jù)包,從而完成封裝為一個(gè)數(shù)據(jù)包,這就是造成好幾行連著打印的問(wèn)題! 這種現(xiàn)象 術(shù)語(yǔ)叫做粘包
 
四、常用的編解碼器
為什么會(huì)發(fā)生粘包、半包!Netty在解析底層數(shù)據(jù)流轉(zhuǎn)換成ByteBuf,但是當(dāng)請(qǐng)求過(guò)于頻繁的時(shí)候,兩次的請(qǐng)求數(shù)據(jù)可能會(huì)被合并為一個(gè),甚至,一次數(shù)據(jù)合并一個(gè)半的數(shù)據(jù)流,此時(shí)因?yàn)閿?shù)據(jù)流字節(jié)的不完全接收,會(huì)導(dǎo)致讀取數(shù)據(jù)不正確或者亂碼等問(wèn)題!
假設(shè),我們預(yù)先知道了這個(gè)數(shù)據(jù)包的一個(gè)規(guī)則,當(dāng)數(shù)據(jù)包規(guī)則不滿足的情況下等待,超過(guò)數(shù)據(jù)規(guī)則限制的時(shí)候進(jìn)行切分,那么是不是就能夠有效的區(qū)分?jǐn)?shù)據(jù)包的界限,從根本上上解決粘包半包的問(wèn)題?
1. 基于換行符的解碼器
LineBasedFrameDecoder
該代碼將以\n或者\(yùn)r\n 作為區(qū)分?jǐn)?shù)據(jù)包的依據(jù),程序在進(jìn)行數(shù)據(jù)解碼的時(shí)候,會(huì)判斷該當(dāng)前的數(shù)據(jù)包內(nèi)是否存在\n或者\(yùn)r\n,當(dāng)存在的時(shí)候會(huì)截取以\n或者\(yùn)r\n的一段字符,作為一個(gè)完整的數(shù)據(jù)包!
客戶端增加解碼器:
CodecClient:
- .handler(new ChannelInitializer<SocketChannel>() {
 - @Override
 - protected void initChannel(SocketChannel ch) throws Exception {
 - //增加數(shù)據(jù)包解碼器基于換行符的解碼器
 - ch.pipeline().addLast("lineBasedFrameDecoder", new LineBasedFrameDecoder(Integer.MAX_VALUE));
 - ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
 - }
 - });
 
服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:
CodecServerHandler:
- ByteBuf byteBuf = aDefault.directBuffer();
 - //增加一個(gè)換行符
 - byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!\n".getBytes(StandardCharsets.UTF_8));
 - ctx.writeAndFlush(byteBuf);
 
效果圖:
2. 基于自定義換行符的解碼器
DelimiterBasedFrameDecoder
該代碼將以自定義符號(hào)作為區(qū)分?jǐn)?shù)據(jù)包的依據(jù),程序在進(jìn)行數(shù)據(jù)解碼的時(shí)候,會(huì)判斷該當(dāng)前的數(shù)據(jù)包內(nèi)是否存在指定的自定義的符號(hào),當(dāng)存在的時(shí)候會(huì)截取以自定義符號(hào)為結(jié)尾的一段字符,作為一個(gè)完整的數(shù)據(jù)包!
客戶端增加解碼器:
CodecClient:
- .handler(new ChannelInitializer<SocketChannel>() {
 - @Override
 - protected void initChannel(SocketChannel ch) throws Exception {
 - ByteBuf byteBuf = Unpooled.copiedBuffer("|".getBytes(StandardCharsets.UTF_8));
 - ch.pipeline().addLast("delimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, byteBuf));
 - ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
 - }
 - });
 
服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:
CodecServerHandler:
- ByteBuf byteBuf = aDefault.directBuffer();
 - //末尾增加一個(gè)指定的字符
 - byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!|".getBytes(StandardCharsets.UTF_8));
 - ctx.writeAndFlush(byteBuf);
 
效果圖:
3. 基于固定長(zhǎng)度的解碼器
FixedLengthFrameDecoder
定長(zhǎng)數(shù)據(jù)解碼器適用于每次發(fā)送的數(shù)據(jù)包是一個(gè)固定長(zhǎng)度的場(chǎng)景,指定每次讀取的數(shù)據(jù)包的數(shù)據(jù)長(zhǎng)度來(lái)進(jìn)行解碼操作!
我們查看我們的數(shù)據(jù)總共長(zhǎng)度是多少:
- 無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!
 
經(jīng)過(guò)計(jì)算為213各字符,我們假設(shè)以后的數(shù)據(jù)都是這個(gè),我們就可以使用固定字符串,作為區(qū)分一個(gè)完整數(shù)據(jù)包的依據(jù):
客戶端增加解碼器:
CodecClient:
- .handler(new ChannelInitializer<SocketChannel>() {
 - @Override
 - protected void initChannel(SocketChannel ch) throws Exception {
 - //指定一個(gè)完整數(shù)據(jù)包的長(zhǎng)度為213個(gè)
 - ch.pipeline().addLast("fixedLengthFrameDecoder", new FixedLengthFrameDecoder(213));
 - ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
 - }
 - });
 
服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:
CodecServerHandler:
- ByteBuf byteBuf = aDefault.directBuffer();
 - //發(fā)送原數(shù)據(jù) 不做任何更改
 - byteBuf.writeBytes("無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!".getBytes(StandardCharsets.UTF_8));
 - ctx.writeAndFlush(byteBuf);
 
效果圖:
4. 基于不定長(zhǎng)的解碼器
LengthFieldBasedFrameDecoder
不定長(zhǎng)長(zhǎng)度域解碼器的使用是用在我們不確定數(shù)據(jù)包的大小的場(chǎng)景下,這也是比較常用的一個(gè)解碼器
客戶端增加解碼器:
CodecClient:
- .handler(new ChannelInitializer<SocketChannel>() {
 - @Override
 - protected void initChannel(SocketChannel ch) throws Exception {
 - ch.pipeline().addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
 - ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
 - }
 - });
 
服務(wù)端數(shù)據(jù)結(jié)構(gòu)發(fā)生改變:
CodecServerHandler:
- ByteBuf byteBuf = aDefault.directBuffer();
 - byte[] bytes = "無(wú)論是任何的源碼學(xué)習(xí),永遠(yuǎn)都是枯燥、乏味的,他遠(yuǎn)沒(méi)有寫出一段很牛逼的代碼有成就感!但是當(dāng)你登堂入室的那一刻,你會(huì)發(fā)現(xiàn),源碼的閱讀是如此的享受!".getBytes(StandardCharsets.UTF_8);
 - byteBuf.writeInt(bytes.length);
 - byteBuf.writeBytes(bytes);
 - ctx.writeAndFlush(byteBuf);
 
他的參數(shù)比較多,我們做幾個(gè)基本的認(rèn)識(shí):
maxFrameLength:本次能接收的最大的數(shù)據(jù)長(zhǎng)度
lengthFieldOffset:設(shè)置的長(zhǎng)度域的偏移量,長(zhǎng)度域在數(shù)據(jù)包的起始位置,所以偏移量為0
lengthFieldLength:長(zhǎng)度域的長(zhǎng)度,例子使用的是Int占4位 所以參數(shù)為4
lengthAdjustment:數(shù)據(jù)包的偏移量,計(jì)算方式=數(shù)據(jù)長(zhǎng)度 +lengthAdjustment=數(shù)據(jù)總長(zhǎng)度 這里數(shù)據(jù)包的總長(zhǎng)度=lengthFieldLength ,所以不需要補(bǔ)充,所以參數(shù)為0
initialBytesToStrip:需要跳過(guò)的字節(jié)數(shù),這里我們只關(guān)注真正的數(shù)據(jù),不關(guān)注數(shù)據(jù)包的長(zhǎng)度,所以我們把長(zhǎng)度域跳過(guò)去,長(zhǎng)度域?yàn)?,所以跳過(guò)4
效果圖:
5. 自定義編解碼器
I. ByteToMessageDecoder
需求:我們需要在解碼器中就將ByteBuf解碼,并轉(zhuǎn)成字符串,后面直接打印
開發(fā)一個(gè)自定義的解碼器:
- /**
 - * *********************************************************************
 - * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
 - * 自定義一個(gè)基于固定長(zhǎng)度的解碼器,當(dāng)解碼成功后,將數(shù)據(jù)轉(zhuǎn)成字符串
 - * *********************************************************************
 - *
 - * @author huangfu
 - * @date 2021/5/7 22:43
 - */
 - public class MyByteToMessageDecoder extends ByteToMessageDecoder {
 - private Integer length;
 - public MessageEqualDecoder(Integer length) {
 - this.length = length;
 - }
 - @Override
 - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
 - //當(dāng)前的可讀字節(jié)數(shù)
 - int readableBytes = in.readableBytes();
 - //當(dāng)可讀字節(jié)數(shù)超過(guò)預(yù)設(shè)數(shù)量的時(shí)候
 - if(readableBytes >= length) {
 - byte[] bytes = new byte[length];
 - //讀取出來(lái)
 - in.readBytes(bytes);
 - //轉(zhuǎn)換成字符串 并添加進(jìn)集合中
 - out.add(new String(bytes, StandardCharsets.UTF_8));
 - }
 - }
 - }
 
客戶端處理器開發(fā):
CodecClientHandler
- /**
 - * *********************************************************************
 - * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
 - * *********************************************************************
 - *
 - * @author huangfu
 - * @date 2021/5/6 21:31
 - */
 - public class CodecClientHandler extends ChannelInboundHandlerAdapter {
 - @Override
 - public void channelActive(ChannelHandlerContext ctx) throws Exception {
 - System.out.println("連接成功");
 - }
 - @Override
 - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 - //解碼器已經(jīng)將數(shù)據(jù)轉(zhuǎn)換成字符串了,這里直接強(qiáng)壯為字符串使用
 - String msgStr = (String) msg;
 - System.out.println(msgStr);
 - super.channelRead(ctx, msg);
 - }
 - }
 
客戶端開發(fā):
CodecClient
- public class CodecClient {
 - public static void main(String[] args) throws InterruptedException {
 - EventLoopGroup worker = new NioEventLoopGroup();
 - try {
 - Bootstrap bootstrap = new Bootstrap();
 - bootstrap.group(worker)
 - .remoteAddress(new InetSocketAddress("127.0.0.1", 8989))
 - .channel(NioSocketChannel.class)
 - .handler(new ChannelInitializer<SocketChannel>() {
 - @Override
 - protected void initChannel(SocketChannel ch) throws Exception {
 - //添加自定義的解碼器
 - ch.pipeline().addLast("messageEqualDecoder", new MyByteToMessageDecoder(213));
 - ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
 - }
 - });
 - ChannelFuture channelFuture = bootstrap.connect().sync();
 - channelFuture.channel().closeFuture().sync();
 - } finally {
 - worker.shutdownGracefully();
 - }
 - }
 - }
 
效果圖:
II. MessageToMessageDecoder
需求:我們?cè)偕厦孀远x的解碼器的基礎(chǔ)上增加一個(gè)需求,要求上一個(gè)解碼器解碼出來(lái)的數(shù)據(jù),在傳播到客戶端的時(shí)候,需用[]包裹住。
開發(fā)自定義的消息轉(zhuǎn)換器(泛型為String的原因是 上一個(gè)解碼器已經(jīng)將其轉(zhuǎn)換為了String):
- /**
 - * 將消息用[]包裹起來(lái)
 - *
 - * @author huangfu
 - * @date 2021年5月8日08:25:21
 - */
 - public class MyMessageToMessageDecoder extends MessageToMessageDecoder<String> {
 - @Override
 - protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
 - if(!StringUtil.isNullOrEmpty(msg)){
 - out.add(String.format("[%s]", msg));
 - }
 - }
 - }
 
客戶端開發(fā):
CodecClient
- /**
 - * *********************************************************************
 - * 歡迎關(guān)注公眾號(hào): 【源碼學(xué)徒】
 - * *********************************************************************
 - *
 - * @author huangfu
 - * @date 2021/5/6 21:29
 - */
 - public class CodecClient {
 - public static void main(String[] args) throws InterruptedException {
 - EventLoopGroup worker = new NioEventLoopGroup();
 - try {
 - Bootstrap bootstrap = new Bootstrap();
 - bootstrap.group(worker)
 - .remoteAddress(new InetSocketAddress("127.0.0.1",8989))
 - .channel(NioSocketChannel.class)
 - .handler(new ChannelInitializer<SocketChannel>() {
 - @Override
 - protected void initChannel(SocketChannel ch) throws Exception {
 - //添加自定義的解碼器
 - ch.pipeline().addLast("messageEqualDecoder", new MyByteToMessageDecoder(213));
 - ch.pipeline().addLast("myMessageToMessageDecoder", new MyMessageToMessageDecoder());
 - ch.pipeline().addLast("codecClientHandler", new CodecClientHandler());
 - }
 - });
 - ChannelFuture channelFuture = bootstrap.connect().sync();
 - channelFuture.channel().closeFuture().sync();
 - }finally {
 - worker.shutdownGracefully();
 - }
 - }
 - }
 
效果圖:
6. 心跳檢測(cè)
我們現(xiàn)在假設(shè)有一個(gè)客戶端與服務(wù)端,客戶端與服務(wù)端進(jìn)行數(shù)據(jù)交互,服務(wù)端探測(cè)到客戶端5秒沒(méi)有發(fā)送數(shù)據(jù) 3次以上關(guān)閉連接!
開發(fā)一個(gè)心跳服務(wù)端處理器
- /**
 - * 心跳處理的Handler
 - *
 - * @author huangfu
 - * @date 2021年5月8日09:03:46
 - */
 - public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
 - /**
 - * 讀空閑次數(shù)
 - */
 - private int readIdleTimes = 0;
 - @Override
 - public void channelActive(ChannelHandlerContext ctx) throws Exception {
 - System.out.println("客戶端連接:"+ ctx.channel().remoteAddress());
 - super.channelActive(ctx);
 - }
 - @Override
 - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 - ByteBuf byteBuf = (ByteBuf) msg;
 - String string = byteBuf.toString(StandardCharsets.UTF_8);
 - System.out.println(string);
 - //有數(shù)據(jù) 次數(shù)歸0
 - readIdleTimes = 0;
 - super.channelRead(ctx, msg);
 - }
 - @Override
 - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
 - if (evt instanceof IdleStateEvent) {
 - IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
 - if (idleStateEvent.state() == IdleState.READER_IDLE) {
 - System.out.println("發(fā)生讀空閑");
 - readIdleTimes++;
 - }
 - //3次讀空閑之后,關(guān)閉客戶端連接
 - if (readIdleTimes > 3) {
 - //關(guān)閉客戶端連接
 - System.out.println("客戶端連接被關(guān)閉:"+ ctx.channel().remoteAddress());
 - ctx.close();
 - }
 - }
 - }
 - }
 
開發(fā)一個(gè)心跳服務(wù)端
- /**
 - * 心跳服務(wù)器
 - *
 - * @author huangfu
 - * @date 2021年5月8日08:52:56
 - */
 - public class HeartBeatServer {
 - public static void main(String[] args) {
 - EventLoopGroup boss = new NioEventLoopGroup(1);
 - EventLoopGroup worker = new NioEventLoopGroup();
 - try {
 - ServerBootstrap bootstrap = new ServerBootstrap();
 - bootstrap.group(boss,worker)
 - .channel(NioServerSocketChannel.class)
 - .localAddress(8989)
 - .childHandler(new ChannelInitializer<SocketChannel>() {
 - @Override
 - protected void initChannel(SocketChannel ch) throws Exception {
 - //心跳觸發(fā)器 讀空閑 寫空閑 讀寫空閑5秒的均會(huì)觸發(fā)心跳事件
 - ch.pipeline().addLast(new IdleStateHandler(5,5,5, TimeUnit.SECONDS));
 - ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4,0,4));
 - //定義處理器
 - ch.pipeline().addLast(new HeartBeatServerHandler());
 - }
 - });
 - ChannelFuture channelFuture = bootstrap.bind().sync();
 - channelFuture.channel().closeFuture().sync();
 - } catch (InterruptedException e) {
 - e.printStackTrace();
 - } finally {
 - boss.shutdownGracefully();
 - worker.shutdownGracefully();
 - }
 - }
 - }
 
開發(fā)一個(gè)心跳客戶端處理器
- /**
 - * 客戶端心跳處理
 - *
 - * @author huangfu
 - * @date 2021年5月8日09:29:05
 - */
 - public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
 - @Override
 - public void channelActive(ChannelHandlerContext ctx) throws Exception {
 - System.out.println("通道被激活");
 - super.channelActive(ctx);
 - }
 - @Override
 - public void channelInactive(ChannelHandlerContext ctx) throws Exception {
 - System.out.println("通道被銷毀");
 - super.channelInactive(ctx);
 - }
 - }
 
開發(fā)一個(gè)心跳客戶端
- /**
 - * 心跳消息服務(wù)
 - *
 - * @author huangfu
 - * @date 2021年5月8日09:37:07
 - */
 - public class HeartBeatClient {
 - private static Channel channel = null;
 - private static Scanner sc = new Scanner(System.in);
 - public static void main(String[] args) {
 - EventLoopGroup worker = new NioEventLoopGroup();
 - try {
 - Bootstrap bootstrap = new Bootstrap();
 - bootstrap.group(worker)
 - .channel(NioSocketChannel.class)
 - .remoteAddress("127.0.0.1",8989)
 - .handler(new ChannelInitializer<SocketChannel>() {
 - @Override
 - protected void initChannel(SocketChannel ch) throws Exception {
 - //長(zhǎng)度解碼器
 - ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4,0,4));
 - ch.pipeline().addLast(new HeartBeatClientHandler());
 - }
 - });
 - //連接服務(wù)端
 - ChannelFuture channelFuture = bootstrap.connect().sync();
 - channel = channelFuture.channel();
 - Thread thread = new Thread(HeartBeatClient::writeStr);
 - thread.setDaemon(true);
 - thread.start();
 - channel.closeFuture().sync();
 - } catch (InterruptedException e) {
 - e.printStackTrace();
 - } finally {
 - worker.shutdownGracefully();
 - }
 - }
 - /**
 - * 向服務(wù)端寫入數(shù)據(jù)
 - */
 - public static void writeStr(){
 - while (true) {
 - System.out.print("請(qǐng)輸入要發(fā)送的數(shù)據(jù):");
 - //從鍵盤讀入數(shù)據(jù)
 - String line = sc.nextLine();
 - ByteBuf buffer = Unpooled.buffer();
 - buffer.writeInt(line.length());
 - buffer.writeBytes(line.getBytes(StandardCharsets.UTF_8));
 - //發(fā)送數(shù)據(jù)
 - channel.writeAndFlush(buffer).addListener(future -> {
 - if (future.isSuccess()) {
 - System.out.println("發(fā)送成功");
 - }
 - });
 - }
 - }
 - }
 
























 
 
 










 
 
 
 