偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

基于 Netty 服務端快速了解核心組件

開發(fā) 網(wǎng)絡
由于Netty優(yōu)秀的設計和封裝,開發(fā)一個高性能網(wǎng)絡程序就變得非常簡單,本文從一個簡單的服務端落地簡單介紹一下Netty中的幾個核心組件,希望對你有幫助。

由于Netty優(yōu)秀的設計和封裝,開發(fā)一個高性能網(wǎng)絡程序就變得非常簡單,本文從一個簡單的服務端落地簡單介紹一下Netty中的幾個核心組件,希望對你有幫助。

快速落地一個服務端

我們希望通過Netty快速落地一個簡單的主從reactor模型,由主reactor對應的線程組接收連接交由acceptor創(chuàng)建連接,與之建立的客戶端的讀寫事件都會交由從reactor對應的線程池處理:

基于此設計,我們通過Netty寫下如下代碼,可以看到我們做了如下幾件事:

  • 聲明一個服務端創(chuàng)建引導類ServerBootstrap ,負責配置服務端及其啟動工作。
  • 聲明主從reactor線程組,其中boss可以看作監(jiān)聽端口接收新連接的線程組,而work則是負責處理客戶端數(shù)據(jù)讀寫的線程組。
  • 基于上述線程池作為group的入?yún)⑼瓿芍鲝膔eactor模型創(chuàng)建。
  • 通過channel函數(shù)指定server channe為NioServerSocketChannel即采用NIO模型,而NioServerSocketChannel我們可以直接理解為serverSocket的抽象表示。
  • 通過childHandler方法給引導設置每一個連接數(shù)據(jù)讀寫的處理器handler。

最后調(diào)用bind啟動服務端并通過addListener對連接結果進行異步監(jiān)聽:

public static void main(String[] args) {
        //1. 聲明一個服務端創(chuàng)建引導類
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //2. 聲明主從reactor線程組
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());


        serverBootstrap.group(boss, worker)//3. 基于上述線程池創(chuàng)建主從reactor模型
                .channel(NioServerSocketChannel.class)//server channel采用NIO模型
                .childHandler(new ChannelInitializer<NioSocketChannel>() {//添加客戶端讀寫請求處理器到subreactor中
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 對于ChannelInboundHandlerAdapter,收到消息后會按照順序執(zhí)行即 A -> B->ServerHandler
                        ch.pipeline().addLast(new InboundHandlerA())
                                .addLast(new InboundHandlerB())
                                .addLast(new ServerHandler());

                        // 處理寫數(shù)據(jù)的邏輯,順序是反著的 B -> A
                        ch.pipeline().addLast(new OutboundHandlerA())
                                .addLast(new OutboundHandlerB())
                                .addLast(new OutboundHandlerC());

                        ch.pipeline().addLast(new ExceptionHandler());
                    }
                });
        //綁定8080端口并設置回調(diào)監(jiān)聽結果
        serverBootstrap.bind("127.0.0.1", 8080)
                .addListener(f -> {
                    if (f.isSuccess()) {
                        System.out.println("連接成功");
                    }
                });
    }

對于客戶端的發(fā)送的數(shù)據(jù),我們都會通過ChannelInboundHandlerAdapter添加順序處理,就如代碼所示我們的執(zhí)行順序為InboundHandlerA->InboundHandlerB->ServerHandler,對此我們給出InboundHandlerA的代碼,InboundHandlerB內(nèi)容一樣就不展示了:

public class InboundHandlerA extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerA : " + ((ByteBuf)msg).toString(StandardCharsets.UTF_8));
        //將當前的處理過的msg轉交給pipeline的下一個ChannelHandler
        super.channelRead(ctx, msg);
    }
}

而ServerHandler的則是:

  • 客戶端與服務端建立連接,對應客戶端channel被激活,觸發(fā)channelActive方法。
  • ChannelHandlerContext 的 Channel 已注冊到其 EventLoop 中,執(zhí)行channelRegistered。
  • 將 ChannelHandler 添加到實際上下文并準備好處理事件后調(diào)用。

解析客戶端的數(shù)據(jù)然后回復Hello Netty client :

private static class ServerHandler extends ChannelInboundHandlerAdapter {

  @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("channel被激活,執(zhí)行channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            System.out.println("執(zhí)行channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            System.out.println("執(zhí)行handlerAdded");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            //打印讀取到的數(shù)據(jù)
            System.out.println(new Date() + ": 服務端讀到數(shù)據(jù) -> " + byteBuf.toString(StandardCharsets.UTF_8));

            // 回復客戶端數(shù)據(jù)
            System.out.println(new Date() + ": 服務端寫出數(shù)據(jù)");
            //組裝數(shù)據(jù)并發(fā)送
            ByteBuf out = getByteBuf(ctx);
            ctx.channel().writeAndFlush(out);
            super.channelRead(ctx, msg);
        }

        private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
            ByteBuf buffer = ctx.alloc().buffer();

            byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8);

            buffer.writeBytes(bytes);

            return buffer;
        }

      //......
    }

我們通過telnet 對應ip和端口后發(fā)現(xiàn)服務端輸出如下內(nèi)容,也很我們上文說明的一致:

執(zhí)行handlerAdded
執(zhí)行channelRegistered
端口綁定成功,channel被激活,執(zhí)行channelActive

然后我們發(fā)送消息 1,可以看到觸發(fā)所有inbound的channelRead方法:

InBoundHandlerA : 1
InBoundHandlerB: 1
Wed Jul 24 00:05:18 CST 2024: 服務端讀到數(shù)據(jù) -> 1

然后我們回復hello netty client,按照添加的倒敘觸發(fā)OutBoundHandler:

Wed Jul 24 00:05:18 CST 2024: 服務端寫出數(shù)據(jù)
OutBoundHandlerC: Hello Netty client 
OutBoundHandlerB: Hello Netty client 
OutBoundHandlerA: Hello Netty client 

詳解Netty中的核心組件

channel接口

channel是Netty對于底層class socket中的bind、connect、read、write等原語的封裝,簡化了我們網(wǎng)絡編程的復雜度,同時Netty也提供的各種現(xiàn)成的channel,我們可以根據(jù)個人需要自行使用。 下面便是筆者比較常用的Tcp或者UDP中比較常用的幾種channel。

  • NioServerSocketChannel:基于NIO選擇器處理新連接。
  • EpollServerSocketChannel:使用 linux EPOLL Edge 觸發(fā)模式實現(xiàn)最大性能的實現(xiàn)。
  • NioDatagramChannel:發(fā)送和接收 AddressedEnvelope 的 NIO 數(shù)據(jù)報通道。
  • EpollDatagramChannel:使用 linux EPOLL Edge 觸發(fā)模式實現(xiàn)最大性能的 DatagramChannel 實現(xiàn)。

EventLoop接口

在Netty中,所有channel都會注冊到某個eventLoop上, 每一個EventLoopGroup中有一個或者多個EventLoop,而每一個EventLoop都綁定一個線程,負責處理一個或者多個channel的事件:

這里我們也簡單的給出NioEventLoop中的run方法,它繼承自SingleThreadEventExecutor,我們可以大概看到NioEventLoop的核心邏輯本質(zhì)就是輪詢所有注冊到NioEventLoop上的channel(socket的抽象)是否有其就緒的事件,然后

  @Override
    protected void run() {
        for (;;) {
            try {
             //基于selectStrategy輪詢查看是否有就緒事件
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
      //......

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                //根據(jù)IO配比執(zhí)行網(wǎng)絡IO事件方法processSelectedKeys以及其他事件方法runAllTasks
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
           //......
        }
    }

pipeline和channelHandler以channelHandlerContext

每一個channel的事件都會交由channelHandler處理,而負責同一個channel的channelHandler都會交由pipeline一條邏輯鏈進行連接,這兩者的的關系都會一一封裝成channelHandlerContext,channelHandlerContext主要是負責當前channelHandler和與其同一條channelpipeline上的其他channelHandler之間的交互。

舉個例子,當我們收到客戶端的寫入數(shù)據(jù)時,這些數(shù)據(jù)就會交由pipeline上的channelHandler處理,如下圖所示,從第一個channelHandler處理完成之后,每個channelHandlerContext就會將消息轉交到當前pipeline的下一個channelHandler處理:

假設我們的channelHandler執(zhí)行完ChannelActive后,如希望繼續(xù)傳播則會調(diào)用fireChannelActive:

 @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("端口綁定成功,channel被激活,執(zhí)行channelActive");
            ctx.fireChannelActive()
        }

查看其內(nèi)部邏輯即可知曉,它就是通過AbstractChannelHandlerContext得到pipeline的下一個ChannelHandler并執(zhí)行其channelActive方法:

 @Override
    public ChannelHandlerContext fireChannelActive() {
        final AbstractChannelHandlerContext next = findContextInbound();
        invokeChannelActive(next);
        return this;
    }

回調(diào)的思想

我們可以說回調(diào)其實是一種設計思想,Netty對于連接或者讀寫操作都是異步非阻塞的,所以我們希望在連接被建立進行一些響應的處理,那么Netty就會在連接建立的時候方法暴露一個回調(diào)方法供用戶實現(xiàn)個性邏輯。

例如我們的channel連接被建立時,其底層就會調(diào)用invokeChannelActive獲取我們綁定的ChannelInboundHandler并執(zhí)行其channelActive方法:

private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

于是就會調(diào)用到我們服務端ServerHandler 的channelActive方法:

private static class ServerHandler extends ChannelInboundHandlerAdapter {

        //......

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("端口綁定成功,channel被激活,執(zhí)行channelActive");
        }
  //......
}

Future異步監(jiān)聽

為保證網(wǎng)絡服務器執(zhí)行的效率,Netty大部分網(wǎng)絡IO操作都采用異步的,以筆者建立連接設置的監(jiān)聽器為例,當前連接成功后,就會返回給監(jiān)聽器一個java.util.concurrent.Future,我們就可以通過這個f獲取連接的結果是否成功:

//綁定8080端口并設置回調(diào)監(jiān)聽結果
        serverBootstrap.bind("127.0.0.1", 8080)
                .addListener(f -> {
                    if (f.isSuccess()) {
                        System.out.println("連接成功");
                    }
                });

我們步入DefaultPromise的addListener即可發(fā)現(xiàn)其內(nèi)部就是添加監(jiān)聽后判斷這個連接的異步任務Future是否完成,如果完成調(diào)用notifyListeners回調(diào)我們的監(jiān)聽器的邏輯:

@Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        //......
  //添加監(jiān)聽
        synchronized (this) {
            addListener0(listener);
        }
  //連接任務完成,通知監(jiān)聽器
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }
責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關推薦

2022-09-30 10:44:47

Netty組件數(shù)據(jù)

2023-09-11 10:53:32

2024-05-30 08:04:20

Netty核心組件架構

2024-04-10 10:09:07

2016-03-18 09:04:42

swift服務端

2023-07-26 10:21:26

服務端組件客戶端

2011-04-22 10:13:35

SimpleFrame

2012-03-02 10:38:33

MySQL

2013-03-25 10:08:44

PHPWeb

2021-10-14 08:39:17

Java Netty Java 基礎

2023-06-30 09:46:00

服務物理機自動化

2022-10-08 00:01:00

ssrvuereact

2024-01-16 08:05:53

2010-08-03 09:59:30

NFS服務

2016-11-03 09:59:38

kotlinjavaspring

2021-05-25 08:20:37

編程技能開發(fā)

2010-03-19 18:17:17

Java Server

2010-02-24 15:42:03

WCF服務端安全

2009-08-21 15:22:56

端口偵聽

2022-12-29 08:56:30

監(jiān)控服務平臺
點贊
收藏

51CTO技術棧公眾號