詳解LongAdder實(shí)現(xiàn)原理
前言
AtomicInteger、AtomicLong使用非阻塞的CAS算法原子性地更新某一個變量,比synchronized這些阻塞算法擁有更好的性能,但是在高并發(fā)情況下,大量線程同時(shí)去更新一個變量,由于同一時(shí)間只有一個線程能夠成功,絕大部分的線程在嘗試更新失敗后,會通過自旋的方式再次進(jìn)行嘗試,嚴(yán)重占用了CPU的時(shí)間片。
AtomicLong的實(shí)現(xiàn)原理圖:

LongAdder是JDK8新增的原子操作類,它提供了一種新的思路,既然AtomicLong的性能瓶頸是由于大量線程同時(shí)更新一個變量造成的,那么能不能把這個變量拆分出來,變成多個變量,然后讓線程去競爭這些變量,最后合并即可?LongAdder的設(shè)計(jì)精髓就在這里,通過將變量拆分成多個元素,降低該變量的并發(fā)度,最后進(jìn)行合并元素,變相的減少了CAS的失敗次數(shù)。
LongAdder的實(shí)現(xiàn)原理圖:

常用方法
- public class LongAdder extends Striped64 implements Serializable {
 - //構(gòu)造方法
 - public LongAdder() {
 - }
 - //加1操作
 - public void increment();
 - //減1操作
 - public void decrement();
 - //獲取原子變量的值
 - public long longValue();
 - }
 
下面給出一個簡單的例子,模擬50線程同時(shí)進(jìn)行更新
- package com.xue.testLongAdder;
 - import java.util.concurrent.ExecutorService;
 - import java.util.concurrent.Executors;
 - import java.util.concurrent.atomic.LongAdder;
 - public class Main {
 - public static void main(String[] args) {
 - LongAdder adder = new LongAdder();
 - ExecutorService threadPool = Executors.newFixedThreadPool(20);
 - for (int i = 0; i < 50; i++) {
 - Runnable r = () -> {
 - adder.add(1);
 - };
 - threadPool.execute(r);
 - }
 - threadPool.shutdown();
 - //若關(guān)閉線程池后,所有任務(wù)執(zhí)行完畢,則isTerminated()返回true
 - while (!threadPool.isTerminated()) {
 - System.out.println(adder.longValue());
 - break;
 - }
 - }
 - }
 
輸出結(jié)果是50
其中,如果對線程池不熟悉的同學(xué),可以先參考我的另外一篇文章說說線程池
原理解析
類圖

LongAdder內(nèi)部維護(hù)了一個Cell類型的數(shù)組,其中Cell是Striped64中的一個靜態(tài)內(nèi)部類。
Cell類
- abstract class Striped64 extends Number {
 - @sun.misc.Contended static final class Cell {
 - volatile long value;
 - Cell(long x) { value = x; }
 - final boolean cas(long cmp, long val) {
 - return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
 - }
 - }
 - }
 
Cell用來封裝被拆分出來的元素,內(nèi)部用一個value字段保存當(dāng)前元素的值,等到需要合并時(shí),則累加所有Cell數(shù)組中的value。Cell內(nèi)部使用CAS操作來更新value值,對CAS操作不熟悉的同學(xué),可以參考我的另外一篇文章淺探CAS實(shí)現(xiàn)原理
可以注意到,Cell類被 @sun.misc.Contended注解修飾了,這個注解是為了解決偽共享問題的,什么是偽共享?
- 一個緩存行可以存儲多個變量(存滿當(dāng)前緩存行的字節(jié)數(shù));而CPU對緩存的修改又是以緩存行為最小單位的,在多線程情況下,如果需要修改“共享同一個緩存行的變量”,就會無意中影響彼此的性能,這就是偽共享(False Sharing)。
 
對偽共享還不理解的同學(xué),可以參考這位大佬的文章偽共享(False Sharing)底層原理及其解決方式
而LongAdder采用的是Cell數(shù)組,而數(shù)組元素是連續(xù)的,因此多個Cell對象共享一個緩存行的情況非常普遍,因此這里@sun.misc.Contended注解對單個Cell元素進(jìn)行字節(jié)填充,確保一個Cell對象占據(jù)一個緩存行,即填充至64字節(jié)。
關(guān)于如何確定一個對象的大小,可以參考我的另外一篇文章對象的內(nèi)存布局,怎樣確定對象的大小,這樣可以算出來,還需要填充多少字節(jié)。
longValue()
longValue()返回累加后的值
- public long longValue() {
 - return sum();
 - }
 - public long sum() {
 - Cell[] as = cells; Cell a;
 - long sum = base;
 - //當(dāng)Cell數(shù)組不為null時(shí),進(jìn)行累加后返回,否則直接返回基準(zhǔn)數(shù)base
 - if (as != null) {
 - for (int i = 0; i < as.length; ++i) {
 - if ((a = as[i]) != null)
 - sum += a.value;
 - }
 - }
 - return sum;
 - }
 
這可能是LongAdder中最簡單的方法了,就不進(jìn)行贅述了。什么,你要看復(fù)雜的?好的,這就來了。
increment()
- public void increment() {
 - add(1L);
 - }
 - public void add(long x) {
 - Cell[] as; long b, v; int m; Cell a;
 - /**
 - * 如果一下兩種條件則繼續(xù)執(zhí)行if內(nèi)的語句
 - * 1. cells數(shù)組不為null(不存在爭用的時(shí)候,cells數(shù)組一定為null,一旦對base的cas操作失敗,
 - * 才會初始化cells數(shù)組)
 - * 2. 如果cells數(shù)組為null,如果casBase執(zhí)行成功,則直接返回,如果casBase方法執(zhí)行失敗
 - * (casBase失敗,說明第一次爭用沖突產(chǎn)生,需要對cells數(shù)組初始化)進(jìn)入if內(nèi);
 - * casBase方法很簡單,就是通過UNSAFE類的cas設(shè)置成員變量base的值為base+要累加的值
 - * casBase執(zhí)行成功的前提是無競爭,這時(shí)候cells數(shù)組還沒有用到為null,可見在無競爭的情況下是
 - * 類似于AtomticInteger處理方式,使用cas做累加。
 - */
 - if ((as = cells) != null || !casBase(b = base, b + x)) {
 - //uncontended判斷cells數(shù)組中,當(dāng)前線程要做cas累加操作的某個元素是否#不#存在爭用,
 - //如果cas失敗則存在爭用;uncontended=false代表存在爭用,uncontended=true代表不存在爭用。
 - boolean uncontended = true;
 - /**
 - *1. as == null : cells數(shù)組未被初始化,成立則直接進(jìn)入if執(zhí)行cell初始化
 - *2. (m = as.length - 1) < 0: cells數(shù)組的長度為0
 - *條件1與2都代表cells數(shù)組沒有被初始化成功,初始化成功的cells數(shù)組長度為2;
 - *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的長度不為0,
 - * 則通過getProbe方法獲取當(dāng)前線程Thread的threadLocalRandomProbe變量的值,初始為0,
 - * 然后執(zhí)行threadLocalRandomProbe&(cells.length-1 ),相當(dāng)于m%cells.length;
 - * 如果cells[threadLocalRandomProbe%cells.length]的位置為null,
 - * 這說明這個位置從來沒有線程做過累加,
 - * 需要進(jìn)入if繼續(xù)執(zhí)行,在這個位置創(chuàng)建一個新的Cell對象;
 - *4. !(uncontended = a.cas(v = a.value, v + x)):
 - * 嘗試對cells[threadLocalRandomProbe%cells.length]位置的Cell對象中的value值做累加操作,
 - * 并返回操作結(jié)果,如果失敗了則進(jìn)入if,重新計(jì)算一個threadLocalRandomProbe;
 - 如果進(jìn)入if語句執(zhí)行l(wèi)ongAccumulate方法,有三種情況
 - 1. 前兩個條件代表cells沒有初始化,
 - 2. 第三個條件指當(dāng)前線程hash到的cells數(shù)組中的位置還沒有其它線程做過累加操作,
 - 3. 第四個條件代表產(chǎn)生了沖突,uncontended=false
 - **/
 - if (as == null || (m = as.length - 1) < 0 ||
 - (a = as[getProbe() & m]) == null ||
 - !(uncontended = a.cas(v = a.value, v + x)))
 - longAccumulate(x, null, uncontended);
 - }
 - }
 
其中l(wèi)ongAccumulate()的解析如下:
- final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
 - //獲取當(dāng)前線程的threadLocalRandomProbe值作為hash值,如果當(dāng)前線程的threadLocalRandomProbe為0,
 - // 說明當(dāng)前線程是第一次進(jìn)入該方法,則強(qiáng)制設(shè)置線程的threadLocalRandomProbe為ThreadLocalRandom類的成員
 - // 靜態(tài)私有變量probeGenerator的值,后面會詳細(xì)將hash值的生成;
 - //另外需要注意,如果threadLocalRandomProbe=0,代表新的線程開始參與cell爭用的情況
 - //1.當(dāng)前線程之前還沒有參與過cells爭用(也許cells數(shù)組還沒初始化,進(jìn)到當(dāng)前方法來就是為了初始化cells數(shù)組
 - //后爭用的),
 - // 是第一次執(zhí)行base的cas累加操作失??;
 - //2.或者是在執(zhí)行add方法時(shí),對cells某個位置的Cell的cas操作第一次失敗,則將wasUncontended設(shè)置為false,
 - // 那么這里會將其重新置為true;第一次執(zhí)行操作失??;
 - //凡是參與了cell爭用操作的線程threadLocalRandomProbe都不為0;
 - int h;
 - if ((h = getProbe()) == 0) {
 - //初始化ThreadLocalRandom;
 - ThreadLocalRandom.current(); // force initialization
 - //將h設(shè)置為0x9e3779b9
 - h = getProbe();
 - //設(shè)置未競爭標(biāo)記為true
 - wasUncontended = true;
 - }
 - //cas沖突標(biāo)志,表示當(dāng)前線程hash到的Cells數(shù)組的位置,做cas累加操作時(shí)與其它線程發(fā)生了沖突,cas失??;
 - // collide=true代表有沖突,collide=false代表無沖突
 - boolean collide = false;
 - for (;;) {
 - Cell[] as; Cell a; int n; long v;
 - //這個主干if有三個分支
 - //1.主分支一:處理cells數(shù)組已經(jīng)正常初始化了的情況(這個if分支處理add方法的四個條件中的3和4)
 - //2.主分支二:處理cells數(shù)組沒有初始化或者長度為0的情況;(這個分支處理add方法的四個條件中的1和2)
 - //3.主分支三:處理如果cell數(shù)組沒有初始化,并且其它線程正在執(zhí)行對cells數(shù)組初始化的操作,
 - // 及cellbusy=1;
 - // 則嘗試將累加值通過cas累加到base上
 - //先看主分支一
 - if ((as = cells) != null && (n = as.length) > 0) {
 - /**
 - *內(nèi)部小分支一:這個是處理add方法內(nèi)部if分支的條件3:如果被hash到的位置為null,
 - * 說明沒有線程在這個位置設(shè)置過值,
 - * 沒有競爭,可以直接使用,則用x值作為初始值創(chuàng)建一個新的Cell對象,
 - * 對cells數(shù)組使用cellsBusy加鎖,
 - * 然后將這個Cell對象放到cells[m%cells.length]位置上
 - */
 - if ((a = as[(n - 1) & h]) == null) {
 - //cellsBusy == 0 代表當(dāng)前沒有線程cells數(shù)組做修改
 - if (cellsBusy == 0) {
 - //將要累加的x值作為初始值創(chuàng)建一個新的Cell對象,
 - Cell r = new Cell(x);
 - //如果cellsBusy=0無鎖,則通過cas將cellsBusy設(shè)置為1加鎖
 - if (cellsBusy == 0 && casCellsBusy()) {
 - //標(biāo)記Cell是否創(chuàng)建成功并放入到cells數(shù)組被hash的位置上
 - boolean created = false;
 - try {
 - Cell[] rs; int m, j;
 - //再次檢查cells數(shù)組不為null,且長度不為空,且hash到的位置的Cell為null
 - if ((rs = cells) != null &&
 - (m = rs.length) > 0 &&
 - rs[j = (m - 1) & h] == null) {
 - //將新的cell設(shè)置到該位置
 - rs[j] = r;
 - created = true;
 - }
 - } finally {
 - //去掉鎖
 - cellsBusy = 0;
 - }
 - //生成成功,跳出循環(huán)
 - if (created)
 - break;
 - //如果created為false,說明上面指定的cells數(shù)組的位置cells[m%cells.length]
 - // 已經(jīng)有其它線程設(shè)置了cell了,
 - // 繼續(xù)執(zhí)行循環(huán)。
 - continue;
 - }
 - }
 - //如果執(zhí)行的當(dāng)前行,代表cellsBusy=1,有線程正在更改cells數(shù)組,代表產(chǎn)生了沖突,將collide設(shè)置為false
 - collide = false;
 - /**
 - *內(nèi)部小分支二:如果add方法中條件4的通過cas設(shè)置cells[m%cells.length]位置的Cell對象中的
 - * value值設(shè)置為v+x失敗,
 - * 說明已經(jīng)發(fā)生競爭,將wasUncontended設(shè)置為true,跳出內(nèi)部的if判斷,
 - * 最后重新計(jì)算一個新的probe,然后重新執(zhí)行循環(huán);
 - */
 - } else if (!wasUncontended)
 - //設(shè)置未競爭標(biāo)志位true,繼續(xù)執(zhí)行,后面會算一個新的probe值,然后重新執(zhí)行循環(huán)。
 - wasUncontended = true;
 - /**
 - *內(nèi)部小分支三:新的爭用線程參與爭用的情況:處理剛進(jìn)入當(dāng)前方法時(shí)threadLocalRandomProbe=0的情況,
 - * 也就是當(dāng)前線程第一次參與cell爭用的cas失敗,這里會嘗試將x值加到cells[m%cells.length]
 - * 的value ,如果成功直接退出
 - */
 - else if (a.cas(v = a.value, ((fn == null) ? v + x :
 - fn.applyAsLong(v, x))))
 - break;
 - /**
 - *內(nèi)部小分支四:分支3處理新的線程爭用執(zhí)行失敗了,這時(shí)如果cells數(shù)組的長度已經(jīng)到了最大值
 - * (大于等于cup數(shù)量),
 - * 或者是當(dāng)前cells已經(jīng)做了擴(kuò)容,則將collide設(shè)置為false,后面重新計(jì)算prob的值*/
 - else if (n >= NCPU || cells != as)
 - collide = false;
 - /**
 - *內(nèi)部小分支五:如果發(fā)生了沖突collide=false,則設(shè)置其為true;會在最后重新計(jì)算hash值后,
 - * 進(jìn)入下一次for循環(huán)
 - */
 - else if (!collide)
 - //設(shè)置沖突標(biāo)志,表示發(fā)生了沖突,需要再次生成hash,重試。
 - // 如果下次重試任然走到了改分支此時(shí)collide=true,!collide條件不成立,則走后一個分支
 - collide = true;
 - /**
 - *內(nèi)部小分支六:擴(kuò)容cells數(shù)組,新參與cell爭用的線程兩次均失敗,且符合庫容條件,會執(zhí)行該分支
 - */
 - else if (cellsBusy == 0 && casCellsBusy()) {
 - try {
 - //檢查cells是否已經(jīng)被擴(kuò)容
 - if (cells == as) { // Expand table unless stale
 - Cell[] rs = new Cell[n << 1];
 - for (int i = 0; i < n; ++i)
 - rs[i] = as[i];
 - cells = rs;
 - }
 - } finally {
 - cellsBusy = 0;
 - }
 - collide = false;
 - continue; // Retry with expanded table
 - }
 - //為當(dāng)前線程重新計(jì)算hash值
 - h = advanceProbe(h);
 - //這個大的分支處理add方法中的條件1與條件2成立的情況,如果cell表還未初始化或者長度為0,
 - // 先嘗試獲取cellsBusy鎖。
 - }else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
 - boolean init = false;
 - try { // Initialize table
 - //初始化cells數(shù)組,初始容量為2,并將x值通過hash&1,放到0個或第1個位置上
 - if (cells == as) {
 - Cell[] rs = new Cell[2];
 - rs[h & 1] = new Cell(x);
 - cells = rs;
 - init = true;
 - }
 - } finally {
 - //解鎖
 - cellsBusy = 0;
 - }
 - //如果init為true說明初始化成功,跳出循環(huán)
 - if (init)
 - break;
 - }
 - /**
 - *如果以上操作都失敗了,則嘗試將值累加到base上;
 - */
 - else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
 - // Fall back on using base
 - break;
 - }
 - }
 
以上2個方法的解析搬自于源碼閱讀:全方位講解LongAdder,此處對代碼做了微調(diào),方便閱讀。
總結(jié)
LongAdder在沒有線程競爭的時(shí)候,只使用base值,此時(shí)的情況就類似與AtomicLong。但LongAdder的高明之處在于,發(fā)生線程競爭時(shí),便會使用到Cell數(shù)組,所以該數(shù)組是惰性加載的。
Cell數(shù)組初始值為2,每次擴(kuò)容(當(dāng)線程競爭異常激烈時(shí),發(fā)生擴(kuò)容)為上次長度的2倍,因此數(shù)組長度一直是2的次冪,但是當(dāng)數(shù)組長度≥CPU的核心數(shù)時(shí),就不再進(jìn)行擴(kuò)容。為什么?我的理解是在一臺電腦中,最多能有CPU核心數(shù)個線程能夠并行,因此同時(shí)也就這么多個線程操作Cell數(shù)組,每個線程更新一個位置上的元素,且又因?yàn)閿?shù)組中每個元素由于字節(jié)填充機(jī)制,十分的占據(jù)內(nèi)存??紤]到這兩個因素,Cell數(shù)組在長度≥CPU核心數(shù)時(shí),停止擴(kuò)容。
確實(shí),LongAdder花了很多心思提高了高并發(fā)下程序運(yùn)行的效率,每一步都是在沒有更好的辦法下才會去選擇開銷更大的操作。在低并發(fā)下,LongAdder和AtomicLong效率上差不多,但LongAdder更加耗費(fèi)內(nèi)存。不過在高并發(fā)下,LongAdder將更加高效。















 
 
 









 
 
 
 