NioEventLoop源碼解析
本文轉(zhuǎn)載自微信公眾號(hào)「源碼學(xué)徒」,作者皇甫嗷嗷叫 。轉(zhuǎn)載本文請(qǐng)聯(lián)系源碼學(xué)徒公眾號(hào)。
源碼分析
上一節(jié)課,我們就 new NioEventLoopGroup();的初始化過(guò)程做了一個(gè)深度的解析,后來(lái)我們發(fā)現(xiàn),NioEventLoopGroup在初始化過(guò)程中會(huì)構(gòu)建一個(gè)執(zhí)行器數(shù)組,數(shù)組內(nèi)部存儲(chǔ)的元素是NioEventLoop類型的,但是NioEventLoop是什么呢?為什么說(shuō)他是Netty的精髓呢?
我們直接進(jìn)入到NioEventLoop看他的構(gòu)造方法:
上一節(jié)課我們是在循環(huán)填充執(zhí)行器數(shù)組的過(guò)程中創(chuàng)建的,具體參見(jiàn)上一節(jié)課的for循環(huán)中的 newChild方法,這里直接分析源碼
- NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
- SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
- EventLoopTaskQueueFactory queueFactory) {
- //保存外部線程任務(wù)newTaskQueue(queueFactory)
- super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
- rejectedExecutionHandler);
- this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
- this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
- final SelectorTuple selectorTuple = openSelector();
- this.selector = selectorTuple.selector;
- this.unwrappedSelector = selectorTuple.unwrappedSelector;
- }
關(guān)于super我們一會(huì)再往后追
一、保存選擇器生產(chǎn)者
- this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
是綁定了一個(gè)類似于生產(chǎn)者的東西,使我們?cè)俪跏蓟疦ioEventLoopGroup的時(shí)候初始化的,使用該生產(chǎn)者,后續(xù)可以獲取選擇器或者Socket通道等!
二、保存選擇器
- this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
保存一個(gè)默認(rèn)的選擇策略到NioEventLoop對(duì)象里面
三、開(kāi)啟一個(gè)選擇器
- final SelectorTuple selectorTuple = openSelector();
開(kāi)啟一個(gè)選擇器包裝對(duì)象,內(nèi)含一個(gè)選擇器!Netty官方為了Netty性能的進(jìn)一步優(yōu)化,喪心病狂的對(duì)這個(gè)選擇器也進(jìn)行了優(yōu)化,我們跟進(jìn)一下openSelector方法,看看他是如何優(yōu)化的,內(nèi)部代碼比較復(fù)雜,我們逐行分析:
1. 獲取原始的選擇器
- unwrappedSelector = provider.openSelector();
使用原始的生產(chǎn)者對(duì)象,獲取一個(gè)原始的選擇器,后續(xù)使用!
2. 判斷是否啟動(dòng)用選擇器優(yōu)化
- //禁用優(yōu)化選項(xiàng) 默認(rèn)false
- if (DISABLE_KEY_SET_OPTIMIZATION) {
- //如果不優(yōu)化那么就直接包裝原始的選擇器
- return new SelectorTuple(unwrappedSelector);
- }
DISABLE_KEY_SET_OPTIMIZATION默認(rèn)為false, 當(dāng)禁用優(yōu)化的時(shí)候,會(huì)將selector選擇器直接進(jìn)行包裝返回! 默認(rèn)會(huì)進(jìn)行優(yōu)化,所以一般不會(huì)進(jìn)這個(gè)邏輯分支!
3. 獲取一個(gè)選擇器的類的對(duì)象
- //如果需要優(yōu)化
- //反射獲取對(duì)應(yīng)的類的對(duì)象 SelectorImpl
- Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
- return Class.forName(
- "sun.nio.ch.SelectorImpl",
- false,
- PlatformDependent.getSystemClassLoader());
- } catch (Throwable cause) {
- return cause;
- }
- }
- });
這個(gè)代碼是返回一個(gè) SelectorImpl的Class對(duì)象,這里是返回SelectorImpl的Class對(duì)象!我們由上述代碼可以看出來(lái),如果獲取失敗,會(huì)返回一個(gè)異常,異常的話肯定不行,所以就要對(duì)可能會(huì)發(fā)生的異常做出操作:
- //如果沒(méi)有獲取成功
- f (!(maybeSelectorImplClass instanceof Class) ||
- // 確保當(dāng)前的選擇器實(shí)現(xiàn)是我們可以檢測(cè)到的。 判斷是 unwrappedSelector的子類或者同類
- !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
- //發(fā)生異常
- if (maybeSelectorImplClass instanceof Throwable) {
- Throwable t = (Throwable) maybeSelectorImplClass;
- logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
- }
- //還是包裝為未優(yōu)化的選擇器
- return new SelectorTuple(unwrappedSelector);
如果發(fā)生了異常,或者獲取的和原始選擇器不是一個(gè)對(duì)象,就還使用原始選擇器包裝返回!
4. 創(chuàng)建一個(gè)優(yōu)化后的selectKeys
- final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
使用過(guò)NIO的都應(yīng)該知道,使用選擇器是能夠獲取一個(gè)事件的Set集合的,這里Netty官方自己實(shí)現(xiàn)了一個(gè)Set集合,內(nèi)部使用數(shù)組來(lái)進(jìn)行優(yōu)化!因?yàn)镠ashset集合是使用HashMap方法來(lái)實(shí)現(xiàn)的,(this ->Object) 再添加元素的時(shí)候如果發(fā)生了hash碰撞的話會(huì)遍歷hash槽上的鏈表 算法復(fù)雜度為O(n),但是數(shù)組不一樣 數(shù)組是O(1) 所以Netty官方使用數(shù)組來(lái)優(yōu)化選擇器事件集合 默認(rèn)是1024 滿了之后2倍擴(kuò)容!
大家可以簡(jiǎn)單的把它看做一個(gè)Set集合,只不過(guò)他是使用數(shù)組的形式來(lái)實(shí)現(xiàn)的!內(nèi)部重寫了add、size、iterator的方法,其余方法全部廢棄!
這個(gè)是Netty對(duì)選擇器優(yōu)化的一個(gè)重要對(duì)象,使得再追加事件的時(shí)候,算法復(fù)雜度由O(N)直接變?yōu)榱薕(1)!
5. 開(kāi)始進(jìn)行反射替換selectedKeys
- //開(kāi)始進(jìn)行反射替換
- Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
- //獲取選擇器事件中的 事件對(duì)象 selectedKeys的屬性對(duì)象
- Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
- //獲取公開(kāi)選擇的密鑰
- Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
- //java9且存在 Unsafe的情況下 直接替換內(nèi)存空間的數(shù)據(jù)
- if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
- // 讓我們嘗試使用sun.misc.Unsafe替換SelectionKeySet。
- // 這使我們也可以在Java9 +中執(zhí)行此操作,而無(wú)需任何額外的標(biāo)志。
- long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
- long publicSelectedKeysFieldOffset =
- PlatformDependent.objectFieldOffset(publicSelectedKeysField);
- if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
- PlatformDependent.putObject(
- unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
- PlatformDependent.putObject(
- unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
- return null;
- }
- // 如果沒(méi)法直接替換內(nèi)存空間的數(shù)據(jù) 就想辦法用反射
- }
- //java8或者java9+上述未操作完成的 使用反射來(lái)替換
- Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
- if (cause != null) {
- return cause;
- }
- cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
- if (cause != null) {
- return cause;
- }
- //開(kāi)始進(jìn)行替換 將我們創(chuàng)建的優(yōu)化后的事件數(shù)組來(lái)反射的替換進(jìn)選擇器中
- selectedKeysField.set(unwrappedSelector, selectedKeySet);
- publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
- return null;
- } catch (NoSuchFieldException e) {
- return e;
- } catch (IllegalAccessException e) {
- return e;
- }
- }
- });
代碼雖然多,但是,邏輯比較簡(jiǎn)單!
- 首先獲取SelectorImpl類對(duì)象的 selectedKeys屬性和publicSelectedKeys屬性!
- 判斷使用的JDK版本是不是9以上,如果使用的9的話,直接操作JAVA的Unsafe對(duì)象操作系統(tǒng)的內(nèi)存空間!有關(guān)Unsafe的介紹,再零拷貝章節(jié)介紹的很詳細(xì),可以復(fù)習(xí)零拷貝章節(jié)! 我們這里使用的JDK8
- 如果使用的是JDK8,使用反射,將我們創(chuàng)建出來(lái)的SelectedKeys的優(yōu)化對(duì)象SelectedSelectionKeySet反射的替換進(jìn)unwrappedSelector這個(gè)原始的選擇器!
6. 包裝選擇器
- return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
首先把unwrappedSelector選擇器包裝為 SelectedSelectionKeySetSelector包裝類!
再把unwrappedSelector和SelectedSelectionKeySetSelector對(duì)應(yīng)起來(lái),包裝Wie元組返回!
四、保存優(yōu)化后的選擇器和原始選擇器
- this.selector = selectorTuple.selector;
- this.unwrappedSelector = selectorTuple.unwrappedSelector;
五、調(diào)用父類,創(chuàng)建隊(duì)列
- super(parent, executor, false, newTaskQueue(queueFactory),
- newTaskQueue(queueFactory),rejectedExecutionHandler);
首先,他會(huì)通過(guò)newTaskQueue構(gòu)建兩個(gè)隊(duì)列 ,這兩個(gè)隊(duì)列是什么類型的呢?
上一節(jié)課我們分析過(guò),queueFactory == null,所以會(huì)走如圖分支代碼,DEFAULT_MAX_PENDING_TASKS如果沒(méi)有指定的話,默認(rèn)是Integer.MAX,最小為16!我們進(jìn)入到 該分支代碼看一下,他創(chuàng)建的是一個(gè)什么隊(duì)列:
- public static <T> Queue<T> newMpscQueue() {
- return Mpsc.newMpscQueue();
- }
可以看出他創(chuàng)建的是一個(gè)Mpsc隊(duì)列,他是一個(gè)多生產(chǎn)者,單消費(fèi)者隊(duì)列,是由jctools框架提供的,后續(xù)如果可以,我會(huì)具體對(duì)該隊(duì)列進(jìn)行一個(gè)講解,我們到這里就知道,再創(chuàng)建NIOEventLoop的時(shí)候,向父類內(nèi)部傳遞了兩個(gè)Mpsc隊(duì)列,我們繼續(xù)回到主線:
進(jìn)入到super(xxx)的源碼中:
- protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
- RejectedExecutionHandler rejectedExecutionHandler) {
- super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
- //保存一個(gè) tailTasks 尾部隊(duì)列
- tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
- }
這里會(huì)保存一個(gè)隊(duì)列,尾部隊(duì)列,這個(gè)尾部隊(duì)列,官方的意思是想對(duì)Netty 的運(yùn)行狀態(tài)做一些統(tǒng)計(jì)數(shù)據(jù),例如任務(wù)循環(huán)的耗時(shí)、占用物理內(nèi)存的大小等等,但是實(shí)際上應(yīng)用tailTasks的場(chǎng)景極少,這里不做太多講解!
我們繼續(xù)跟進(jìn)到super方法源碼里面:
- //parent 線程執(zhí)行器 false mpsc隊(duì)列 拒絕策略
- protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
- boolean addTaskWakesUp, Queue<Runnable> taskQueue,
- RejectedExecutionHandler rejectedHandler) {
- super(parent);
- this.addTaskWakesUp = addTaskWakesUp;
- this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
- //保存線程執(zhí)行器
- this.executor = ThreadExecutorMap.apply(executor, this);
- //創(chuàng)建一個(gè)隊(duì)列 Mpscq,外部線程執(zhí)行的時(shí)候使用這個(gè)隊(duì)列(不是在EventLoop的線程內(nèi)執(zhí)行的時(shí)候) newTaskQueue(queueFactory)
- this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
- //保存拒絕策略
- this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
- }
這里是進(jìn)一步保存,將該NioEventLoop對(duì)應(yīng)的線程執(zhí)行器 、MpscQuerey任務(wù)隊(duì)列、對(duì)應(yīng)的拒絕策略保存起來(lái)! 大家再后續(xù)看到使用對(duì)應(yīng)變量的代碼千萬(wàn)不要覺(jué)得陌生哦!
總結(jié)
創(chuàng)建和保存了兩個(gè)多生產(chǎn)者單消費(fèi)者隊(duì)列tailTasks和taskQueue
保存一個(gè)線程執(zhí)行器executor
保存一個(gè)拒絕策略,該拒絕策略主要用于隊(duì)列滿了之后如何處理!
保存一個(gè)選擇器生產(chǎn)者!
創(chuàng)建一個(gè)優(yōu)化后的選擇器,并進(jìn)行保存!
將原始選擇器和優(yōu)化后的選擇器進(jìn)行保存!






























