偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

1.8w字圖解Java并發(fā)容器框架:并發(fā)安全 Map、JUC 集合、Java 7 種阻塞隊(duì)列正確使用場(chǎng)景和原理詳解

開發(fā) 前端
所謂雙向隊(duì)列指的你可以從隊(duì)列的兩端插入和移出元素。雙端隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)。

今天進(jìn)入 Java 并發(fā)編程第二章節(jié),圍繞著并發(fā)容器展開,主要內(nèi)容如下:

  • ConcurrentHashMap的使用和原理
  • ConcurrentLinkedQueue 的使用和原理
  • Java 7 種阻塞隊(duì)列使用場(chǎng)景和原理詳解:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue以及LinkedBlockingDeque

ConcurrentHashMap 的使用和原理

Map 是一個(gè)接口,它的實(shí)現(xiàn)方式有很多種,比如常見的 HashMap、LinkedHashMap,但是這些 Map 的實(shí)現(xiàn)并不是線程安全的,在多線程高并發(fā)的環(huán)境中會(huì)出現(xiàn)線程安全的問(wèn)題。

鑒于 Map 是一個(gè)在高并發(fā)的應(yīng)用環(huán)境中應(yīng)用比較廣泛的數(shù)據(jù)結(jié)構(gòu),Doug Lea 自 JDK 1.5 版本起在 Java 中引入了 ConcurrentHashMap。

ConcurrentHashMap 的使用

對(duì)一個(gè)技術(shù)的掌握,從使用開始。我們現(xiàn)在來(lái)實(shí)現(xiàn)一個(gè)一個(gè)高并發(fā)計(jì)數(shù)器,例如記錄網(wǎng)站訪問(wèn)量、接口調(diào)用次數(shù)等。

@Service
public class Counter {
    private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    public void increase(String key) {
        map.compute(key, (k, v) -> (v == null) ? 1 : v + 1);
    }

    public int get(String key) {
        return map.getOrDefault(key, 0);
    }
}
  • compute :這是一個(gè)并發(fā)安全原子操作,我們使用 compute 方法實(shí)現(xiàn)對(duì)計(jì)數(shù)器的增加操作。

如果 key 不存在則新建一個(gè)值為 1 的計(jì)數(shù)器;

否則將其 value 遞增 1。

  • 通過(guò) get 方法可以獲取指定 key 對(duì)應(yīng)的計(jì)數(shù)器值。

這個(gè)例子比較簡(jiǎn)單,現(xiàn)在開始上強(qiáng)度。ConcurrentHashMap 還可以用來(lái)實(shí)現(xiàn)緩存管理器,例如存儲(chǔ)經(jīng)常使用的業(yè)務(wù)數(shù)據(jù)、系統(tǒng)配置等信息,從而避免頻繁的數(shù)據(jù)庫(kù)查詢或網(wǎng)絡(luò)請(qǐng)求。

以下是一個(gè)支持過(guò)期時(shí)間、自動(dòng)刷新和并發(fā)控制的緩存管理器實(shí)現(xiàn),包含詳細(xì)注釋和最佳實(shí)踐:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * 高性能并發(fā)緩存管理器
 * @param <K> 鍵類型
 * @param <V> 值類型
 */
publicclass ConcurrentCache<K, V> {

    privatefinal ConcurrentHashMap<K, CacheEntry<V>> cache = new ConcurrentHashMap<>();
    privatefinal Function<K, V> loader;  // 緩存加載器
    privatefinal ScheduledExecutorService cleaner;  // 過(guò)期清理線程

    // 默認(rèn)配置
    privatelong defaultTTL = 30_000;          // 默認(rèn)30秒
    privatelong cleanupInterval = 5_000;      // 5秒清理一次
    privateint maxRetries = 3;                // 最大重試次數(shù)
    privateboolean refreshOnAccess = true;    // 訪問(wèn)時(shí)刷新TTL

    public ConcurrentCache(Function<K, V> loader) {
        this.loader = loader;
        this.cleaner = Executors.newSingleThreadScheduledExecutor();
        startCleanupTask();
    }

    /**
     * 獲取緩存值(線程安全)
     */
    public V get(K key) {
        CacheEntry<V> entry = cache.get(key);

        // 無(wú)緩存或已過(guò)期時(shí)加載
        if (entry == null || entry.isExpired()) {
            return loadAndCache(key);
        }

        // 更新訪問(wèn)時(shí)間(可選)
        if (refreshOnAccess) {
            entry.touch();
        }
        return entry.value;
    }

    /**
     * 原子性的加載和緩存操作
     */
    private V loadAndCache(K key) {
        int retry = 0;
        while (retry++ < maxRetries) {
            try {
                // 使用compute保證原子性
                CacheEntry<V> newEntry = cache.compute(key, (k, oldEntry) -> {
                    // 檢查其他線程是否已經(jīng)加載
                    if (oldEntry != null && !oldEntry.isExpired()) {
                        return oldEntry;
                    }

                    V value = loader.apply(k);
                    returnnew CacheEntry<>(value, defaultTTL, TimeUnit.MILLISECONDS);
                });

                return newEntry.value;
            } catch (Exception ex) {
                if (retry >= maxRetries) {
                    thrownew CacheLoadException("加載緩存失敗,key=" + key, ex);
                }
                // 指數(shù)退避重試
                sleepUninterruptibly((long) Math.pow(2, retry), TimeUnit.MILLISECONDS);
            }
        }
        thrownew CacheLoadException("超過(guò)最大重試次數(shù),key=" + key);
    }

    /**
     * 主動(dòng)放入緩存(支持自定義TTL)
     */
    public void put(K key, V value, long ttl, TimeUnit unit) {
        cache.put(key, new CacheEntry<>(value, ttl, unit));
    }

    /**
     * 啟動(dòng)定期清理任務(wù)(雙重檢查鎖模式)
     */
    private void startCleanupTask() {
        if (cleaner.isShutdown()) return;

        cleaner.scheduleWithFixedDelay(() -> {
            cache.forEach((key, entry) -> {
                if (entry.isExpired()) {
                    cache.remove(key, entry); // 使用CAS刪除
                }
            });
        }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
    }

    // 其他實(shí)用方法
    public void remove(K key) { cache.remove(key); }
    public void clear() { cache.clear(); }
    public long size() { return cache.mappingCount(); }

    // 配置方法(Builder模式風(fēng)格)
    public ConcurrentCache<K, V> defaultTTL(long ttl, TimeUnit unit) {
        this.defaultTTL = unit.toMillis(ttl);
        returnthis;
    }

    public ConcurrentCache<K, V> cleanupInterval(long interval, TimeUnit unit) {
        this.cleanupInterval = unit.toMillis(interval);
        returnthis;
    }

    // 異常處理
    privatestaticclass CacheLoadException extends RuntimeException {
        CacheLoadException(String message, Throwable cause) {
            super(message, cause);
        }

        CacheLoadException(String message) {
            super(message);
        }
    }

    // 工具方法:不可中斷的休眠
    private static void sleepUninterruptibly(long duration, TimeUnit unit) {
        try {
            Thread.sleep(unit.toMillis(duration));
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }

    // 關(guān)閉時(shí)釋放資源
    public void shutdown() {
        cleaner.shutdownNow();
        cache.clear();
    }

    // 緩存條目:包含值、過(guò)期時(shí)間和訪問(wèn)時(shí)間戳
    privatestaticclass CacheEntry<V> {
        final V value;
        finallong expireAt;  // 絕對(duì)過(guò)期時(shí)間(納秒)
        final AtomicLong accessTime = new AtomicLong(); // 最后訪問(wèn)時(shí)間(納秒)

        CacheEntry(V value, long ttl, TimeUnit unit) {
            this.value = value;
            this.expireAt = System.nanoTime() + unit.toNanos(ttl);
            touch();
        }
        // 刷新訪問(wèn)時(shí)間
        void touch() {
            accessTime.set(System.nanoTime());
        }
        // 判斷是否已過(guò)期
        boolean isExpired() {
            return System.nanoTime() > expireAt;
        }
    }
}

實(shí)現(xiàn)原理

  • CHM 的源碼有 6k 多行,包含的內(nèi)容多,精巧,不容易理解;建議在查看源碼的時(shí)候,可以首先把握整體結(jié)構(gòu)脈絡(luò),對(duì)于一些精巧的優(yōu)化,哈希技巧可以先了解目的就可以了,不用深究;
  • 對(duì)整體把握比較清楚后,在逐步分析,可以比較快速的看懂;
  • JDK1.8 版本中的 CHM,和 JDK1.7 版本的差別非常大,在查看資料的時(shí)候要注意區(qū)分,1.7 中主要是使用 Segment 分段鎖 來(lái)解決并發(fā)問(wèn)題的。

JDK 1.7 版本 ConcurrentHashMap

在 JDK1.7 版本中,ConcurrentHashMap 的數(shù)據(jù)結(jié)構(gòu)是由一個(gè) Segment 數(shù)組和多個(gè) HashEntry 組成。

而每一個(gè) Segment 元素存儲(chǔ)的是 HashEntry 數(shù)組+鏈表,并對(duì)應(yīng)一個(gè) ReentrantLock 鎖,用于并發(fā)訪問(wèn)控制。

圖片圖片

以 put 操作為例,來(lái)看一下 ConcurrentHashMap 的實(shí)現(xiàn)過(guò)程:

  1. 首先計(jì)算 key 的哈希值;
  2. 根據(jù)哈希值找到對(duì)應(yīng)的 Segment;
  3. 獲取 Segment 對(duì)應(yīng)的鎖;
  4. 如果還沒(méi)有元素,就直接插入到 Segment 中;
  5. 如果已經(jīng)存在元素,就循環(huán)比較 key 是否相等;
  6. 如果 key 已經(jīng)存在,就根據(jù)要求更新 value;
  7. 如果 key 不存在,就插入新的元素(鏈表或者紅黑樹)。

上述操作中,步驟 2 到 3 相當(dāng)于對(duì)對(duì)應(yīng)的 Segment 加了一個(gè)悲觀鎖,如果 Segment 數(shù)組只有一個(gè) Segment 元素,效果與 Hashtable 類似;

如果存在多個(gè) Segment,效果就相當(dāng)于使用了分段鎖機(jī)制,提高了并發(fā)訪問(wèn)性能。

JDK 1.8 ConcurrentHashMap

在 JDK1.8 中,ConcurrentHashMap 的實(shí)現(xiàn)原理摒棄了這種設(shè)計(jì),而是選擇了與 HashMap 類似的數(shù)組+鏈表+紅黑樹的方式實(shí)現(xiàn),而加鎖則采用 CAS 和 synchronized 實(shí)現(xiàn)。

圖片圖片

其主要區(qū)別就在 CHM 支持并發(fā):

  • 使用 Unsafe 方法操作數(shù)組內(nèi)部元素,保證可見性;(U.getObjectVolatile、U.compareAndSwapObject、U.putObjectVolatile);
  • 在更新和移動(dòng)節(jié)點(diǎn)的時(shí)候,直接鎖住對(duì)應(yīng)的哈希桶,鎖粒度更小,且動(dòng)態(tài)擴(kuò)展;
  • 針對(duì)擴(kuò)容慢操作進(jìn)行優(yōu)化。

a.首先擴(kuò)容過(guò)程的中,節(jié)點(diǎn)首先移動(dòng)到過(guò)度表 nextTable ,所有節(jié)點(diǎn)移動(dòng)完畢時(shí)替換散列表 table;

b.移動(dòng)時(shí)先將散列表定長(zhǎng)等分,然后逆序依次領(lǐng)取任務(wù)擴(kuò)容,設(shè)置 sizeCtl 標(biāo)記正在擴(kuò)容;

c.移動(dòng)完成一個(gè)哈希桶或者遇到空桶時(shí),將其標(biāo)記為 ForwardingNode 節(jié)點(diǎn),并指向 nextTable ;

d.后有其他線程在操作哈希表時(shí),遇到 ForwardingNode 節(jié)點(diǎn),則先幫助擴(kuò)容(繼續(xù)領(lǐng)取分段任務(wù)),擴(kuò)容完成后再繼續(xù)之前的操作;

  • 優(yōu)化哈希表計(jì)數(shù)器,采用 LongAdder、Striped64 類似思想;
  • 以及大量的哈希算法優(yōu)化和狀態(tài)變量?jī)?yōu)化;
  • 關(guān)注「碼哥跳動(dòng)」,并設(shè)置星標(biāo),一起擁抱硬核技術(shù)和對(duì)象,面向人民幣編程。
類定義和成員變量
// node數(shù)組最大容量:2^30=1073741824
privatestaticfinalint MAXIMUM_CAPACITY = 1 << 30;
// 默認(rèn)初始值,必須是2的幕數(shù)
privatestaticfinalint DEFAULT_CAPACITY = 16;
//數(shù)組可能最大值,需要與toArray()相關(guān)方法關(guān)聯(lián)
staticfinalint MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并發(fā)級(jí)別,遺留下來(lái)的,為兼容以前的版本
privatestaticfinalint DEFAULT_CONCURRENCY_LEVEL = 16;
// 負(fù)載因子
privatestaticfinalfloat LOAD_FACTOR = 0.75f;
// 鏈表轉(zhuǎn)紅黑樹閥值,> 8 鏈表轉(zhuǎn)換為紅黑樹
staticfinalint TREEIFY_THRESHOLD = 8;
//樹轉(zhuǎn)鏈表閥值,小于等于6(tranfer時(shí),lc、hc=0兩個(gè)計(jì)數(shù)器分別++記錄原bin、新binTreeNode數(shù)量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
staticfinalint UNTREEIFY_THRESHOLD = 6;
staticfinalint MIN_TREEIFY_CAPACITY = 64;
privatestaticfinalint MIN_TRANSFER_STRIDE = 16;
privatestaticint RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize的最大線程數(shù)
privatestaticfinalint MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中記錄size大小的偏移量
privatestaticfinalint RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
staticfinalint MOVED     = -1;
// 樹根節(jié)點(diǎn)的hash值
staticfinalint TREEBIN   = -2;
// ReservationNode的hash值
staticfinalint RESERVED  = -3;
// 可用處理器數(shù)量
staticfinalint NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數(shù)組
transientvolatile Node<K,V>[] table;
/*控制標(biāo)識(shí)符,用來(lái)控制table的初始化和擴(kuò)容的操作,不同的值有不同的含義
 *當(dāng)為負(fù)數(shù)時(shí):-1代表正在初始化,-N代表有N-1個(gè)線程正在 進(jìn)行擴(kuò)容
 *當(dāng)為0時(shí):代表當(dāng)時(shí)的table還沒(méi)有被初始化
 *當(dāng)為正數(shù)時(shí):表示初始化或者下一次進(jìn)行擴(kuò)容的大小
 */
privatetransientvolatileint sizeCtl;

上面有幾個(gè)重要的地方這里單獨(dú)講:

LOAD_FACTOR:

這里的負(fù)載系數(shù),同 HashMap 等其他 Map 的系數(shù)有明顯區(qū)別:

  • 通常的系數(shù)默認(rèn) 0.75,可以由構(gòu)造函數(shù)傳入,當(dāng)節(jié)點(diǎn)數(shù) size 超過(guò) loadFactor * capacity 時(shí)擴(kuò)容;
  • 而 CMH 的系數(shù)則固定 0.75(使用 n - (n >>> 2) 表示),構(gòu)造函數(shù)傳入的系數(shù)只影響初始化容量,見第 5 個(gè)構(gòu)造函數(shù)。

sizeCtl:

sizeCtl 是 CHM 中最重要的狀態(tài)變量,其中包括很多中狀態(tài),這里先整體介紹幫助后面源碼理解;

  • sizeCtl = 0 :初始值,還未指定初始容量;
  • sizeCtl > 0 :

table 未初始化,表示初始化容量;

table 已初始化,表示擴(kuò)容閾值(0.75n);

  • sizeCtl = -1 :表示正在初始化;
  • sizeCtl < -1 :表示正在擴(kuò)容,具體結(jié)構(gòu)如圖所示:

圖片圖片

Node 節(jié)點(diǎn)

Node 是 ConcurrentHashMap 存儲(chǔ)結(jié)構(gòu)的基本單元,繼承于 HashMap 中的 Entry,用于存儲(chǔ)數(shù)據(jù),源代碼如下。

static class Node<K,V> implements Map.Entry<K,V> {  // 哈希表普通節(jié)點(diǎn)
finalint hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node<K,V> find(int h, Object k) {}   // 主要在擴(kuò)容時(shí),利用多態(tài)查詢已轉(zhuǎn)移節(jié)點(diǎn)
}

staticfinalclass ForwardingNode<K,V> extends Node<K,V> {  // 標(biāo)識(shí)擴(kuò)容節(jié)點(diǎn)
final Node<K,V>[] nextTable;  // 指向成員變量 ConcurrentHashMap.nextTable

  ForwardingNode(Node<K,V>[] tab) {
    super(MOVED, null, null, null);  // hash = -1,快速確定 ForwardingNode 節(jié)點(diǎn)
    this.nextTable = tab;
  }

Node<K,V> find(int h, Object k) {}
}

staticfinalclass TreeBin<K,V> extends Node<K,V> { // 紅黑樹根節(jié)點(diǎn)
  TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);  // hash = -2,快速確定紅黑樹,
    ...
  }
}
staticfinalclass TreeNode<K,V> extends Node<K,V> { } // 紅黑樹普通節(jié)點(diǎn),其 hash 同 Node 普通節(jié)點(diǎn) > 0;
哈希計(jì)算
static finalint MOVED     = -1;          // hash for forwarding nodes
staticfinalint TREEBIN   = -2;          // hash for roots of trees
staticfinalint RESERVED  = -3;          // hash for transient reservations
staticfinalint HASH_BITS = 0x7fffffff;  // usable bits of normal node hash

// 讓高位16位,參與哈希桶定位運(yùn)算的同時(shí),保證 hash 為正
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
哈希桶可見性

一個(gè)數(shù)組即使聲明為 volatile,也只能保證這個(gè)數(shù)組引用本身的可見性,其內(nèi)部元素的可見性是無(wú)法保證的,如果每次都加鎖,則效率必然大大降低,在 CHM 中則使用 Unsafe 方法來(lái)保證:

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

staticfinal <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

staticfinal <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
  U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
put 操作

思路是對(duì)當(dāng)前的 table 進(jìn)行無(wú)條件自循環(huán)直到 put 成功,可以分成以下六步流程來(lái)概述。

  1. 如果沒(méi)有初始化就先調(diào)用 initTable()方法來(lái)進(jìn)行初始化過(guò)程
  2. 如果沒(méi)有 hash 沖突就直接 CAS 插入
  3. 如果還在進(jìn)行擴(kuò)容操作就先進(jìn)行擴(kuò)容
  4. 如果存在 hash 沖突,就加鎖來(lái)保證線程安全,這里有兩種情況,一種是鏈表形式就直接遍歷到尾端插入,一種是紅黑樹就按照紅黑樹結(jié)構(gòu)插入,
  5. 最后一個(gè)如果該鏈表的數(shù)量大于閾值 8,就要先轉(zhuǎn)換成黑紅樹的結(jié)構(gòu),break 再一次進(jìn)入循環(huán)
  6. 如果添加成功就調(diào)用 addCount()方法統(tǒng)計(jì) size,并且檢查是否需要擴(kuò)容
public V put(K key, V value) {
    return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) thrownew NullPointerException();
    int hash = spread(key.hashCode()); //兩次hash,減少hash沖突,可以均勻分布
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { //對(duì)這個(gè)table進(jìn)行迭代
        Node<K,V> f; int n, i, fh;
        //這里就是上面構(gòu)造方法沒(méi)有進(jìn)行初始化,在這里進(jìn)行判斷,為null就調(diào)用initTable進(jìn)行初始化,屬于懶漢模式初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        elseif ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置沒(méi)有數(shù)據(jù),就直接無(wú)鎖插入
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        elseif ((fh = f.hash) == MOVED)//如果在進(jìn)行擴(kuò)容,則先進(jìn)行擴(kuò)容操作
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //如果以上條件都不滿足,那就要進(jìn)行加鎖操作,也就是存在hash沖突,鎖住鏈表或者紅黑樹的頭結(jié)點(diǎn)
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { //表示該節(jié)點(diǎn)是鏈表結(jié)構(gòu)
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //這里涉及到相同的key進(jìn)行put就會(huì)覆蓋原先的value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {  //插入鏈表尾部
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    elseif (f instanceof TreeBin) {//紅黑樹結(jié)構(gòu)
                        Node<K,V> p;
                        binCount = 2;
                        //紅黑樹結(jié)構(gòu)旋轉(zhuǎn)插入
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { //如果鏈表的長(zhǎng)度大于8時(shí)就會(huì)進(jìn)行紅黑樹的轉(zhuǎn)換
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);//統(tǒng)計(jì)size,并且檢查是否需要擴(kuò)容
    returnnull;
}

流程圖如下所示:

圖片圖片

get 操作

get 方法可能看代碼不是很長(zhǎng),但是他卻能 保證無(wú)鎖狀態(tài)下的內(nèi)存一致性 。

public V get(Object key) {
  Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 計(jì)算 hash
if ((tab = table) != null && (n = tab.length) > 0 &&  // 確保 table 已經(jīng)初始化

    // 確保對(duì)應(yīng)的哈希桶不為空,注意這里是 Volatile 語(yǔ)義獲??;因?yàn)閿U(kuò)容的時(shí)候,是完全拷貝,所以只要不為空,則鏈表必然完整
    (e = tabAt(tab, (n - 1) & h)) != null) {
    if ((eh = e.hash) == h) {
      if ((ek = e.key) == key || (ek != null && key.equals(ek)))
        return e.val;
    }

    // hash < 0,則必然在擴(kuò)容,原來(lái)位置的節(jié)點(diǎn)可能全部移動(dòng)到 i + oldCap 位置,所以利用多態(tài)到 nextTable 中查找
    elseif (eh < 0) return (p = e.find(h, key)) != null ? p.val : null;

    while ((e = e.next) != null) { // 遍歷鏈表
      if (e.hash == h &&
        ((ek = e.key) == key || (ek != null && key.equals(ek))))
        return e.val;
    }
  }
returnnull;
}

ConcurrentHashMap 的 get 操作的流程很簡(jiǎn)單,也很清晰,可以分為三個(gè)步驟來(lái)描述.

  1. 計(jì)算 hash 值,定位到該 table 索引位置,如果是首節(jié)點(diǎn)符合就返回
  2. 如果遇到擴(kuò)容的時(shí)候,會(huì)調(diào)用標(biāo)志正在擴(kuò)容節(jié)點(diǎn) ForwardingNode 的 find 方法,查找該節(jié)點(diǎn),匹配就返回
  3. 以上都不符合的話,就往下遍歷節(jié)點(diǎn),匹配就返回,否則最后就返回 null
size 操作

在 JDK1.8 版本中,對(duì)于 size 的計(jì)算,在擴(kuò)容和 addCount()方法就已經(jīng)有處理了,JDK1.7 是在調(diào)用 size()方法才去計(jì)算,其實(shí)在并發(fā)集合中去計(jì)算 size 是沒(méi)有多大的意義的,因?yàn)?size 是實(shí)時(shí)在變的,只能計(jì)算某一刻的大小,但是某一刻太快了,人的感知是一個(gè)時(shí)間段,所以并不是很精確。

擴(kuò)容

擴(kuò)容操作一直都是比較慢的操作,而 CHM 中巧妙的利用任務(wù)劃分,使得多個(gè)線程可能同時(shí)參與擴(kuò)容;

另外擴(kuò)容條件也有兩個(gè):

  • 有鏈表長(zhǎng)度超過(guò) 8,但是容量小于 64 的時(shí)候,發(fā)生擴(kuò)容;
  • 節(jié)點(diǎn)數(shù)超過(guò)閾值的時(shí)候,發(fā)生擴(kuò)容;

其擴(kuò)容的過(guò)程可描述為:

  • 首先擴(kuò)容過(guò)程的中,節(jié)點(diǎn)首先移動(dòng)到過(guò)度表 nextTable ,所有節(jié)點(diǎn)移動(dòng)完畢時(shí)替換散列表 table;
  • 移動(dòng)時(shí)先將散列表定長(zhǎng)等分,然后逆序依次領(lǐng)取任務(wù)擴(kuò)容,設(shè)置 sizeCtl 標(biāo)記正在擴(kuò)容;
  • 移動(dòng)完成一個(gè)哈希桶或者遇到空桶時(shí),將其標(biāo)記為 ForwardingNode 節(jié)點(diǎn),并指向 nextTable ;
  • 后有其他線程在操作哈希表時(shí),遇到 ForwardingNode 節(jié)點(diǎn),則先幫助擴(kuò)容(繼續(xù)領(lǐng)取分段任務(wù)),擴(kuò)容完成后再繼續(xù)之前的操作;

如圖:

圖片圖片

具體源碼如下:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    stride = MIN_TRANSFER_STRIDE; // 根據(jù) CPU 數(shù)量計(jì)算任務(wù)步長(zhǎng)
if (nextTab == null) {          // 初始化 nextTab
    try {
      @SuppressWarnings("unchecked")
      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];  // 擴(kuò)容一倍
      nextTab = nt;
    } catch (Throwable ex) {
      sizeCtl = Integer.MAX_VALUE; // 發(fā)生 OOM 時(shí),不再擴(kuò)容
      return;
    }
    nextTable = nextTab;
    transferIndex = n;
  }
int nextn = nextTab.length;
  ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);  // 標(biāo)記空桶,或已經(jīng)轉(zhuǎn)移完畢的桶
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {  // 逆向遍歷擴(kuò)容
    Node<K,V> f; int fh;
    while (advance) {  // 向前獲取哈希桶
      int nextIndex, nextBound;
      if (--i >= bound || finishing)               // 已經(jīng)取到哈希桶,或已完成時(shí)退出
        advance = false;
      elseif ((nextIndex = transferIndex) <= 0) { // 遍歷到達(dá)頭節(jié)點(diǎn),已經(jīng)沒(méi)有待遷移的桶,線程準(zhǔn)備退出
        i = -1;
        advance = false;
      }
      elseif (U.compareAndSwapInt
           (this, TRANSFERINDEX, nextIndex,
            nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {  // 當(dāng)前任務(wù)完成,領(lǐng)取下一批哈希桶
        bound = nextBound;
        i = nextIndex - 1;  // 索引指向下一批哈希桶
        advance = false;
      }
    }

    // i < 0  :表示擴(kuò)容結(jié)束,已經(jīng)沒(méi)有待移動(dòng)的哈希桶
    // i >= n :擴(kuò)容結(jié)束,再次檢查確認(rèn)
    // i + n >= nextn : 在使用 nextTable 替換 table 時(shí),有線程進(jìn)入擴(kuò)容就會(huì)出現(xiàn)
    if (i < 0 || i >= n || i + n >= nextn) { // 完成擴(kuò)容準(zhǔn)備退出
      int sc;
      if (finishing) {  // 兩次檢查,只有最后一個(gè)擴(kuò)容線程退出時(shí),才更新變量
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1); // 0.75*2*n
        return;
      }
      if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  // 擴(kuò)容線程減一
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return;  // 不是最后一個(gè)線程,直接退出
        finishing = advance = true;   // 最后一個(gè)線程,再次檢查
        i = n;                        // recheck before commit
      }
    }
    elseif ((f = tabAt(tab, i)) == null)  // 當(dāng)前節(jié)點(diǎn)為空,直接標(biāo)記為 ForwardingNode,然后繼續(xù)獲取下一個(gè)桶
      advance = casTabAt(tab, i, null, fwd);

    // 之前的線程已經(jīng)完成該桶的移動(dòng),直接跳過(guò),正常情況下自己的任務(wù)區(qū)間,不會(huì)出現(xiàn) ForwardingNode 節(jié)點(diǎn),
    elseif ((fh = f.hash) == MOVED)  // 此處為極端條件下的健壯性檢查
      advance = true; // already processed

    // 開始處理鏈表
    else {
      // 注意在 get 的時(shí)候,可以無(wú)鎖獲取,是因?yàn)閿U(kuò)容是全拷貝節(jié)點(diǎn),完成后最后在更新哈希桶
      // 而在 put 的時(shí)候,是直接將節(jié)點(diǎn)加入尾部,獲取修改其中的值,此時(shí)如果允許 put 操作,最后就會(huì)發(fā)生臟讀,
      // 所以 put 和 transfer,需要競(jìng)爭(zhēng)同一把鎖,也就是對(duì)應(yīng)的哈希桶,以保證內(nèi)存一致性效果
      synchronized (f) {
        if (tabAt(tab, i) == f) {  // 確認(rèn)鎖定的是同一個(gè)桶
          Node<K,V> ln, hn;
          if (fh >= 0) {  // 正常節(jié)點(diǎn)
            int runBit = fh & n;  // hash & n,判斷擴(kuò)容后的索引
            Node<K,V> lastRun = f;

            // 此處找到鏈表最后擴(kuò)容后處于同一位置的連續(xù)節(jié)點(diǎn),這樣最后一節(jié)就不用再一次復(fù)制了
            for (Node<K,V> p = f.next; p != null; p = p.next) {
              int b = p.hash & n;
              if (b != runBit) {
                runBit = b;
                lastRun = p;
              }
            }
            if (runBit == 0) {
              ln = lastRun;
              hn = null;
            }
            else {
              hn = lastRun;
              ln = null;
            }

            // 依次將鏈表拆分成,lo、hi 兩條鏈表,即位置不變的鏈表,和位置 + oldCap 的鏈表
            // 注意最后一節(jié)鏈表沒(méi)有new,而是直接使用原來(lái)的節(jié)點(diǎn)
            // 同時(shí)鏈表的順序也被打亂了,lastRun 到最后為正序,前面一節(jié)為逆序
            for (Node<K,V> p = f; p != lastRun; p = p.next) {
              int ph = p.hash; K pk = p.key; V pv = p.val;
              if ((ph & n) == 0)
                ln = new Node<K,V>(ph, pk, pv, ln);
              else
                hn = new Node<K,V>(ph, pk, pv, hn);
            }
            setTabAt(nextTab, i, ln);      // 插入 lo 鏈表
            setTabAt(nextTab, i + n, hn);  // 插入 hi 鏈表
            setTabAt(tab, i, fwd);         // 哈希桶移動(dòng)完成,標(biāo)記為 ForwardingNode 節(jié)點(diǎn)
            advance = true;                // 繼續(xù)獲取下一個(gè)桶
          }
          elseif (f instanceof TreeBin) { // 拆分紅黑樹
            TreeBin<K,V> t = (TreeBin<K,V>)f;
            TreeNode<K,V> lo = null, loTail = null; // 為避免最后在反向遍歷,先留頭結(jié)點(diǎn)的引用,
            TreeNode<K,V> hi = null, hiTail = null; // 因?yàn)轫樞虻逆湵恚梢约铀偌t黑樹構(gòu)造
            int lc = 0, hc = 0;  // 同樣記錄 lo,hi 鏈表的長(zhǎng)度
            for (Node<K,V> e = t.first; e != null; e = e.next) {  // 中序遍歷紅黑樹
              int h = e.hash;
              TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);  // 構(gòu)造紅黑樹節(jié)點(diǎn)
              if ((h & n) == 0) {
                if ((p.prev = loTail) == null)
                  lo = p;
                else
                  loTail.next = p;
                loTail = p;
                ++lc;
              }
              else {
                if ((p.prev = hiTail) == null)
                  hi = p;
                else
                  hiTail.next = p;
                hiTail = p;
                ++hc;
              }
            }

            // 判斷是否需要將其轉(zhuǎn)化為紅黑樹,同時(shí)如果只有一條鏈,那么就可以不用在構(gòu)造
            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
            setTabAt(nextTab, i, ln);
            setTabAt(nextTab, i + n, hn);
            setTabAt(tab, i, fwd);
            advance = true;
          }
        }
      }
    }
  }
}

ConcurrentLinkedQueue 的使用和原理

ConcurerntLinkedQueue 一個(gè)基于單向鏈表的無(wú)界線程安全隊(duì)列,支持高并發(fā)的隊(duì)列操作,無(wú)需顯式的鎖,而且容量沒(méi)有上限。

此隊(duì)列按照 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。新的元素插入到隊(duì)列的尾部,隊(duì)列獲取操作從隊(duì)列頭部獲得元素。

應(yīng)用場(chǎng)景

常見的使用場(chǎng)景可能包括任務(wù)調(diào)度、事件處理、日志記錄等。比如訂單處理系統(tǒng),其中多個(gè)生產(chǎn)者生成訂單,多個(gè)消費(fèi)者處理訂單。

// 訂單事件處理器(生產(chǎn)環(huán)境級(jí)實(shí)現(xiàn))
publicclass OrderEventProcessor {
    // 使用隊(duì)列作為訂單緩沖區(qū)(無(wú)容量限制)
    privatefinal ConcurrentLinkedQueue<OrderEvent> queue = new ConcurrentLinkedQueue<>();
    privatefinal ExecutorService workers = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors() * 2,
        new NamedThreadFactory("order-processor")
    );

    // 初始化處理線程
    public void start() {
        for (int i = 0; i < workers.getCorePoolSize(); i++) {
            workers.submit(this::processEvents);
        }
    }

    // 接收訂單事件(來(lái)自網(wǎng)絡(luò)IO線程)
    public void receiveEvent(OrderEvent event) {
        queue.offer(event); // 無(wú)阻塞插入
        metrics.recordEnqueue(); // 監(jiān)控埋點(diǎn)
    }

    // 事件處理核心邏輯
    private void processEvents() {
        while (!Thread.currentThread().isInterrupted()) {
            OrderEvent event = queue.poll(); // 無(wú)阻塞獲取
            if (event != null) {
                try {
                    handleEvent(event);
                } catch (Exception ex) {
                    handleFailure(event, ex); // 異常處理
                }
            } else {
                // 隊(duì)列空時(shí)自適應(yīng)休眠(避免CPU空轉(zhuǎn))
                sleepBackoff();
            }
        }
    }

    // 指數(shù)退避休眠(動(dòng)態(tài)調(diào)節(jié)CPU使用率)
    private void sleepBackoff() {
        long delay = 1; // 初始1ms
        while (queue.isEmpty() && delay < 100) {
            try {
                TimeUnit.MILLISECONDS.sleep(delay);
                delay <<= 1; // 指數(shù)增加等待時(shí)間
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 優(yōu)雅關(guān)閉
    public void shutdown() {
        workers.shutdown();
        while (!queue.isEmpty()) {
            // 等待剩余任務(wù)處理完成
            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
        }
    }
}

需要注意的是,以下場(chǎng)景不適合。

  1. 阻塞需求,需要 take()阻塞等待。
  2. 有界隊(duì)列。
  3. 強(qiáng)一致性,要求精確的 size。

實(shí)現(xiàn)原理

以 JDK 17 源碼為基線,ConcurrentLinkedQueue 是 Java 并發(fā)包中基于無(wú)鎖算法實(shí)現(xiàn)的線程安全隊(duì)列,專為高并發(fā)場(chǎng)景設(shè)計(jì)。其核心設(shè)計(jì)目標(biāo)包括:

  1. 無(wú)阻塞操作:通過(guò) CAS 實(shí)現(xiàn)非阻塞算法
  2. 線性擴(kuò)展能力:性能隨 CPU 核心數(shù)增加而提升
  3. 弱一致性:迭代器與 size() 方法返回近似值
  4. 內(nèi)存效率:每個(gè)元素僅需 24 字節(jié)存儲(chǔ)開銷

ConcurrentLinkedQueue 的結(jié)構(gòu)

ConcurrentLinkedQueue 由 head 節(jié)點(diǎn)和 tail 節(jié)點(diǎn)組成,每個(gè)節(jié)點(diǎn)(Node)由節(jié)點(diǎn)元素(item)和指向下一個(gè)節(jié)點(diǎn)的引用(next)組成,節(jié)點(diǎn)與節(jié)點(diǎn)之間就是通過(guò)這個(gè) next 關(guān)聯(lián)起來(lái),從而組成一張鏈表結(jié)構(gòu)的隊(duì)列。

ConcurrentLinkedQueue 的節(jié)點(diǎn)都是 Node 類型的。

static finalclass Node<E> {
    volatile E item;
    volatile Node<E> next;


    Node(E item) {
        ITEM.set(this, item);
    }


    Node() {}

    void appendRelaxed(Node<E> next) {

        NEXT.set(this, next);
    }

    boolean casItem(E cmp, E val) {
        return ITEM.compareAndSet(this, cmp, val);
    }
}

入隊(duì)操作(offer)

流程圖如下。

圖片圖片

代碼解析。

public boolean offer(E e) {
    final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // CAS插入新節(jié)點(diǎn)
            if (NEXT.compareAndSet(p, null, newNode)) {
                // 惰性更新tail(允許失?。?                if (p != t)
                    TAIL.weakCompareAndSet(this, t, newNode);
                returntrue;
            }
        }
        elseif (p == q) // 處理已移除節(jié)點(diǎn)
            p = (t != (t = tail)) ? t : head;
        else// 推進(jìn)指針
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

優(yōu)化點(diǎn):

  1. weakCompareAndSet 減少內(nèi)存屏障開銷
  2. 允許尾指針最多滯后 log(n) 個(gè)節(jié)點(diǎn)
  3. 通過(guò) VarHandle 實(shí)現(xiàn)精確內(nèi)存排序

出隊(duì)操作(poll)

核心機(jī)制:

  1. 兩階段出隊(duì):先標(biāo)記 item 為 null,再更新 head
  2. 頭指針可能跳躍多個(gè)已消費(fèi)節(jié)點(diǎn)
  3. 自動(dòng)清理無(wú)效節(jié)點(diǎn)

流程圖如下。

圖片圖片

核心源碼。

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item;
            if ((item = p.item) != null && p.casItem(item, null)) {
                // 成功獲取數(shù)據(jù)
                if (p != h)
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            elseif ((q = p.next) == null) {
                updateHead(h, p);
                returnnull;
            }
            elseif (p == q)
              // 繼續(xù)循環(huán)
                 continue restartFromHead;
        }
    }
}

Java 7 種阻塞隊(duì)列

Chaya:“什么是阻塞隊(duì)列?”

阻塞隊(duì)列,顧名思義,首先它是一個(gè)隊(duì)列,線程 1 往阻塞隊(duì)列中添加元素,而線程 2 從阻塞隊(duì)列中移除元素。

  • 當(dāng)阻塞隊(duì)列是空時(shí),從隊(duì)列中獲取元素的操作將會(huì)被阻塞。
  • 當(dāng)阻塞隊(duì)列是滿時(shí),從隊(duì)列中添加元素的操作將會(huì)被阻塞。

JDK1.8 中的阻塞隊(duì)列實(shí)現(xiàn)共有 7 個(gè),分別是:

  • ArrayBlockingQueue:基于數(shù)組的有界隊(duì)列;
  • LinkedBlockingQueue:基于鏈表的無(wú)界隊(duì)列(可以設(shè)置容量);
  • PriorityBlockingQueue:基于二叉堆的無(wú)界優(yōu)先級(jí)隊(duì)列;
  • DelayQueue:基于 PriorityBlockingQueue 的無(wú)界延遲隊(duì)列;
  • SynchronousQueue:無(wú)容量的阻塞隊(duì)列(Executors.newCachedThreadPool() 中使用的隊(duì)列);
  • LinkedTransferQueue:基于鏈表的無(wú)界隊(duì)列;
  • LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。

阻塞隊(duì)列核心接口

阻塞隊(duì)列統(tǒng)一實(shí)現(xiàn)了BlockingQueue接口,BlockingQueue接口在java.util包Queue接口的基礎(chǔ)上提供了put(e)以及take()兩個(gè)阻塞方法。

除了阻塞功能,BlockingQueue 接口還定義了定時(shí)的offer以及poll,以及一次性移除方法drainTo。

//插入元素,隊(duì)列滿后會(huì)拋出異常
boolean add(E e);
//移除元素,隊(duì)列為空時(shí)會(huì)拋出異常
E remove();

//插入元素,成功反會(huì)true
boolean offer(E e);
//移除元素
E poll();

//插入元素,隊(duì)列滿后會(huì)阻塞
void put(E e) throws InterruptedException;
//移除元素,隊(duì)列空后會(huì)阻塞
E take() throws InterruptedException;

//限時(shí)插入
boolean offer(E e, long timeout, TimeUnit unit)
//限時(shí)移除
E poll(long timeout, TimeUnit unit);

//獲取所有元素到Collection中
int drainTo(Collection<? super E> c);

阻塞隊(duì)列 6 大使用場(chǎng)景

Java 阻塞隊(duì)列(BlockingQueue)是并發(fā)編程中的核心工具,其線程安全和阻塞特性使其在以下場(chǎng)景中發(fā)揮重要作用。

    生產(chǎn)者-消費(fèi)者模型(經(jīng)典場(chǎng)景)

    電商系統(tǒng)中,用戶下單后需異步處理庫(kù)存扣減、支付回調(diào)、物流通知等操作。

    痛點(diǎn):生產(chǎn)(下單)與消費(fèi)(處理)速度不一致,需解耦并保證高吞吐。

    public class OrderProcessor {
        // 生產(chǎn)級(jí)配置:建議隊(duì)列大小為 CPU 核心數(shù)*2~4
        privatestaticfinal BlockingQueue<Order> queue = new LinkedBlockingQueue<>(2048);
        privatestaticfinal ExecutorService consumerPool = Executors.newFixedThreadPool(8);
    
        // 生產(chǎn)者(Web 服務(wù)線程)
        public void submitOrder(Order order) {
            if (!queue.offer(order)) {  // 隊(duì)列滿時(shí)快速失敗
                log.warn("Order queue overflow! Reject order: {}", order.getId());
                thrownew ServiceException("系統(tǒng)繁忙,請(qǐng)稍后重試");
            }
            log.info("Order submitted: {}", order.getId());
        }
    
        // 消費(fèi)者(后臺(tái)線程池)
        @PostConstruct
        public void initConsumers() {
            for (int i = 0; i < 8; i++) {
                consumerPool.execute(() -> {
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            Order order = queue.take();  // 阻塞直到有訂單
                            processOrder(order);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
        }
    
        private void processOrder(Order order) {
            // 扣減庫(kù)存、支付回調(diào)等業(yè)務(wù)邏輯
        }
    }

    線程池任務(wù)調(diào)度

    場(chǎng)景問(wèn)題

    視頻平臺(tái)需將上傳的視頻轉(zhuǎn)碼為不同分辨率,任務(wù)具有突發(fā)性。

    public class VideoTranscoder {
        // 使用 PriorityBlockingQueue 確保 VIP 用戶優(yōu)先處理
        privatestaticfinal BlockingQueue<TranscodeTask> queue =
            new PriorityBlockingQueue<>(1000, Comparator.comparing(TranscodeTask::getPriority));
    
        // 自定義線程池(核心線程數(shù)=CPU數(shù), 最大線程數(shù)=CPU數(shù)*2)
        privatestaticfinal ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 30, TimeUnit.SECONDS, queue
        );
    
        public void submitTranscodeTask(TranscodeTask task) {
            executor.execute(() -> {
                // 執(zhí)行實(shí)際轉(zhuǎn)碼操作
                process(task);
            });
        }
    
        // 監(jiān)控隊(duì)列狀態(tài)(生產(chǎn)環(huán)境建議接入 Prometheus)
        public MonitorData getQueueStatus() {
            returnnew MonitorData(
                queue.size(),
                executor.getActiveCount(),
                queue.remainingCapacity()
            );
        }
    }

    生產(chǎn)級(jí)要點(diǎn):

    1. 使用有界隊(duì)列避免 OOM
    2. RejectedExecutionHandler 需配置合理拒絕策略
    3. 隊(duì)列監(jiān)控接入告警系統(tǒng)

    流量削峰

    瞬時(shí)流量高峰可達(dá)平時(shí) 100 倍,數(shù)據(jù)庫(kù)無(wú)法承受直接壓力。

    public class SeckillService {
        // 隊(duì)列容量=商品庫(kù)存*2(內(nèi)存可控)
        privatefinal BlockingQueue<SeckillRequest> queue =
            new ArrayBlockingQueue<>(20000);
    
        // 異步消費(fèi)隊(duì)列
        @Scheduled(fixedRate = 100)
        public void processQueue() {
            List<SeckillRequest> batch = new ArrayList<>(100);
            queue.drainTo(batch, 100);  // 批量取100條
            if (!batch.isEmpty()) {
                seckillDao.batchProcess(batch); // 批量寫入數(shù)據(jù)庫(kù)
            }
        }
    
        public boolean trySeckill(SeckillRequest request) {
            return queue.offer(request);  // 非阻塞提交
        }
    }

    圖片圖片

    生產(chǎn)級(jí)設(shè)計(jì):

    1. 隊(duì)列容量與數(shù)據(jù)庫(kù)吞吐量匹配
    2. 批量處理減少數(shù)據(jù)庫(kù)壓力
    3. 前端配合顯示排隊(duì)狀態(tài)

    延遲任務(wù)調(diào)度:訂單超時(shí)關(guān)閉

    需在訂單創(chuàng)建 30 分鐘后檢查支付狀態(tài),未支付自動(dòng)關(guān)閉。

    public class OrderTimeoutChecker implements Runnable {
        privatefinal DelayQueue<DelayedOrder> queue = new DelayQueue<>();
    
        public void addOrder(Order order) {
            queue.put(new DelayedOrder(order, 30, TimeUnit.MINUTES));
        }
    
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    DelayedOrder order = queue.take();  // 阻塞直到有到期訂單
                    checkPayment(order.getOrderId());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    
        privatestaticclass DelayedOrder implements Delayed {
            privatefinal Order order;
            privatefinallong expireTime;
    
            // 實(shí)現(xiàn) getDelay() 和 compareTo()
        }
    }

    生產(chǎn)級(jí)注意:

    1. 分布式場(chǎng)景需用 Redis/ZooKeeper 替代
    2. 集群環(huán)境下需防重復(fù)處理
    3. 添加 JVM 關(guān)閉鉤子確保任務(wù)不丟失

    異步日志系統(tǒng)

    需要記錄詳細(xì)業(yè)務(wù)日志但磁盤 I/O 不能影響主線程性能。

    public class AsyncLogger {
        privatestaticfinal BlockingQueue<LogEvent> queue =
            new LinkedTransferQueue<>();  // 高吞吐無(wú)界隊(duì)列
    
        static {
            // 守護(hù)線程消費(fèi)日志
            Thread loggerThread = new Thread(() -> {
                while (true) {
                    try {
                        LogEvent event = queue.take();
                        writeToDisk(event);
                    } catch (InterruptedException e) {
                        // 優(yōu)雅關(guān)閉處理
                        drainRemainingLogs();
                        break;
                    }
                }
            });
            loggerThread.setDaemon(true);
            loggerThread.start();
        }
    
        public static void log(LogEvent event) {
            if (!queue.offer(event)) {  // 防御性設(shè)計(jì)
                fallbackLog(event);
            }
        }
    }

    線程池隊(duì)列

    • 線程池中活躍線程數(shù)達(dá)到 corePoolSize 時(shí),線程池將會(huì)將后續(xù)的 task 提交到 BlockingQueue 中;
    • 線程池的核心方法 ThreadPoolExecutor,用 BlockingQueue 存放任務(wù)的阻塞隊(duì)列,被提交但尚未被執(zhí)行的任務(wù)。
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

    線程池在內(nèi)部實(shí)際也是構(gòu)建了一個(gè)生產(chǎn)者消費(fèi)者模型,將線程和任務(wù)兩者解耦,并不直接關(guān)聯(lián),從而良好的緩沖任務(wù),復(fù)用線程。

    圖片圖片

    不同的線程池實(shí)現(xiàn)用的是不同的阻塞隊(duì)列,newFixedThreadPool 和 newSingleThreadExecutor 用的是 LinkedBlockingQueue,newCachedThreadPool 用的是 SynchronousQueue。

    各類隊(duì)列對(duì)比和選型

    圖片圖片

    生產(chǎn)選型建議:

    • 網(wǎng)絡(luò)請(qǐng)求緩沖 → ArrayBlockingQueue(可控內(nèi)存)
    • 任務(wù)調(diào)度 → PriorityBlockingQueue(優(yōu)先級(jí)控制)
    • 線程間直接通信 → SynchronousQueue
    • 磁盤 I/O 解耦 → LinkedBlockingQueue(吞吐優(yōu)先)

    性能優(yōu)化要點(diǎn)

    隊(duì)列監(jiān)控:

    // 通過(guò) JMX 暴露指標(biāo)
    public class QueueMonitor implements QueueMonitorMXBean {
        private final BlockingQueue<?> queue;
    
        public int getQueueSize() {
            return queue.size();
        }
    
        // 注冊(cè)到 MBeanServer...
    }

    拒絕策略(以線程池為例):

    new ThreadPoolExecutor.CallerRunsPolicy() {  // 生產(chǎn)推薦策略
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            log.warn("Task rejected, running in caller thread");
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    動(dòng)態(tài)擴(kuò)縮容:

    // 根據(jù)監(jiān)控指標(biāo)調(diào)整隊(duì)列容量
    public void adjustQueueCapacity(int newSize) {
        if (queue instanceof ResizableBlockingQueue) {
            ((ResizableBlockingQueue) queue).setCapacity(newSize);
        }
    }

    掌握了各種阻塞隊(duì)列的使用場(chǎng)景,接下來(lái)我們深入拆解每個(gè)阻塞隊(duì)列的實(shí)現(xiàn)原理。

    ArrayBlockingQueue

    ArrayBlockingQueue是一個(gè)底層用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,有界是指他的容量大小是固定的,不能擴(kuò)充容量,在初始化時(shí)就必須確定隊(duì)列大小。

    它通過(guò)可重入的獨(dú)占鎖ReentrantLock來(lái)控制并發(fā),Condition來(lái)實(shí)現(xiàn)阻塞和通知喚醒。

    結(jié)構(gòu)概述

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
        final Object[] items;               // 容器數(shù)組
        int takeIndex;                      // 出隊(duì)索引
        int putIndex;                       // 入隊(duì)索引
        int count;                          // 排隊(duì)個(gè)數(shù)
        final ReentrantLock lock;           // 全局鎖
        private final Condition notEmpty;   // 出隊(duì)條件隊(duì)列
        private final Condition notFull;    // 入隊(duì)條件隊(duì)列
        ...
    }

    ArrayBlockingQueue 的結(jié)構(gòu)如圖所示:

    圖片圖片

    • ArrayBlockingQueue 的數(shù)組其實(shí)是一個(gè)邏輯上的環(huán)狀結(jié)構(gòu),在添加、取出數(shù)據(jù)的時(shí)候,并沒(méi)有像 ArrayList 一樣發(fā)生數(shù)組元素的移動(dòng)(當(dāng)然除了 removeAt(final int removeIndex));
    • 并且由 takeIndex 和 putIndex 指示讀寫位置;
    • 在讀寫的時(shí)候還有兩個(gè)讀寫條件隊(duì)列。

    阻塞入隊(duì)

    阻塞入隊(duì) put 方法:

    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        //獲取獨(dú)占鎖
        lock.lockInterruptibly();
        try {
          //如果隊(duì)列已滿則通過(guò)await阻塞put方法
            while (count == items.length)
                notFull.await();
            //滿足條件,插入元素,并喚醒因notEmpty等待的消費(fèi)線程
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
    //插入元素后將putIndex+1,當(dāng)隊(duì)列使用完后重置為0
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
    //隊(duì)列添加元素后喚醒因notEmpty等待的消費(fèi)線程
        notEmpty.signal();
    }

    阻塞出隊(duì)

    //移除隊(duì)列中的元素
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
    //獲取獨(dú)占鎖
        lock.lockInterruptibly();
        try {
          //如果隊(duì)列已空則通過(guò)await阻塞take方法
            while (count == 0)
                notEmpty.await();
            return dequeue(); //移除元素
        } finally {
            lock.unlock();
        }
    }
    
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
    //移除元素后將takeIndex+1,當(dāng)隊(duì)列使用完后重置為0
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    //隊(duì)列消費(fèi)元素后喚醒因notFull等待的消費(fèi)線程
        notFull.signal();
        return x;
    }

    隊(duì)列滿后通過(guò)notFull.await()來(lái)阻塞生產(chǎn)者線程,消費(fèi)元素后通過(guò) notFull.signal()來(lái)喚醒阻塞的生產(chǎn)者線程。

    隊(duì)列為空后通過(guò)notEmpty.await()來(lái)阻塞消費(fèi)者線程,生產(chǎn)元素后通過(guò)notEmpty.signal()喚醒阻塞的消費(fèi)者線程。

    drainTo

    drainTo方法可以一次性獲取隊(duì)列中所有的元素,它減少了鎖定隊(duì)列的次數(shù),使用得當(dāng)在某些場(chǎng)景下對(duì)性能有不錯(cuò)的提升。

    public int drainTo(Collection<? super E> c, int maxElements) {
        checkNotNull(c);
        if (c == this)
            thrownew IllegalArgumentException();
        if (maxElements <= 0)
            return0;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock; //僅獲取一次鎖
        lock.lock();
        try {
            int n = Math.min(maxElements, count); //獲取隊(duì)列中所有元素
            int take = takeIndex;
            int i = 0;
            try {
                while (i < n) {
                    @SuppressWarnings("unchecked")
                    E x = (E) items[take];
                    c.add(x); //循環(huán)插入元素
                    items[take] = null;
                    if (++take == items.length)
                        take = 0;
                    i++;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    count -= i;
                    takeIndex = take;
                    if (itrs != null) {
                        if (count == 0)
                            itrs.queueIsEmpty();
                        elseif (i > take)
                            itrs.takeIndexWrapped();
                    }
                    for (; i > 0 && lock.hasWaiters(notFull); i--)
                        notFull.signal(); //喚醒等待的生產(chǎn)者線程
                }
            }
        } finally {
            lock.unlock();
        }
    }

    LinkedBlockingQueue

    LinkedBlockingQueue是一個(gè)底層用單向鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列,和ArrayBlockingQueue一樣,采用ReentrantLock來(lái)控制并發(fā),不同的是它使用了兩個(gè)獨(dú)占鎖來(lái)控制消費(fèi)和生產(chǎn)。

    如果不是特殊業(yè)務(wù),LinkedBlockingQueue 使用時(shí),切記要定義容量 new LinkedBlockingQueue(capacity),防止過(guò)度膨脹。

    結(jié)構(gòu)概述

    public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    // 默認(rèn) Integer.MAX_VALUE
    privatefinalint capacity;
    // 容量
    privatefinal AtomicInteger count = new AtomicInteger();
    // 頭結(jié)點(diǎn) head.item == null
    transient Node<E> head;
    // 尾節(jié)點(diǎn) last.next == null
    privatetransient Node<E> last;
    // take鎖,出隊(duì)鎖,只有take,poll方法會(huì)持有
    privatefinal ReentrantLock takeLock = new ReentrantLock();
    // 出隊(duì)等待條件
    // 當(dāng)隊(duì)列無(wú)元素時(shí),take鎖會(huì)阻塞在notEmpty條件上,等待其它線程喚醒
    privatefinal Condition notEmpty = takeLock.newCondition();
    // 入隊(duì)鎖,只有put,offer會(huì)持有
    privatefinal ReentrantLock putLock = new ReentrantLock();
    // 入隊(duì)等待條件
    // 當(dāng)隊(duì)列滿了時(shí),put鎖會(huì)會(huì)阻塞在notFull上,等待其它線程喚醒
    privatefinal Condition notFull = putLock.newCondition();
    // 基于鏈表實(shí)現(xiàn),肯定要有結(jié)點(diǎn)類,典型的單鏈表結(jié)構(gòu)
    staticclass Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
      }
    }

    LinkedBlockingQueue 的結(jié)構(gòu)如圖所示:

    圖片圖片

    如圖所示,

    • LinkedBlockingQueue 其實(shí)就是一個(gè)簡(jiǎn)單的單向鏈表,其中頭部元素的數(shù)據(jù)為空,尾部元素的 next 為空;
    • 因?yàn)樽x寫都有競(jìng)爭(zhēng),所以在頭部和尾部分別有一把鎖;同時(shí)還有對(duì)應(yīng)的兩個(gè)條件隊(duì)列;

    put 和 take 方法

    public void put(E e) throws InterruptedException {
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        //因?yàn)槭褂昧穗p鎖,需要使用AtomicInteger計(jì)算元素總量,避免并發(fā)計(jì)算不準(zhǔn)確
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                //隊(duì)列已滿,阻塞生產(chǎn)線程
                notFull.await();
            }
          //插入元素到隊(duì)列尾部
            enqueue(node);
          //count + 1
            c = count.getAndIncrement();
          //如果+1后隊(duì)列還未滿,通過(guò)其他生產(chǎn)線程繼續(xù)生產(chǎn)
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
    //只有當(dāng)之前是空時(shí),消費(fèi)隊(duì)列才會(huì)阻塞,否則是不需要通知的
        if (c == 0)
            signalNotEmpty();
    }
    
    private void enqueue(Node<E> node) {
        //將新元素添加到鏈表末尾,然后將last指向尾部元素
        last = last.next = node;
    }
    
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
              //隊(duì)列為空,阻塞消費(fèi)線程
                notEmpty.await();
            }
          //消費(fèi)一個(gè)元素
            x = dequeue();
          //count - 1
            c = count.getAndDecrement();
          // 通知其他等待的消費(fèi)線程繼續(xù)消費(fèi)
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
          //只有當(dāng)前隊(duì)列是滿的,生產(chǎn)隊(duì)列才會(huì)阻塞,否則是不需要通知的
            signalNotFull();
        return x;
    }
    
    //消費(fèi)隊(duì)列頭部的下一個(gè)元素,同時(shí)將新頭部置空
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    可以看到LinkedBlockingQueue通過(guò)takeLock和putLock兩個(gè)鎖來(lái)控制生產(chǎn)和消費(fèi),互不干擾,只要隊(duì)列未滿,生產(chǎn)線程可以一直生產(chǎn),只要隊(duì)列不為空,消費(fèi)線程可以一直消費(fèi),不會(huì)相互因?yàn)楠?dú)占鎖而阻塞。

    Chaya:“為什么 ArrayBlockingQueue 中不使用雙鎖來(lái)實(shí)現(xiàn)隊(duì)列的生產(chǎn)和消費(fèi)呢?”

    我的理解是 ArrayBlockingQueue 也能使用雙鎖來(lái)實(shí)現(xiàn)功能,但由于它底層使用了數(shù)組這種簡(jiǎn)單結(jié)構(gòu),相當(dāng)于一個(gè)共享變量,如果通過(guò)兩個(gè)鎖,需要更加精確的鎖控制。

    LinkedBlockingQueue不存在這個(gè)問(wèn)題,鏈表這種數(shù)據(jù)結(jié)構(gòu)頭尾節(jié)點(diǎn)都相對(duì)獨(dú)立,存儲(chǔ)上也不連續(xù),雙鎖控制不存在復(fù)雜性。

    PriorityBlockingQueue

    PriorityBlockingQueue是一個(gè)底層由數(shù)組實(shí)現(xiàn)的無(wú)界隊(duì)列,并帶有排序功能,同樣采用ReentrantLock來(lái)控制并發(fā)。

    由于是無(wú)界的,所以插入元素時(shí)不會(huì)阻塞,沒(méi)有隊(duì)列滿的狀態(tài),只有隊(duì)列為空的狀態(tài)。

    PriorityBlockingQueue 具有以下特性:

    • 基于優(yōu)先級(jí)排序:元素按自然順序(Comparable)或自定義 Comparator 排序。
    • 線程安全:通過(guò) ReentrantLock 保證并發(fā)操作的安全性。
    • 動(dòng)態(tài)擴(kuò)容:底層是數(shù)組實(shí)現(xiàn)的二叉堆,容量不足時(shí)自動(dòng)擴(kuò)容。
    • 阻塞操作:take() 在隊(duì)列為空時(shí)阻塞;put() 不會(huì)阻塞(因?yàn)殛?duì)列無(wú)界)。

    適用場(chǎng)景

    例子中,創(chuàng)建了一個(gè)優(yōu)先級(jí)阻塞隊(duì)列,用于存儲(chǔ)和檢索PriorityTask對(duì)象,這些對(duì)象根據(jù)它們的優(yōu)先級(jí)進(jìn)行排序,client 代碼會(huì)向隊(duì)列中添加任務(wù),并從隊(duì)列中檢索并處理優(yōu)先級(jí)最高的任務(wù)。

    // 定義一個(gè)具有優(yōu)先級(jí)的任務(wù)類
    class PriorityTask implements Comparable<PriorityTask>{
        privateint priority;
        private String name;
    
        public PriorityTask(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }
    
        @Override
        public int compareTo(PriorityTask o) {
            if (this.priority < o.priority) {
                return -1;
            } elseif (this.priority > o.priority) {
                return1;
            } else {
                return0;
            }
        }
    
        @Override
        public String toString() {
            return"PriorityTask{" +
                    "priority=" + priority +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
    
    
    publicclass PriorityBlockingQueueExample {
    
        public static void main(String[] args) throws InterruptedException {
            // 創(chuàng)建一個(gè)優(yōu)先級(jí)阻塞隊(duì)列,使用PriorityTask類的自然順序進(jìn)行排序
            PriorityBlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();
    
            // 添加任務(wù)到隊(duì)列
            queue.put(new PriorityTask(3, "Task 3"));
            queue.put(new PriorityTask(1, "Task 1"));
            queue.put(new PriorityTask(2, "Task 2"));
    
            // 從隊(duì)列中取出并打印任務(wù),優(yōu)先級(jí)高的先出隊(duì)
            while (!queue.isEmpty()) {
                PriorityTask task = queue.take();
                System.out.println("Processing: " + task);
            }
        }
    
    }

    在這個(gè)示例中,定義了一個(gè)名為 PriorityTask 的類,它實(shí)現(xiàn)了 Comparable 接口,并且重寫了 compareTo 方法來(lái)定義優(yōu)先級(jí)規(guī)則。

    隊(duì)列中的元素將根據(jù)這個(gè)規(guī)則自動(dòng)排序,從而保證優(yōu)先級(jí)高的任務(wù)先被處理。

    實(shí)現(xiàn)原理

    PriorityBlockingQueue 內(nèi)部通過(guò) 數(shù)組 維護(hù)一個(gè) 最小二叉堆(默認(rèn)),堆頂元素始終是優(yōu)先級(jí)最高的(最小元素)。 數(shù)組下標(biāo)關(guān)系:

    • 父節(jié)點(diǎn):parent = (childIndex - 1) / 2
    • 左子節(jié)點(diǎn):leftChild = parent * 2 + 1
    • 右子節(jié)點(diǎn):rightChild = parent * 2 + 2
    關(guān)鍵字段
    // 存儲(chǔ)元素的數(shù)組
    privatetransient Object[] queue;
    
    // 元素?cái)?shù)量
    privatetransientint size;
    
    // 排序規(guī)則(為 null 時(shí)使用自然排序)
    privatetransient Comparator<? super E> comparator;
    
    // 保證線程安全的鎖
    privatefinal ReentrantLock lock = new ReentrantLock();
    
    // 非空條件變量(用于 take() 阻塞)
    privatefinal Condition notEmpty = lock.newCondition();
    入隊(duì)源碼
    public boolean offer(E e) {
        if (e == null)
            thrownew NullPointerException();
       // 首先獲取鎖對(duì)象。
        final ReentrantLock lock = this.lock;
       // 只有一個(gè)線程操作入隊(duì)和出隊(duì)動(dòng)作。
        lock.lock();
       // n代表數(shù)組的實(shí)際存儲(chǔ)內(nèi)容的大小
       // cap代表隊(duì)列的整體大小,也就是數(shù)組的長(zhǎng)度。
        int n, cap;
        Object[] array;
       // 如果數(shù)組實(shí)際長(zhǎng)度大于等于數(shù)組的長(zhǎng)度時(shí),需要進(jìn)行擴(kuò)容操作。
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
           // 如果用戶指定比較器,則使用用戶指定的比較器來(lái)進(jìn)行比較,如果沒(méi)有則使用默認(rèn)比較器。
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
               // 進(jìn)行上浮操作。
                siftUpComparable(n, e, array);
            else
               // 進(jìn)行上浮操作。
                siftUpUsingComparator(n, e, array, cmp);
           // 實(shí)際長(zhǎng)度增加1,由于有且僅有一個(gè)線程操作隊(duì)列,所以這里并沒(méi)有使用原子性操作。
            size = n + 1;
           // 通知等待的線程,隊(duì)列已經(jīng)有數(shù)據(jù),可以獲取數(shù)據(jù)。
            notEmpty.signal();
        } finally {
           // 解鎖操作。
            lock.unlock();
        }
       // 返回操作成功。
        returntrue;
    }
    關(guān)鍵步驟
    • 加鎖:確保線程安全。
    • 擴(kuò)容檢查:若數(shù)組已滿,調(diào)用 tryGrow() 擴(kuò)容(通常擴(kuò)容 50%)。
    • 堆上浮:將新元素插入數(shù)組末尾,逐步與父節(jié)點(diǎn)比較并交換,直到滿足堆性質(zhì)。
    • 喚醒消費(fèi)者:通知可能阻塞的 take() 線程。
    出隊(duì)
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
    
    private E dequeue() {
       // 數(shù)組的元素的個(gè)數(shù)。
        int n = size - 1;
       // 如果數(shù)組中不存在元素則直接返回null。
        if (n < 0)
            returnnull;
        else {
           // 獲取隊(duì)列數(shù)組。
            Object[] array = queue;
           // 將第一個(gè)元素也就是二叉堆的根結(jié)點(diǎn)堆頂元素作為返回結(jié)果。
            E result = (E) array[0];
           // 獲取數(shù)組中最后一個(gè)元素。
            E x = (E) array[n];
           // 將最后一個(gè)元素設(shè)置為null。
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
               // 進(jìn)行下沉操作。
                siftDownComparable(0, x, array, n);
            else
               // 進(jìn)行下沉操作。
                siftDownUsingComparator(0, x, array, n, cmp);
           // 實(shí)際元素大小減少1.
            size = n;
           // 返回結(jié)果。
            return result;
        }
    }
    關(guān)鍵步驟
    • 加鎖:確保線程安全。
    • 取出堆頂:返回堆頂元素(優(yōu)先級(jí)最高)。
    • 堆下沉:將末尾元素移到堆頂,逐步與子節(jié)點(diǎn)比較并交換,直到滿足堆性質(zhì)。

    DelayQueue

    DelayQueue 也是一個(gè)無(wú)界隊(duì)列,它是在PriorityQueue基礎(chǔ)上實(shí)現(xiàn)的,先按延遲優(yōu)先級(jí)排序,延遲時(shí)間短的排在前面。

    和PriorityBlockingQueue相似,底層也是數(shù)組,采用一個(gè)ReentrantLock來(lái)控制并發(fā)。

    由于是無(wú)界的,所以插入元素時(shí)不會(huì)阻塞,沒(méi)有隊(duì)列滿的狀態(tài)。

    private finaltransient ReentrantLock lock = new ReentrantLock();
    privatefinal PriorityQueue<E> q = new PriorityQueue<E>();//優(yōu)先級(jí)隊(duì)列
    
    public void put(E e) {
        offer(e);
    }
    
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e); //插入元素到優(yōu)先級(jí)隊(duì)列
            if (q.peek() == e) { //如果插入的元素在隊(duì)列頭部
                leader = null;
                available.signal(); //通知消費(fèi)線程
            }
            returntrue;
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek(); //獲取頭部元素
                if (first == null)
                    available.await(); //空隊(duì)列阻塞
                else {
                    long delay = first.getDelay(NANOSECONDS); //檢查元素是否延遲到期
                    if (delay <= 0)
                        return q.poll(); //到期則彈出元素
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay); //阻塞未到期的時(shí)間
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    SynchronousQueue

    SynchronousQueue相比較之前的 4 個(gè)隊(duì)列就比較特殊了,它是一個(gè)沒(méi)有容量的隊(duì)列,也就是說(shuō)它內(nèi)部時(shí)不會(huì)對(duì)數(shù)據(jù)進(jìn)行存儲(chǔ),每進(jìn)行一次 put 之后必須要進(jìn)行一次 take,否則相同線程繼續(xù) put 會(huì)阻塞。

    這種特性很適合做一些傳遞性的工作,一個(gè)線程生產(chǎn),一個(gè)線程消費(fèi)。

    這里只對(duì)它的非公平實(shí)現(xiàn)下的 take 和 put 方法做下簡(jiǎn)單分析:

    //非公平情況下調(diào)用內(nèi)部類TransferStack的transfer方法put
    public void put(E e) throws InterruptedException {
        if (e == null) thrownew NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            thrownew InterruptedException();
        }
    }
    //非公平情況下調(diào)用內(nèi)部類TransferStack的transfer方法take
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        thrownew InterruptedException();
    }
    
    //具體的put以及take方法,只有E的區(qū)別,通過(guò)E來(lái)區(qū)別REQUEST還是DATA模式
    E transfer(E e, boolean timed, long nanos) {
        SNode s = null; // constructed/reused as needed
        int mode = (e == null) ? REQUEST : DATA;
    
        for (;;) {
            SNode h = head;
            //棧無(wú)元素或者元素和插入的元素模式相匹配,也就是說(shuō)都是插入元素
            if (h == null || h.mode == mode) {
                //有時(shí)間限制并且超時(shí)
                if (timed && nanos <= 0) {
                    if (h != null && h.isCancelled())
                        casHead(h, h.next);  // 重新設(shè)置頭節(jié)點(diǎn)
                    else
                        returnnull;
                }
                //未超時(shí)cas操作嘗試設(shè)置頭節(jié)點(diǎn)
                elseif (casHead(h, s = snode(s, e, h, mode))) {
                    //自旋一段時(shí)間后未消費(fèi)元素則掛起put線程
                    SNode m = awaitFulfill(s, timed, nanos);
                    if (m == s) {               // wait was cancelled
                        clean(s);
                        returnnull;
                    }
                    if ((h = head) != null && h.next == s)
                        casHead(h, s.next);     // help s's fulfiller
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
            }
            //棧不為空并且和頭節(jié)點(diǎn)模式不匹配,存在元素則消費(fèi)元素并重新設(shè)置head節(jié)點(diǎn)
            elseif (!isFulfilling(h.mode)) { // try to fulfill
                if (h.isCancelled())            // already cancelled
                    casHead(h, h.next);         // pop and retry
                elseif (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                    for (;;) { // loop until matched or waiters disappear
                        SNode m = s.next;       // m is s's match
                        if (m == null) {        // all waiters are gone
                            casHead(s, null);   // pop fulfill node
                            s = null;           // use new node next time
                            break;              // restart main loop
                        }
                        SNode mn = m.next;
                        if (m.tryMatch(s)) {
                            casHead(s, mn);     // pop both s and m
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else                  // lost match
                            s.casNext(m, mn);   // help unlink
                    }
                }
            }
            //節(jié)點(diǎn)正在匹配階段
            else {                            // help a fulfiller
                SNode m = h.next;               // m is h's match
                if (m == null)                  // waiter is gone
                    casHead(h, null);           // pop fulfilling node
                else {
                    SNode mn = m.next;
                    if (m.tryMatch(h))          // help match
                        casHead(h, mn);         // pop both h and m
                    else                        // lost match
                        h.casNext(m, mn);       // help unlink
                }
            }
        }
    }
    
    //先自旋后掛起的核心方法
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        finallong deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        //計(jì)算自旋的次數(shù)
        int spins = (shouldSpin(s) ?
                        (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            if (w.isInterrupted())
                s.tryCancel();
            SNode m = s.match;
            //匹配成功過(guò)返回節(jié)點(diǎn)
            if (m != null)
                return m;
            //超時(shí)控制
            if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    s.tryCancel();
                    continue;
                }
            }
            //自旋檢查,是否進(jìn)行下一次自旋
            if (spins > 0)
                spins = shouldSpin(s) ? (spins-1) : 0;
            elseif (s.waiter == null)
                s.waiter = w; // establish waiter so can park next iter
            elseif (!timed)
                LockSupport.park(this); //在這里掛起線程
            elseif (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

    代碼非常復(fù)雜,這里說(shuō)下我所理解的核心邏輯。

    代碼中可以看到put以及take方法都是通過(guò)調(diào)用transfer方法來(lái)實(shí)現(xiàn)的,然后通過(guò)參數(shù)mode來(lái)區(qū)別,在生產(chǎn)元素時(shí)如果是同一個(gè)線程多次put則會(huì)采取自旋的方式多次嘗試put元素,可能自旋過(guò)程中元素會(huì)被消費(fèi),這樣可以及時(shí)put,降低線程掛起的性能損耗,高吞吐量的核心也在這里。

    消費(fèi)線程一樣,空棧時(shí)也會(huì)先自旋,自旋失敗然后通過(guò)線程的LockSupport.park方法掛起。

    LinkedTransferQueue

    LinkedTransferQueue 是一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞 TransferQueue 隊(duì)列。

    LinkedTransferQueue采用一種預(yù)占模式。意思就是消費(fèi)者線程取元素時(shí),如果隊(duì)列不為空,則直接取走數(shù)據(jù),若隊(duì)列為空,那就生成一個(gè)節(jié)點(diǎn)(節(jié)點(diǎn)元素為 null)入隊(duì),然后消費(fèi)者線程被等待在這個(gè)節(jié)點(diǎn)上,后面生產(chǎn)者線程入隊(duì)時(shí)發(fā)現(xiàn)有一個(gè)元素為 null 的節(jié)點(diǎn),生產(chǎn)者線程就不入隊(duì)了,直接就將元素填充到該節(jié)點(diǎn),并喚醒該節(jié)點(diǎn)等待的線程,被喚醒的消費(fèi)者線程取走元素,從調(diào)用的方法返回。我們稱這種節(jié)點(diǎn)操作為“匹配”方式。

    隊(duì)列實(shí)現(xiàn)了 TransferQueue 接口重寫了 tryTransfer 和transfer方法,這組方法和SynchronousQueue` 公平模式的隊(duì)列類似,具有匹配的功能.。

    LinkedBlockingDeque

    LinkedBlockingDeque 是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。

    所謂雙向隊(duì)列指的你可以從隊(duì)列的兩端插入和移出元素。雙端隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)。

    相比其他的阻塞隊(duì)列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結(jié)尾的方法,表示插入,獲?。╬eek)或移除雙端隊(duì)列的第一個(gè)元素。

    以 Last 單詞結(jié)尾的方法,表示插入,獲取或移除雙端隊(duì)列的最后一個(gè)元素。

    另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。

    在初始化 LinkedBlockingDeque 時(shí)可以設(shè)置容量防止其過(guò)渡膨脹,默認(rèn)容量也是 Integer.MAX_VALUE。

    另外雙向阻塞隊(duì)列可以運(yùn)用在“工作竊取”模式中。

    責(zé)任編輯:武曉燕 來(lái)源: 碼哥跳動(dòng)
    相關(guān)推薦

    2025-04-22 08:32:50

    2012-03-06 11:01:44

    Java

    2021-01-09 13:57:05

    阻塞隊(duì)列并發(fā)

    2025-05-08 08:31:33

    2025-04-23 08:31:26

    Java并發(fā)框架

    2017-02-14 10:00:19

    Java開發(fā)Lock

    2018-09-19 14:53:02

    NIOBIO運(yùn)行

    2025-04-14 08:31:20

    2025-06-13 08:00:00

    Java并發(fā)編程volatile

    2016-09-01 09:01:00

    MySQLRedisJMQ

    2025-06-10 10:15:00

    Java容器并發(fā)

    2025-04-27 08:30:48

    2019-07-18 11:08:09

    Java并發(fā)框架

    2024-05-29 08:49:45

    2024-02-23 10:10:00

    List接口Java

    2021-04-30 08:39:10

    架構(gòu)消息隊(duì)列高并發(fā)

    2019-08-28 14:25:00

    線程安全容器

    2017-04-12 10:02:21

    Java阻塞隊(duì)列原理分析

    2020-09-22 12:00:23

    Javahashmap高并發(fā)

    2024-08-26 09:51:57

    點(diǎn)贊
    收藏

    51CTO技術(shù)棧公眾號(hào)