1.8w字圖解Java并發(fā)容器框架:并發(fā)安全 Map、JUC 集合、Java 7 種阻塞隊(duì)列正確使用場(chǎng)景和原理詳解
今天進(jìn)入 Java 并發(fā)編程第二章節(jié),圍繞著并發(fā)容器展開,主要內(nèi)容如下:
- ConcurrentHashMap的使用和原理
- ConcurrentLinkedQueue 的使用和原理
- Java 7 種阻塞隊(duì)列使用場(chǎng)景和原理詳解:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue以及LinkedBlockingDeque
ConcurrentHashMap 的使用和原理
Map 是一個(gè)接口,它的實(shí)現(xiàn)方式有很多種,比如常見的 HashMap、LinkedHashMap,但是這些 Map 的實(shí)現(xiàn)并不是線程安全的,在多線程高并發(fā)的環(huán)境中會(huì)出現(xiàn)線程安全的問(wèn)題。
鑒于 Map 是一個(gè)在高并發(fā)的應(yīng)用環(huán)境中應(yīng)用比較廣泛的數(shù)據(jù)結(jié)構(gòu),Doug Lea 自 JDK 1.5 版本起在 Java 中引入了 ConcurrentHashMap。
ConcurrentHashMap 的使用
對(duì)一個(gè)技術(shù)的掌握,從使用開始。我們現(xiàn)在來(lái)實(shí)現(xiàn)一個(gè)一個(gè)高并發(fā)計(jì)數(shù)器,例如記錄網(wǎng)站訪問(wèn)量、接口調(diào)用次數(shù)等。
@Service
public class Counter {
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
public void increase(String key) {
map.compute(key, (k, v) -> (v == null) ? 1 : v + 1);
}
public int get(String key) {
return map.getOrDefault(key, 0);
}
}
- compute :這是一個(gè)并發(fā)安全原子操作,我們使用 compute 方法實(shí)現(xiàn)對(duì)計(jì)數(shù)器的增加操作。
如果 key 不存在則新建一個(gè)值為 1 的計(jì)數(shù)器;
否則將其 value 遞增 1。
- 通過(guò) get 方法可以獲取指定 key 對(duì)應(yīng)的計(jì)數(shù)器值。
這個(gè)例子比較簡(jiǎn)單,現(xiàn)在開始上強(qiáng)度。ConcurrentHashMap 還可以用來(lái)實(shí)現(xiàn)緩存管理器,例如存儲(chǔ)經(jīng)常使用的業(yè)務(wù)數(shù)據(jù)、系統(tǒng)配置等信息,從而避免頻繁的數(shù)據(jù)庫(kù)查詢或網(wǎng)絡(luò)請(qǐng)求。
以下是一個(gè)支持過(guò)期時(shí)間、自動(dòng)刷新和并發(fā)控制的緩存管理器實(shí)現(xiàn),包含詳細(xì)注釋和最佳實(shí)踐:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* 高性能并發(fā)緩存管理器
* @param <K> 鍵類型
* @param <V> 值類型
*/
publicclass ConcurrentCache<K, V> {
privatefinal ConcurrentHashMap<K, CacheEntry<V>> cache = new ConcurrentHashMap<>();
privatefinal Function<K, V> loader; // 緩存加載器
privatefinal ScheduledExecutorService cleaner; // 過(guò)期清理線程
// 默認(rèn)配置
privatelong defaultTTL = 30_000; // 默認(rèn)30秒
privatelong cleanupInterval = 5_000; // 5秒清理一次
privateint maxRetries = 3; // 最大重試次數(shù)
privateboolean refreshOnAccess = true; // 訪問(wèn)時(shí)刷新TTL
public ConcurrentCache(Function<K, V> loader) {
this.loader = loader;
this.cleaner = Executors.newSingleThreadScheduledExecutor();
startCleanupTask();
}
/**
* 獲取緩存值(線程安全)
*/
public V get(K key) {
CacheEntry<V> entry = cache.get(key);
// 無(wú)緩存或已過(guò)期時(shí)加載
if (entry == null || entry.isExpired()) {
return loadAndCache(key);
}
// 更新訪問(wèn)時(shí)間(可選)
if (refreshOnAccess) {
entry.touch();
}
return entry.value;
}
/**
* 原子性的加載和緩存操作
*/
private V loadAndCache(K key) {
int retry = 0;
while (retry++ < maxRetries) {
try {
// 使用compute保證原子性
CacheEntry<V> newEntry = cache.compute(key, (k, oldEntry) -> {
// 檢查其他線程是否已經(jīng)加載
if (oldEntry != null && !oldEntry.isExpired()) {
return oldEntry;
}
V value = loader.apply(k);
returnnew CacheEntry<>(value, defaultTTL, TimeUnit.MILLISECONDS);
});
return newEntry.value;
} catch (Exception ex) {
if (retry >= maxRetries) {
thrownew CacheLoadException("加載緩存失敗,key=" + key, ex);
}
// 指數(shù)退避重試
sleepUninterruptibly((long) Math.pow(2, retry), TimeUnit.MILLISECONDS);
}
}
thrownew CacheLoadException("超過(guò)最大重試次數(shù),key=" + key);
}
/**
* 主動(dòng)放入緩存(支持自定義TTL)
*/
public void put(K key, V value, long ttl, TimeUnit unit) {
cache.put(key, new CacheEntry<>(value, ttl, unit));
}
/**
* 啟動(dòng)定期清理任務(wù)(雙重檢查鎖模式)
*/
private void startCleanupTask() {
if (cleaner.isShutdown()) return;
cleaner.scheduleWithFixedDelay(() -> {
cache.forEach((key, entry) -> {
if (entry.isExpired()) {
cache.remove(key, entry); // 使用CAS刪除
}
});
}, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
}
// 其他實(shí)用方法
public void remove(K key) { cache.remove(key); }
public void clear() { cache.clear(); }
public long size() { return cache.mappingCount(); }
// 配置方法(Builder模式風(fēng)格)
public ConcurrentCache<K, V> defaultTTL(long ttl, TimeUnit unit) {
this.defaultTTL = unit.toMillis(ttl);
returnthis;
}
public ConcurrentCache<K, V> cleanupInterval(long interval, TimeUnit unit) {
this.cleanupInterval = unit.toMillis(interval);
returnthis;
}
// 異常處理
privatestaticclass CacheLoadException extends RuntimeException {
CacheLoadException(String message, Throwable cause) {
super(message, cause);
}
CacheLoadException(String message) {
super(message);
}
}
// 工具方法:不可中斷的休眠
private static void sleepUninterruptibly(long duration, TimeUnit unit) {
try {
Thread.sleep(unit.toMillis(duration));
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
// 關(guān)閉時(shí)釋放資源
public void shutdown() {
cleaner.shutdownNow();
cache.clear();
}
// 緩存條目:包含值、過(guò)期時(shí)間和訪問(wèn)時(shí)間戳
privatestaticclass CacheEntry<V> {
final V value;
finallong expireAt; // 絕對(duì)過(guò)期時(shí)間(納秒)
final AtomicLong accessTime = new AtomicLong(); // 最后訪問(wèn)時(shí)間(納秒)
CacheEntry(V value, long ttl, TimeUnit unit) {
this.value = value;
this.expireAt = System.nanoTime() + unit.toNanos(ttl);
touch();
}
// 刷新訪問(wèn)時(shí)間
void touch() {
accessTime.set(System.nanoTime());
}
// 判斷是否已過(guò)期
boolean isExpired() {
return System.nanoTime() > expireAt;
}
}
}
實(shí)現(xiàn)原理
- CHM 的源碼有 6k 多行,包含的內(nèi)容多,精巧,不容易理解;建議在查看源碼的時(shí)候,可以首先把握整體結(jié)構(gòu)脈絡(luò),對(duì)于一些精巧的優(yōu)化,哈希技巧可以先了解目的就可以了,不用深究;
- 對(duì)整體把握比較清楚后,在逐步分析,可以比較快速的看懂;
- JDK1.8 版本中的 CHM,和 JDK1.7 版本的差別非常大,在查看資料的時(shí)候要注意區(qū)分,1.7 中主要是使用 Segment 分段鎖 來(lái)解決并發(fā)問(wèn)題的。
JDK 1.7 版本 ConcurrentHashMap
在 JDK1.7 版本中,ConcurrentHashMap 的數(shù)據(jù)結(jié)構(gòu)是由一個(gè) Segment 數(shù)組和多個(gè) HashEntry 組成。
而每一個(gè) Segment 元素存儲(chǔ)的是 HashEntry 數(shù)組+鏈表,并對(duì)應(yīng)一個(gè) ReentrantLock 鎖,用于并發(fā)訪問(wèn)控制。
圖片
以 put 操作為例,來(lái)看一下 ConcurrentHashMap 的實(shí)現(xiàn)過(guò)程:
- 首先計(jì)算 key 的哈希值;
- 根據(jù)哈希值找到對(duì)應(yīng)的 Segment;
- 獲取 Segment 對(duì)應(yīng)的鎖;
- 如果還沒(méi)有元素,就直接插入到 Segment 中;
- 如果已經(jīng)存在元素,就循環(huán)比較 key 是否相等;
- 如果 key 已經(jīng)存在,就根據(jù)要求更新 value;
- 如果 key 不存在,就插入新的元素(鏈表或者紅黑樹)。
上述操作中,步驟 2 到 3 相當(dāng)于對(duì)對(duì)應(yīng)的 Segment 加了一個(gè)悲觀鎖,如果 Segment 數(shù)組只有一個(gè) Segment 元素,效果與 Hashtable 類似;
如果存在多個(gè) Segment,效果就相當(dāng)于使用了分段鎖機(jī)制,提高了并發(fā)訪問(wèn)性能。
JDK 1.8 ConcurrentHashMap
在 JDK1.8 中,ConcurrentHashMap 的實(shí)現(xiàn)原理摒棄了這種設(shè)計(jì),而是選擇了與 HashMap 類似的數(shù)組+鏈表+紅黑樹的方式實(shí)現(xiàn),而加鎖則采用 CAS 和 synchronized 實(shí)現(xiàn)。
圖片
其主要區(qū)別就在 CHM 支持并發(fā):
- 使用 Unsafe 方法操作數(shù)組內(nèi)部元素,保證可見性;(U.getObjectVolatile、U.compareAndSwapObject、U.putObjectVolatile);
- 在更新和移動(dòng)節(jié)點(diǎn)的時(shí)候,直接鎖住對(duì)應(yīng)的哈希桶,鎖粒度更小,且動(dòng)態(tài)擴(kuò)展;
- 針對(duì)擴(kuò)容慢操作進(jìn)行優(yōu)化。
a.首先擴(kuò)容過(guò)程的中,節(jié)點(diǎn)首先移動(dòng)到過(guò)度表 nextTable ,所有節(jié)點(diǎn)移動(dòng)完畢時(shí)替換散列表 table;
b.移動(dòng)時(shí)先將散列表定長(zhǎng)等分,然后逆序依次領(lǐng)取任務(wù)擴(kuò)容,設(shè)置 sizeCtl 標(biāo)記正在擴(kuò)容;
c.移動(dòng)完成一個(gè)哈希桶或者遇到空桶時(shí),將其標(biāo)記為 ForwardingNode 節(jié)點(diǎn),并指向 nextTable ;
d.后有其他線程在操作哈希表時(shí),遇到 ForwardingNode 節(jié)點(diǎn),則先幫助擴(kuò)容(繼續(xù)領(lǐng)取分段任務(wù)),擴(kuò)容完成后再繼續(xù)之前的操作;
- 優(yōu)化哈希表計(jì)數(shù)器,采用 LongAdder、Striped64 類似思想;
- 以及大量的哈希算法優(yōu)化和狀態(tài)變量?jī)?yōu)化;
- 關(guān)注「碼哥跳動(dòng)」,并設(shè)置星標(biāo),一起擁抱硬核技術(shù)和對(duì)象,面向人民幣編程。
類定義和成員變量
// node數(shù)組最大容量:2^30=1073741824
privatestaticfinalint MAXIMUM_CAPACITY = 1 << 30;
// 默認(rèn)初始值,必須是2的幕數(shù)
privatestaticfinalint DEFAULT_CAPACITY = 16;
//數(shù)組可能最大值,需要與toArray()相關(guān)方法關(guān)聯(lián)
staticfinalint MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并發(fā)級(jí)別,遺留下來(lái)的,為兼容以前的版本
privatestaticfinalint DEFAULT_CONCURRENCY_LEVEL = 16;
// 負(fù)載因子
privatestaticfinalfloat LOAD_FACTOR = 0.75f;
// 鏈表轉(zhuǎn)紅黑樹閥值,> 8 鏈表轉(zhuǎn)換為紅黑樹
staticfinalint TREEIFY_THRESHOLD = 8;
//樹轉(zhuǎn)鏈表閥值,小于等于6(tranfer時(shí),lc、hc=0兩個(gè)計(jì)數(shù)器分別++記錄原bin、新binTreeNode數(shù)量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
staticfinalint UNTREEIFY_THRESHOLD = 6;
staticfinalint MIN_TREEIFY_CAPACITY = 64;
privatestaticfinalint MIN_TRANSFER_STRIDE = 16;
privatestaticint RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize的最大線程數(shù)
privatestaticfinalint MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中記錄size大小的偏移量
privatestaticfinalint RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
staticfinalint MOVED = -1;
// 樹根節(jié)點(diǎn)的hash值
staticfinalint TREEBIN = -2;
// ReservationNode的hash值
staticfinalint RESERVED = -3;
// 可用處理器數(shù)量
staticfinalint NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數(shù)組
transientvolatile Node<K,V>[] table;
/*控制標(biāo)識(shí)符,用來(lái)控制table的初始化和擴(kuò)容的操作,不同的值有不同的含義
*當(dāng)為負(fù)數(shù)時(shí):-1代表正在初始化,-N代表有N-1個(gè)線程正在 進(jìn)行擴(kuò)容
*當(dāng)為0時(shí):代表當(dāng)時(shí)的table還沒(méi)有被初始化
*當(dāng)為正數(shù)時(shí):表示初始化或者下一次進(jìn)行擴(kuò)容的大小
*/
privatetransientvolatileint sizeCtl;
上面有幾個(gè)重要的地方這里單獨(dú)講:
LOAD_FACTOR:
這里的負(fù)載系數(shù),同 HashMap 等其他 Map 的系數(shù)有明顯區(qū)別:
- 通常的系數(shù)默認(rèn) 0.75,可以由構(gòu)造函數(shù)傳入,當(dāng)節(jié)點(diǎn)數(shù) size 超過(guò) loadFactor * capacity 時(shí)擴(kuò)容;
- 而 CMH 的系數(shù)則固定 0.75(使用 n - (n >>> 2) 表示),構(gòu)造函數(shù)傳入的系數(shù)只影響初始化容量,見第 5 個(gè)構(gòu)造函數(shù)。
sizeCtl:
sizeCtl 是 CHM 中最重要的狀態(tài)變量,其中包括很多中狀態(tài),這里先整體介紹幫助后面源碼理解;
- sizeCtl = 0 :初始值,還未指定初始容量;
- sizeCtl > 0 :
table 未初始化,表示初始化容量;
table 已初始化,表示擴(kuò)容閾值(0.75n);
- sizeCtl = -1 :表示正在初始化;
- sizeCtl < -1 :表示正在擴(kuò)容,具體結(jié)構(gòu)如圖所示:
圖片
Node 節(jié)點(diǎn)
Node 是 ConcurrentHashMap 存儲(chǔ)結(jié)構(gòu)的基本單元,繼承于 HashMap 中的 Entry,用于存儲(chǔ)數(shù)據(jù),源代碼如下。
static class Node<K,V> implements Map.Entry<K,V> { // 哈希表普通節(jié)點(diǎn)
finalint hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node<K,V> find(int h, Object k) {} // 主要在擴(kuò)容時(shí),利用多態(tài)查詢已轉(zhuǎn)移節(jié)點(diǎn)
}
staticfinalclass ForwardingNode<K,V> extends Node<K,V> { // 標(biāo)識(shí)擴(kuò)容節(jié)點(diǎn)
final Node<K,V>[] nextTable; // 指向成員變量 ConcurrentHashMap.nextTable
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null); // hash = -1,快速確定 ForwardingNode 節(jié)點(diǎn)
this.nextTable = tab;
}
Node<K,V> find(int h, Object k) {}
}
staticfinalclass TreeBin<K,V> extends Node<K,V> { // 紅黑樹根節(jié)點(diǎn)
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null); // hash = -2,快速確定紅黑樹,
...
}
}
staticfinalclass TreeNode<K,V> extends Node<K,V> { } // 紅黑樹普通節(jié)點(diǎn),其 hash 同 Node 普通節(jié)點(diǎn) > 0;
哈希計(jì)算
static finalint MOVED = -1; // hash for forwarding nodes
staticfinalint TREEBIN = -2; // hash for roots of trees
staticfinalint RESERVED = -3; // hash for transient reservations
staticfinalint HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 讓高位16位,參與哈希桶定位運(yùn)算的同時(shí),保證 hash 為正
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
哈希桶可見性
一個(gè)數(shù)組即使聲明為 volatile,也只能保證這個(gè)數(shù)組引用本身的可見性,其內(nèi)部元素的可見性是無(wú)法保證的,如果每次都加鎖,則效率必然大大降低,在 CHM 中則使用 Unsafe 方法來(lái)保證:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
staticfinal <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
staticfinal <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
put 操作
思路是對(duì)當(dāng)前的 table 進(jìn)行無(wú)條件自循環(huán)直到 put 成功,可以分成以下六步流程來(lái)概述。
- 如果沒(méi)有初始化就先調(diào)用 initTable()方法來(lái)進(jìn)行初始化過(guò)程
- 如果沒(méi)有 hash 沖突就直接 CAS 插入
- 如果還在進(jìn)行擴(kuò)容操作就先進(jìn)行擴(kuò)容
- 如果存在 hash 沖突,就加鎖來(lái)保證線程安全,這里有兩種情況,一種是鏈表形式就直接遍歷到尾端插入,一種是紅黑樹就按照紅黑樹結(jié)構(gòu)插入,
- 最后一個(gè)如果該鏈表的數(shù)量大于閾值 8,就要先轉(zhuǎn)換成黑紅樹的結(jié)構(gòu),break 再一次進(jìn)入循環(huán)
- 如果添加成功就調(diào)用 addCount()方法統(tǒng)計(jì) size,并且檢查是否需要擴(kuò)容
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) thrownew NullPointerException();
int hash = spread(key.hashCode()); //兩次hash,減少hash沖突,可以均勻分布
int binCount = 0;
for (Node<K,V>[] tab = table;;) { //對(duì)這個(gè)table進(jìn)行迭代
Node<K,V> f; int n, i, fh;
//這里就是上面構(gòu)造方法沒(méi)有進(jìn)行初始化,在這里進(jìn)行判斷,為null就調(diào)用initTable進(jìn)行初始化,屬于懶漢模式初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
elseif ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置沒(méi)有數(shù)據(jù),就直接無(wú)鎖插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
elseif ((fh = f.hash) == MOVED)//如果在進(jìn)行擴(kuò)容,則先進(jìn)行擴(kuò)容操作
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//如果以上條件都不滿足,那就要進(jìn)行加鎖操作,也就是存在hash沖突,鎖住鏈表或者紅黑樹的頭結(jié)點(diǎn)
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //表示該節(jié)點(diǎn)是鏈表結(jié)構(gòu)
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//這里涉及到相同的key進(jìn)行put就會(huì)覆蓋原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //插入鏈表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
elseif (f instanceof TreeBin) {//紅黑樹結(jié)構(gòu)
Node<K,V> p;
binCount = 2;
//紅黑樹結(jié)構(gòu)旋轉(zhuǎn)插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) { //如果鏈表的長(zhǎng)度大于8時(shí)就會(huì)進(jìn)行紅黑樹的轉(zhuǎn)換
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//統(tǒng)計(jì)size,并且檢查是否需要擴(kuò)容
returnnull;
}
流程圖如下所示:
圖片
get 操作
get 方法可能看代碼不是很長(zhǎng),但是他卻能 保證無(wú)鎖狀態(tài)下的內(nèi)存一致性 。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 計(jì)算 hash
if ((tab = table) != null && (n = tab.length) > 0 && // 確保 table 已經(jīng)初始化
// 確保對(duì)應(yīng)的哈希桶不為空,注意這里是 Volatile 語(yǔ)義獲??;因?yàn)閿U(kuò)容的時(shí)候,是完全拷貝,所以只要不為空,則鏈表必然完整
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash < 0,則必然在擴(kuò)容,原來(lái)位置的節(jié)點(diǎn)可能全部移動(dòng)到 i + oldCap 位置,所以利用多態(tài)到 nextTable 中查找
elseif (eh < 0) return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 遍歷鏈表
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
returnnull;
}
ConcurrentHashMap 的 get 操作的流程很簡(jiǎn)單,也很清晰,可以分為三個(gè)步驟來(lái)描述.
- 計(jì)算 hash 值,定位到該 table 索引位置,如果是首節(jié)點(diǎn)符合就返回
- 如果遇到擴(kuò)容的時(shí)候,會(huì)調(diào)用標(biāo)志正在擴(kuò)容節(jié)點(diǎn) ForwardingNode 的 find 方法,查找該節(jié)點(diǎn),匹配就返回
- 以上都不符合的話,就往下遍歷節(jié)點(diǎn),匹配就返回,否則最后就返回 null
size 操作
在 JDK1.8 版本中,對(duì)于 size 的計(jì)算,在擴(kuò)容和 addCount()方法就已經(jīng)有處理了,JDK1.7 是在調(diào)用 size()方法才去計(jì)算,其實(shí)在并發(fā)集合中去計(jì)算 size 是沒(méi)有多大的意義的,因?yàn)?size 是實(shí)時(shí)在變的,只能計(jì)算某一刻的大小,但是某一刻太快了,人的感知是一個(gè)時(shí)間段,所以并不是很精確。
擴(kuò)容
擴(kuò)容操作一直都是比較慢的操作,而 CHM 中巧妙的利用任務(wù)劃分,使得多個(gè)線程可能同時(shí)參與擴(kuò)容;
另外擴(kuò)容條件也有兩個(gè):
- 有鏈表長(zhǎng)度超過(guò) 8,但是容量小于 64 的時(shí)候,發(fā)生擴(kuò)容;
- 節(jié)點(diǎn)數(shù)超過(guò)閾值的時(shí)候,發(fā)生擴(kuò)容;
其擴(kuò)容的過(guò)程可描述為:
- 首先擴(kuò)容過(guò)程的中,節(jié)點(diǎn)首先移動(dòng)到過(guò)度表 nextTable ,所有節(jié)點(diǎn)移動(dòng)完畢時(shí)替換散列表 table;
- 移動(dòng)時(shí)先將散列表定長(zhǎng)等分,然后逆序依次領(lǐng)取任務(wù)擴(kuò)容,設(shè)置 sizeCtl 標(biāo)記正在擴(kuò)容;
- 移動(dòng)完成一個(gè)哈希桶或者遇到空桶時(shí),將其標(biāo)記為 ForwardingNode 節(jié)點(diǎn),并指向 nextTable ;
- 后有其他線程在操作哈希表時(shí),遇到 ForwardingNode 節(jié)點(diǎn),則先幫助擴(kuò)容(繼續(xù)領(lǐng)取分段任務(wù)),擴(kuò)容完成后再繼續(xù)之前的操作;
如圖:
圖片
具體源碼如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // 根據(jù) CPU 數(shù)量計(jì)算任務(wù)步長(zhǎng)
if (nextTab == null) { // 初始化 nextTab
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 擴(kuò)容一倍
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE; // 發(fā)生 OOM 時(shí),不再擴(kuò)容
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 標(biāo)記空桶,或已經(jīng)轉(zhuǎn)移完畢的桶
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) { // 逆向遍歷擴(kuò)容
Node<K,V> f; int fh;
while (advance) { // 向前獲取哈希桶
int nextIndex, nextBound;
if (--i >= bound || finishing) // 已經(jīng)取到哈希桶,或已完成時(shí)退出
advance = false;
elseif ((nextIndex = transferIndex) <= 0) { // 遍歷到達(dá)頭節(jié)點(diǎn),已經(jīng)沒(méi)有待遷移的桶,線程準(zhǔn)備退出
i = -1;
advance = false;
}
elseif (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // 當(dāng)前任務(wù)完成,領(lǐng)取下一批哈希桶
bound = nextBound;
i = nextIndex - 1; // 索引指向下一批哈希桶
advance = false;
}
}
// i < 0 :表示擴(kuò)容結(jié)束,已經(jīng)沒(méi)有待移動(dòng)的哈希桶
// i >= n :擴(kuò)容結(jié)束,再次檢查確認(rèn)
// i + n >= nextn : 在使用 nextTable 替換 table 時(shí),有線程進(jìn)入擴(kuò)容就會(huì)出現(xiàn)
if (i < 0 || i >= n || i + n >= nextn) { // 完成擴(kuò)容準(zhǔn)備退出
int sc;
if (finishing) { // 兩次檢查,只有最后一個(gè)擴(kuò)容線程退出時(shí),才更新變量
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); // 0.75*2*n
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 擴(kuò)容線程減一
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 不是最后一個(gè)線程,直接退出
finishing = advance = true; // 最后一個(gè)線程,再次檢查
i = n; // recheck before commit
}
}
elseif ((f = tabAt(tab, i)) == null) // 當(dāng)前節(jié)點(diǎn)為空,直接標(biāo)記為 ForwardingNode,然后繼續(xù)獲取下一個(gè)桶
advance = casTabAt(tab, i, null, fwd);
// 之前的線程已經(jīng)完成該桶的移動(dòng),直接跳過(guò),正常情況下自己的任務(wù)區(qū)間,不會(huì)出現(xiàn) ForwardingNode 節(jié)點(diǎn),
elseif ((fh = f.hash) == MOVED) // 此處為極端條件下的健壯性檢查
advance = true; // already processed
// 開始處理鏈表
else {
// 注意在 get 的時(shí)候,可以無(wú)鎖獲取,是因?yàn)閿U(kuò)容是全拷貝節(jié)點(diǎn),完成后最后在更新哈希桶
// 而在 put 的時(shí)候,是直接將節(jié)點(diǎn)加入尾部,獲取修改其中的值,此時(shí)如果允許 put 操作,最后就會(huì)發(fā)生臟讀,
// 所以 put 和 transfer,需要競(jìng)爭(zhēng)同一把鎖,也就是對(duì)應(yīng)的哈希桶,以保證內(nèi)存一致性效果
synchronized (f) {
if (tabAt(tab, i) == f) { // 確認(rèn)鎖定的是同一個(gè)桶
Node<K,V> ln, hn;
if (fh >= 0) { // 正常節(jié)點(diǎn)
int runBit = fh & n; // hash & n,判斷擴(kuò)容后的索引
Node<K,V> lastRun = f;
// 此處找到鏈表最后擴(kuò)容后處于同一位置的連續(xù)節(jié)點(diǎn),這樣最后一節(jié)就不用再一次復(fù)制了
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 依次將鏈表拆分成,lo、hi 兩條鏈表,即位置不變的鏈表,和位置 + oldCap 的鏈表
// 注意最后一節(jié)鏈表沒(méi)有new,而是直接使用原來(lái)的節(jié)點(diǎn)
// 同時(shí)鏈表的順序也被打亂了,lastRun 到最后為正序,前面一節(jié)為逆序
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // 插入 lo 鏈表
setTabAt(nextTab, i + n, hn); // 插入 hi 鏈表
setTabAt(tab, i, fwd); // 哈希桶移動(dòng)完成,標(biāo)記為 ForwardingNode 節(jié)點(diǎn)
advance = true; // 繼續(xù)獲取下一個(gè)桶
}
elseif (f instanceof TreeBin) { // 拆分紅黑樹
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null; // 為避免最后在反向遍歷,先留頭結(jié)點(diǎn)的引用,
TreeNode<K,V> hi = null, hiTail = null; // 因?yàn)轫樞虻逆湵恚梢约铀偌t黑樹構(gòu)造
int lc = 0, hc = 0; // 同樣記錄 lo,hi 鏈表的長(zhǎng)度
for (Node<K,V> e = t.first; e != null; e = e.next) { // 中序遍歷紅黑樹
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null); // 構(gòu)造紅黑樹節(jié)點(diǎn)
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 判斷是否需要將其轉(zhuǎn)化為紅黑樹,同時(shí)如果只有一條鏈,那么就可以不用在構(gòu)造
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
ConcurrentLinkedQueue 的使用和原理
ConcurerntLinkedQueue 一個(gè)基于單向鏈表的無(wú)界線程安全隊(duì)列,支持高并發(fā)的隊(duì)列操作,無(wú)需顯式的鎖,而且容量沒(méi)有上限。
此隊(duì)列按照 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。新的元素插入到隊(duì)列的尾部,隊(duì)列獲取操作從隊(duì)列頭部獲得元素。
應(yīng)用場(chǎng)景
常見的使用場(chǎng)景可能包括任務(wù)調(diào)度、事件處理、日志記錄等。比如訂單處理系統(tǒng),其中多個(gè)生產(chǎn)者生成訂單,多個(gè)消費(fèi)者處理訂單。
// 訂單事件處理器(生產(chǎn)環(huán)境級(jí)實(shí)現(xiàn))
publicclass OrderEventProcessor {
// 使用隊(duì)列作為訂單緩沖區(qū)(無(wú)容量限制)
privatefinal ConcurrentLinkedQueue<OrderEvent> queue = new ConcurrentLinkedQueue<>();
privatefinal ExecutorService workers = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2,
new NamedThreadFactory("order-processor")
);
// 初始化處理線程
public void start() {
for (int i = 0; i < workers.getCorePoolSize(); i++) {
workers.submit(this::processEvents);
}
}
// 接收訂單事件(來(lái)自網(wǎng)絡(luò)IO線程)
public void receiveEvent(OrderEvent event) {
queue.offer(event); // 無(wú)阻塞插入
metrics.recordEnqueue(); // 監(jiān)控埋點(diǎn)
}
// 事件處理核心邏輯
private void processEvents() {
while (!Thread.currentThread().isInterrupted()) {
OrderEvent event = queue.poll(); // 無(wú)阻塞獲取
if (event != null) {
try {
handleEvent(event);
} catch (Exception ex) {
handleFailure(event, ex); // 異常處理
}
} else {
// 隊(duì)列空時(shí)自適應(yīng)休眠(避免CPU空轉(zhuǎn))
sleepBackoff();
}
}
}
// 指數(shù)退避休眠(動(dòng)態(tài)調(diào)節(jié)CPU使用率)
private void sleepBackoff() {
long delay = 1; // 初始1ms
while (queue.isEmpty() && delay < 100) {
try {
TimeUnit.MILLISECONDS.sleep(delay);
delay <<= 1; // 指數(shù)增加等待時(shí)間
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 優(yōu)雅關(guān)閉
public void shutdown() {
workers.shutdown();
while (!queue.isEmpty()) {
// 等待剩余任務(wù)處理完成
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
}
}
需要注意的是,以下場(chǎng)景不適合。
- 阻塞需求,需要 take()阻塞等待。
- 有界隊(duì)列。
- 強(qiáng)一致性,要求精確的 size。
實(shí)現(xiàn)原理
以 JDK 17 源碼為基線,ConcurrentLinkedQueue 是 Java 并發(fā)包中基于無(wú)鎖算法實(shí)現(xiàn)的線程安全隊(duì)列,專為高并發(fā)場(chǎng)景設(shè)計(jì)。其核心設(shè)計(jì)目標(biāo)包括:
- 無(wú)阻塞操作:通過(guò) CAS 實(shí)現(xiàn)非阻塞算法
- 線性擴(kuò)展能力:性能隨 CPU 核心數(shù)增加而提升
- 弱一致性:迭代器與 size() 方法返回近似值
- 內(nèi)存效率:每個(gè)元素僅需 24 字節(jié)存儲(chǔ)開銷
ConcurrentLinkedQueue 的結(jié)構(gòu)
ConcurrentLinkedQueue 由 head 節(jié)點(diǎn)和 tail 節(jié)點(diǎn)組成,每個(gè)節(jié)點(diǎn)(Node)由節(jié)點(diǎn)元素(item)和指向下一個(gè)節(jié)點(diǎn)的引用(next)組成,節(jié)點(diǎn)與節(jié)點(diǎn)之間就是通過(guò)這個(gè) next 關(guān)聯(lián)起來(lái),從而組成一張鏈表結(jié)構(gòu)的隊(duì)列。
ConcurrentLinkedQueue 的節(jié)點(diǎn)都是 Node 類型的。
static finalclass Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
ITEM.set(this, item);
}
Node() {}
void appendRelaxed(Node<E> next) {
NEXT.set(this, next);
}
boolean casItem(E cmp, E val) {
return ITEM.compareAndSet(this, cmp, val);
}
}
入隊(duì)操作(offer)
流程圖如下。
圖片
代碼解析。
public boolean offer(E e) {
final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// CAS插入新節(jié)點(diǎn)
if (NEXT.compareAndSet(p, null, newNode)) {
// 惰性更新tail(允許失?。? if (p != t)
TAIL.weakCompareAndSet(this, t, newNode);
returntrue;
}
}
elseif (p == q) // 處理已移除節(jié)點(diǎn)
p = (t != (t = tail)) ? t : head;
else// 推進(jìn)指針
p = (p != t && t != (t = tail)) ? t : q;
}
}
優(yōu)化點(diǎn):
- weakCompareAndSet 減少內(nèi)存屏障開銷
- 允許尾指針最多滯后 log(n) 個(gè)節(jié)點(diǎn)
- 通過(guò) VarHandle 實(shí)現(xiàn)精確內(nèi)存排序
出隊(duì)操作(poll)
核心機(jī)制:
- 兩階段出隊(duì):先標(biāo)記 item 為 null,再更新 head
- 頭指針可能跳躍多個(gè)已消費(fèi)節(jié)點(diǎn)
- 自動(dòng)清理無(wú)效節(jié)點(diǎn)
流程圖如下。
圖片
核心源碼。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item;
if ((item = p.item) != null && p.casItem(item, null)) {
// 成功獲取數(shù)據(jù)
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
elseif ((q = p.next) == null) {
updateHead(h, p);
returnnull;
}
elseif (p == q)
// 繼續(xù)循環(huán)
continue restartFromHead;
}
}
}
Java 7 種阻塞隊(duì)列
Chaya:“什么是阻塞隊(duì)列?”
阻塞隊(duì)列,顧名思義,首先它是一個(gè)隊(duì)列,線程 1 往阻塞隊(duì)列中添加元素,而線程 2 從阻塞隊(duì)列中移除元素。
- 當(dāng)阻塞隊(duì)列是空時(shí),從隊(duì)列中獲取元素的操作將會(huì)被阻塞。
- 當(dāng)阻塞隊(duì)列是滿時(shí),從隊(duì)列中添加元素的操作將會(huì)被阻塞。
JDK1.8 中的阻塞隊(duì)列實(shí)現(xiàn)共有 7 個(gè),分別是:
- ArrayBlockingQueue:基于數(shù)組的有界隊(duì)列;
- LinkedBlockingQueue:基于鏈表的無(wú)界隊(duì)列(可以設(shè)置容量);
- PriorityBlockingQueue:基于二叉堆的無(wú)界優(yōu)先級(jí)隊(duì)列;
- DelayQueue:基于 PriorityBlockingQueue 的無(wú)界延遲隊(duì)列;
- SynchronousQueue:無(wú)容量的阻塞隊(duì)列(Executors.newCachedThreadPool() 中使用的隊(duì)列);
- LinkedTransferQueue:基于鏈表的無(wú)界隊(duì)列;
- LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
阻塞隊(duì)列核心接口
阻塞隊(duì)列統(tǒng)一實(shí)現(xiàn)了BlockingQueue接口,BlockingQueue接口在java.util包Queue接口的基礎(chǔ)上提供了put(e)以及take()兩個(gè)阻塞方法。
除了阻塞功能,BlockingQueue 接口還定義了定時(shí)的offer以及poll,以及一次性移除方法drainTo。
//插入元素,隊(duì)列滿后會(huì)拋出異常
boolean add(E e);
//移除元素,隊(duì)列為空時(shí)會(huì)拋出異常
E remove();
//插入元素,成功反會(huì)true
boolean offer(E e);
//移除元素
E poll();
//插入元素,隊(duì)列滿后會(huì)阻塞
void put(E e) throws InterruptedException;
//移除元素,隊(duì)列空后會(huì)阻塞
E take() throws InterruptedException;
//限時(shí)插入
boolean offer(E e, long timeout, TimeUnit unit)
//限時(shí)移除
E poll(long timeout, TimeUnit unit);
//獲取所有元素到Collection中
int drainTo(Collection<? super E> c);
阻塞隊(duì)列 6 大使用場(chǎng)景
Java 阻塞隊(duì)列(BlockingQueue)是并發(fā)編程中的核心工具,其線程安全和阻塞特性使其在以下場(chǎng)景中發(fā)揮重要作用。
生產(chǎn)者-消費(fèi)者模型(經(jīng)典場(chǎng)景)
電商系統(tǒng)中,用戶下單后需異步處理庫(kù)存扣減、支付回調(diào)、物流通知等操作。
痛點(diǎn):生產(chǎn)(下單)與消費(fèi)(處理)速度不一致,需解耦并保證高吞吐。
public class OrderProcessor {
// 生產(chǎn)級(jí)配置:建議隊(duì)列大小為 CPU 核心數(shù)*2~4
privatestaticfinal BlockingQueue<Order> queue = new LinkedBlockingQueue<>(2048);
privatestaticfinal ExecutorService consumerPool = Executors.newFixedThreadPool(8);
// 生產(chǎn)者(Web 服務(wù)線程)
public void submitOrder(Order order) {
if (!queue.offer(order)) { // 隊(duì)列滿時(shí)快速失敗
log.warn("Order queue overflow! Reject order: {}", order.getId());
thrownew ServiceException("系統(tǒng)繁忙,請(qǐng)稍后重試");
}
log.info("Order submitted: {}", order.getId());
}
// 消費(fèi)者(后臺(tái)線程池)
@PostConstruct
public void initConsumers() {
for (int i = 0; i < 8; i++) {
consumerPool.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Order order = queue.take(); // 阻塞直到有訂單
processOrder(order);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
}
private void processOrder(Order order) {
// 扣減庫(kù)存、支付回調(diào)等業(yè)務(wù)邏輯
}
}
線程池任務(wù)調(diào)度
場(chǎng)景問(wèn)題
視頻平臺(tái)需將上傳的視頻轉(zhuǎn)碼為不同分辨率,任務(wù)具有突發(fā)性。
public class VideoTranscoder {
// 使用 PriorityBlockingQueue 確保 VIP 用戶優(yōu)先處理
privatestaticfinal BlockingQueue<TranscodeTask> queue =
new PriorityBlockingQueue<>(1000, Comparator.comparing(TranscodeTask::getPriority));
// 自定義線程池(核心線程數(shù)=CPU數(shù), 最大線程數(shù)=CPU數(shù)*2)
privatestaticfinal ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 30, TimeUnit.SECONDS, queue
);
public void submitTranscodeTask(TranscodeTask task) {
executor.execute(() -> {
// 執(zhí)行實(shí)際轉(zhuǎn)碼操作
process(task);
});
}
// 監(jiān)控隊(duì)列狀態(tài)(生產(chǎn)環(huán)境建議接入 Prometheus)
public MonitorData getQueueStatus() {
returnnew MonitorData(
queue.size(),
executor.getActiveCount(),
queue.remainingCapacity()
);
}
}
生產(chǎn)級(jí)要點(diǎn):
- 使用有界隊(duì)列避免 OOM
- RejectedExecutionHandler 需配置合理拒絕策略
- 隊(duì)列監(jiān)控接入告警系統(tǒng)
流量削峰
瞬時(shí)流量高峰可達(dá)平時(shí) 100 倍,數(shù)據(jù)庫(kù)無(wú)法承受直接壓力。
public class SeckillService {
// 隊(duì)列容量=商品庫(kù)存*2(內(nèi)存可控)
privatefinal BlockingQueue<SeckillRequest> queue =
new ArrayBlockingQueue<>(20000);
// 異步消費(fèi)隊(duì)列
@Scheduled(fixedRate = 100)
public void processQueue() {
List<SeckillRequest> batch = new ArrayList<>(100);
queue.drainTo(batch, 100); // 批量取100條
if (!batch.isEmpty()) {
seckillDao.batchProcess(batch); // 批量寫入數(shù)據(jù)庫(kù)
}
}
public boolean trySeckill(SeckillRequest request) {
return queue.offer(request); // 非阻塞提交
}
}
圖片
生產(chǎn)級(jí)設(shè)計(jì):
- 隊(duì)列容量與數(shù)據(jù)庫(kù)吞吐量匹配
- 批量處理減少數(shù)據(jù)庫(kù)壓力
- 前端配合顯示排隊(duì)狀態(tài)
延遲任務(wù)調(diào)度:訂單超時(shí)關(guān)閉
需在訂單創(chuàng)建 30 分鐘后檢查支付狀態(tài),未支付自動(dòng)關(guān)閉。
public class OrderTimeoutChecker implements Runnable {
privatefinal DelayQueue<DelayedOrder> queue = new DelayQueue<>();
public void addOrder(Order order) {
queue.put(new DelayedOrder(order, 30, TimeUnit.MINUTES));
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
DelayedOrder order = queue.take(); // 阻塞直到有到期訂單
checkPayment(order.getOrderId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
privatestaticclass DelayedOrder implements Delayed {
privatefinal Order order;
privatefinallong expireTime;
// 實(shí)現(xiàn) getDelay() 和 compareTo()
}
}
生產(chǎn)級(jí)注意:
- 分布式場(chǎng)景需用 Redis/ZooKeeper 替代
- 集群環(huán)境下需防重復(fù)處理
- 添加 JVM 關(guān)閉鉤子確保任務(wù)不丟失
異步日志系統(tǒng)
需要記錄詳細(xì)業(yè)務(wù)日志但磁盤 I/O 不能影響主線程性能。
public class AsyncLogger {
privatestaticfinal BlockingQueue<LogEvent> queue =
new LinkedTransferQueue<>(); // 高吞吐無(wú)界隊(duì)列
static {
// 守護(hù)線程消費(fèi)日志
Thread loggerThread = new Thread(() -> {
while (true) {
try {
LogEvent event = queue.take();
writeToDisk(event);
} catch (InterruptedException e) {
// 優(yōu)雅關(guān)閉處理
drainRemainingLogs();
break;
}
}
});
loggerThread.setDaemon(true);
loggerThread.start();
}
public static void log(LogEvent event) {
if (!queue.offer(event)) { // 防御性設(shè)計(jì)
fallbackLog(event);
}
}
}
線程池隊(duì)列
- 線程池中活躍線程數(shù)達(dá)到 corePoolSize 時(shí),線程池將會(huì)將后續(xù)的 task 提交到 BlockingQueue 中;
- 線程池的核心方法 ThreadPoolExecutor,用 BlockingQueue 存放任務(wù)的阻塞隊(duì)列,被提交但尚未被執(zhí)行的任務(wù)。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
線程池在內(nèi)部實(shí)際也是構(gòu)建了一個(gè)生產(chǎn)者消費(fèi)者模型,將線程和任務(wù)兩者解耦,并不直接關(guān)聯(lián),從而良好的緩沖任務(wù),復(fù)用線程。
圖片
不同的線程池實(shí)現(xiàn)用的是不同的阻塞隊(duì)列,newFixedThreadPool 和 newSingleThreadExecutor 用的是 LinkedBlockingQueue,newCachedThreadPool 用的是 SynchronousQueue。
各類隊(duì)列對(duì)比和選型
圖片
生產(chǎn)選型建議:
- 網(wǎng)絡(luò)請(qǐng)求緩沖 → ArrayBlockingQueue(可控內(nèi)存)
- 任務(wù)調(diào)度 → PriorityBlockingQueue(優(yōu)先級(jí)控制)
- 線程間直接通信 → SynchronousQueue
- 磁盤 I/O 解耦 → LinkedBlockingQueue(吞吐優(yōu)先)
性能優(yōu)化要點(diǎn)
隊(duì)列監(jiān)控:
// 通過(guò) JMX 暴露指標(biāo)
public class QueueMonitor implements QueueMonitorMXBean {
private final BlockingQueue<?> queue;
public int getQueueSize() {
return queue.size();
}
// 注冊(cè)到 MBeanServer...
}
拒絕策略(以線程池為例):
new ThreadPoolExecutor.CallerRunsPolicy() { // 生產(chǎn)推薦策略
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
log.warn("Task rejected, running in caller thread");
if (!e.isShutdown()) {
r.run();
}
}
}
動(dòng)態(tài)擴(kuò)縮容:
// 根據(jù)監(jiān)控指標(biāo)調(diào)整隊(duì)列容量
public void adjustQueueCapacity(int newSize) {
if (queue instanceof ResizableBlockingQueue) {
((ResizableBlockingQueue) queue).setCapacity(newSize);
}
}
掌握了各種阻塞隊(duì)列的使用場(chǎng)景,接下來(lái)我們深入拆解每個(gè)阻塞隊(duì)列的實(shí)現(xiàn)原理。
ArrayBlockingQueue
ArrayBlockingQueue是一個(gè)底層用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,有界是指他的容量大小是固定的,不能擴(kuò)充容量,在初始化時(shí)就必須確定隊(duì)列大小。
它通過(guò)可重入的獨(dú)占鎖ReentrantLock來(lái)控制并發(fā),Condition來(lái)實(shí)現(xiàn)阻塞和通知喚醒。
結(jié)構(gòu)概述
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
final Object[] items; // 容器數(shù)組
int takeIndex; // 出隊(duì)索引
int putIndex; // 入隊(duì)索引
int count; // 排隊(duì)個(gè)數(shù)
final ReentrantLock lock; // 全局鎖
private final Condition notEmpty; // 出隊(duì)條件隊(duì)列
private final Condition notFull; // 入隊(duì)條件隊(duì)列
...
}
ArrayBlockingQueue 的結(jié)構(gòu)如圖所示:
圖片
- ArrayBlockingQueue 的數(shù)組其實(shí)是一個(gè)邏輯上的環(huán)狀結(jié)構(gòu),在添加、取出數(shù)據(jù)的時(shí)候,并沒(méi)有像 ArrayList 一樣發(fā)生數(shù)組元素的移動(dòng)(當(dāng)然除了 removeAt(final int removeIndex));
- 并且由 takeIndex 和 putIndex 指示讀寫位置;
- 在讀寫的時(shí)候還有兩個(gè)讀寫條件隊(duì)列。
阻塞入隊(duì)
阻塞入隊(duì) put 方法:
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
//獲取獨(dú)占鎖
lock.lockInterruptibly();
try {
//如果隊(duì)列已滿則通過(guò)await阻塞put方法
while (count == items.length)
notFull.await();
//滿足條件,插入元素,并喚醒因notEmpty等待的消費(fèi)線程
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
//插入元素后將putIndex+1,當(dāng)隊(duì)列使用完后重置為0
if (++putIndex == items.length)
putIndex = 0;
count++;
//隊(duì)列添加元素后喚醒因notEmpty等待的消費(fèi)線程
notEmpty.signal();
}
阻塞出隊(duì)
//移除隊(duì)列中的元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//獲取獨(dú)占鎖
lock.lockInterruptibly();
try {
//如果隊(duì)列已空則通過(guò)await阻塞take方法
while (count == 0)
notEmpty.await();
return dequeue(); //移除元素
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
//移除元素后將takeIndex+1,當(dāng)隊(duì)列使用完后重置為0
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//隊(duì)列消費(fèi)元素后喚醒因notFull等待的消費(fèi)線程
notFull.signal();
return x;
}
隊(duì)列滿后通過(guò)notFull.await()來(lái)阻塞生產(chǎn)者線程,消費(fèi)元素后通過(guò) notFull.signal()來(lái)喚醒阻塞的生產(chǎn)者線程。
隊(duì)列為空后通過(guò)notEmpty.await()來(lái)阻塞消費(fèi)者線程,生產(chǎn)元素后通過(guò)notEmpty.signal()喚醒阻塞的消費(fèi)者線程。
drainTo
drainTo方法可以一次性獲取隊(duì)列中所有的元素,它減少了鎖定隊(duì)列的次數(shù),使用得當(dāng)在某些場(chǎng)景下對(duì)性能有不錯(cuò)的提升。
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c);
if (c == this)
thrownew IllegalArgumentException();
if (maxElements <= 0)
return0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock; //僅獲取一次鎖
lock.lock();
try {
int n = Math.min(maxElements, count); //獲取隊(duì)列中所有元素
int take = takeIndex;
int i = 0;
try {
while (i < n) {
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x); //循環(huán)插入元素
items[take] = null;
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
elseif (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal(); //喚醒等待的生產(chǎn)者線程
}
}
} finally {
lock.unlock();
}
}
LinkedBlockingQueue
LinkedBlockingQueue是一個(gè)底層用單向鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列,和ArrayBlockingQueue一樣,采用ReentrantLock來(lái)控制并發(fā),不同的是它使用了兩個(gè)獨(dú)占鎖來(lái)控制消費(fèi)和生產(chǎn)。
如果不是特殊業(yè)務(wù),LinkedBlockingQueue 使用時(shí),切記要定義容量 new LinkedBlockingQueue(capacity),防止過(guò)度膨脹。
結(jié)構(gòu)概述
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
// 默認(rèn) Integer.MAX_VALUE
privatefinalint capacity;
// 容量
privatefinal AtomicInteger count = new AtomicInteger();
// 頭結(jié)點(diǎn) head.item == null
transient Node<E> head;
// 尾節(jié)點(diǎn) last.next == null
privatetransient Node<E> last;
// take鎖,出隊(duì)鎖,只有take,poll方法會(huì)持有
privatefinal ReentrantLock takeLock = new ReentrantLock();
// 出隊(duì)等待條件
// 當(dāng)隊(duì)列無(wú)元素時(shí),take鎖會(huì)阻塞在notEmpty條件上,等待其它線程喚醒
privatefinal Condition notEmpty = takeLock.newCondition();
// 入隊(duì)鎖,只有put,offer會(huì)持有
privatefinal ReentrantLock putLock = new ReentrantLock();
// 入隊(duì)等待條件
// 當(dāng)隊(duì)列滿了時(shí),put鎖會(huì)會(huì)阻塞在notFull上,等待其它線程喚醒
privatefinal Condition notFull = putLock.newCondition();
// 基于鏈表實(shí)現(xiàn),肯定要有結(jié)點(diǎn)類,典型的單鏈表結(jié)構(gòu)
staticclass Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
}
LinkedBlockingQueue 的結(jié)構(gòu)如圖所示:
圖片
如圖所示,
- LinkedBlockingQueue 其實(shí)就是一個(gè)簡(jiǎn)單的單向鏈表,其中頭部元素的數(shù)據(jù)為空,尾部元素的 next 為空;
- 因?yàn)樽x寫都有競(jìng)爭(zhēng),所以在頭部和尾部分別有一把鎖;同時(shí)還有對(duì)應(yīng)的兩個(gè)條件隊(duì)列;
put 和 take 方法
public void put(E e) throws InterruptedException {
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//因?yàn)槭褂昧穗p鎖,需要使用AtomicInteger計(jì)算元素總量,避免并發(fā)計(jì)算不準(zhǔn)確
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//隊(duì)列已滿,阻塞生產(chǎn)線程
notFull.await();
}
//插入元素到隊(duì)列尾部
enqueue(node);
//count + 1
c = count.getAndIncrement();
//如果+1后隊(duì)列還未滿,通過(guò)其他生產(chǎn)線程繼續(xù)生產(chǎn)
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//只有當(dāng)之前是空時(shí),消費(fèi)隊(duì)列才會(huì)阻塞,否則是不需要通知的
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
//將新元素添加到鏈表末尾,然后將last指向尾部元素
last = last.next = node;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//隊(duì)列為空,阻塞消費(fèi)線程
notEmpty.await();
}
//消費(fèi)一個(gè)元素
x = dequeue();
//count - 1
c = count.getAndDecrement();
// 通知其他等待的消費(fèi)線程繼續(xù)消費(fèi)
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
//只有當(dāng)前隊(duì)列是滿的,生產(chǎn)隊(duì)列才會(huì)阻塞,否則是不需要通知的
signalNotFull();
return x;
}
//消費(fèi)隊(duì)列頭部的下一個(gè)元素,同時(shí)將新頭部置空
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
可以看到LinkedBlockingQueue通過(guò)takeLock和putLock兩個(gè)鎖來(lái)控制生產(chǎn)和消費(fèi),互不干擾,只要隊(duì)列未滿,生產(chǎn)線程可以一直生產(chǎn),只要隊(duì)列不為空,消費(fèi)線程可以一直消費(fèi),不會(huì)相互因?yàn)楠?dú)占鎖而阻塞。
Chaya:“為什么 ArrayBlockingQueue 中不使用雙鎖來(lái)實(shí)現(xiàn)隊(duì)列的生產(chǎn)和消費(fèi)呢?”
我的理解是 ArrayBlockingQueue 也能使用雙鎖來(lái)實(shí)現(xiàn)功能,但由于它底層使用了數(shù)組這種簡(jiǎn)單結(jié)構(gòu),相當(dāng)于一個(gè)共享變量,如果通過(guò)兩個(gè)鎖,需要更加精確的鎖控制。
LinkedBlockingQueue不存在這個(gè)問(wèn)題,鏈表這種數(shù)據(jù)結(jié)構(gòu)頭尾節(jié)點(diǎn)都相對(duì)獨(dú)立,存儲(chǔ)上也不連續(xù),雙鎖控制不存在復(fù)雜性。
PriorityBlockingQueue
PriorityBlockingQueue是一個(gè)底層由數(shù)組實(shí)現(xiàn)的無(wú)界隊(duì)列,并帶有排序功能,同樣采用ReentrantLock來(lái)控制并發(fā)。
由于是無(wú)界的,所以插入元素時(shí)不會(huì)阻塞,沒(méi)有隊(duì)列滿的狀態(tài),只有隊(duì)列為空的狀態(tài)。
PriorityBlockingQueue 具有以下特性:
- 基于優(yōu)先級(jí)排序:元素按自然順序(Comparable)或自定義 Comparator 排序。
- 線程安全:通過(guò) ReentrantLock 保證并發(fā)操作的安全性。
- 動(dòng)態(tài)擴(kuò)容:底層是數(shù)組實(shí)現(xiàn)的二叉堆,容量不足時(shí)自動(dòng)擴(kuò)容。
- 阻塞操作:take() 在隊(duì)列為空時(shí)阻塞;put() 不會(huì)阻塞(因?yàn)殛?duì)列無(wú)界)。
適用場(chǎng)景
例子中,創(chuàng)建了一個(gè)優(yōu)先級(jí)阻塞隊(duì)列,用于存儲(chǔ)和檢索PriorityTask對(duì)象,這些對(duì)象根據(jù)它們的優(yōu)先級(jí)進(jìn)行排序,client 代碼會(huì)向隊(duì)列中添加任務(wù),并從隊(duì)列中檢索并處理優(yōu)先級(jí)最高的任務(wù)。
// 定義一個(gè)具有優(yōu)先級(jí)的任務(wù)類
class PriorityTask implements Comparable<PriorityTask>{
privateint priority;
private String name;
public PriorityTask(int priority, String name) {
this.priority = priority;
this.name = name;
}
@Override
public int compareTo(PriorityTask o) {
if (this.priority < o.priority) {
return -1;
} elseif (this.priority > o.priority) {
return1;
} else {
return0;
}
}
@Override
public String toString() {
return"PriorityTask{" +
"priority=" + priority +
", name='" + name + '\'' +
'}';
}
}
publicclass PriorityBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建一個(gè)優(yōu)先級(jí)阻塞隊(duì)列,使用PriorityTask類的自然順序進(jìn)行排序
PriorityBlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();
// 添加任務(wù)到隊(duì)列
queue.put(new PriorityTask(3, "Task 3"));
queue.put(new PriorityTask(1, "Task 1"));
queue.put(new PriorityTask(2, "Task 2"));
// 從隊(duì)列中取出并打印任務(wù),優(yōu)先級(jí)高的先出隊(duì)
while (!queue.isEmpty()) {
PriorityTask task = queue.take();
System.out.println("Processing: " + task);
}
}
}
在這個(gè)示例中,定義了一個(gè)名為 PriorityTask 的類,它實(shí)現(xiàn)了 Comparable 接口,并且重寫了 compareTo 方法來(lái)定義優(yōu)先級(jí)規(guī)則。
隊(duì)列中的元素將根據(jù)這個(gè)規(guī)則自動(dòng)排序,從而保證優(yōu)先級(jí)高的任務(wù)先被處理。
實(shí)現(xiàn)原理
PriorityBlockingQueue 內(nèi)部通過(guò) 數(shù)組 維護(hù)一個(gè) 最小二叉堆(默認(rèn)),堆頂元素始終是優(yōu)先級(jí)最高的(最小元素)。 數(shù)組下標(biāo)關(guān)系:
- 父節(jié)點(diǎn):parent = (childIndex - 1) / 2
- 左子節(jié)點(diǎn):leftChild = parent * 2 + 1
- 右子節(jié)點(diǎn):rightChild = parent * 2 + 2
關(guān)鍵字段
// 存儲(chǔ)元素的數(shù)組
privatetransient Object[] queue;
// 元素?cái)?shù)量
privatetransientint size;
// 排序規(guī)則(為 null 時(shí)使用自然排序)
privatetransient Comparator<? super E> comparator;
// 保證線程安全的鎖
privatefinal ReentrantLock lock = new ReentrantLock();
// 非空條件變量(用于 take() 阻塞)
privatefinal Condition notEmpty = lock.newCondition();
入隊(duì)源碼
public boolean offer(E e) {
if (e == null)
thrownew NullPointerException();
// 首先獲取鎖對(duì)象。
final ReentrantLock lock = this.lock;
// 只有一個(gè)線程操作入隊(duì)和出隊(duì)動(dòng)作。
lock.lock();
// n代表數(shù)組的實(shí)際存儲(chǔ)內(nèi)容的大小
// cap代表隊(duì)列的整體大小,也就是數(shù)組的長(zhǎng)度。
int n, cap;
Object[] array;
// 如果數(shù)組實(shí)際長(zhǎng)度大于等于數(shù)組的長(zhǎng)度時(shí),需要進(jìn)行擴(kuò)容操作。
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
// 如果用戶指定比較器,則使用用戶指定的比較器來(lái)進(jìn)行比較,如果沒(méi)有則使用默認(rèn)比較器。
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 進(jìn)行上浮操作。
siftUpComparable(n, e, array);
else
// 進(jìn)行上浮操作。
siftUpUsingComparator(n, e, array, cmp);
// 實(shí)際長(zhǎng)度增加1,由于有且僅有一個(gè)線程操作隊(duì)列,所以這里并沒(méi)有使用原子性操作。
size = n + 1;
// 通知等待的線程,隊(duì)列已經(jīng)有數(shù)據(jù),可以獲取數(shù)據(jù)。
notEmpty.signal();
} finally {
// 解鎖操作。
lock.unlock();
}
// 返回操作成功。
returntrue;
}
關(guān)鍵步驟
- 加鎖:確保線程安全。
- 擴(kuò)容檢查:若數(shù)組已滿,調(diào)用 tryGrow() 擴(kuò)容(通常擴(kuò)容 50%)。
- 堆上浮:將新元素插入數(shù)組末尾,逐步與父節(jié)點(diǎn)比較并交換,直到滿足堆性質(zhì)。
- 喚醒消費(fèi)者:通知可能阻塞的 take() 線程。
出隊(duì)
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
// 數(shù)組的元素的個(gè)數(shù)。
int n = size - 1;
// 如果數(shù)組中不存在元素則直接返回null。
if (n < 0)
returnnull;
else {
// 獲取隊(duì)列數(shù)組。
Object[] array = queue;
// 將第一個(gè)元素也就是二叉堆的根結(jié)點(diǎn)堆頂元素作為返回結(jié)果。
E result = (E) array[0];
// 獲取數(shù)組中最后一個(gè)元素。
E x = (E) array[n];
// 將最后一個(gè)元素設(shè)置為null。
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 進(jìn)行下沉操作。
siftDownComparable(0, x, array, n);
else
// 進(jìn)行下沉操作。
siftDownUsingComparator(0, x, array, n, cmp);
// 實(shí)際元素大小減少1.
size = n;
// 返回結(jié)果。
return result;
}
}
關(guān)鍵步驟
- 加鎖:確保線程安全。
- 取出堆頂:返回堆頂元素(優(yōu)先級(jí)最高)。
- 堆下沉:將末尾元素移到堆頂,逐步與子節(jié)點(diǎn)比較并交換,直到滿足堆性質(zhì)。
DelayQueue
DelayQueue 也是一個(gè)無(wú)界隊(duì)列,它是在PriorityQueue基礎(chǔ)上實(shí)現(xiàn)的,先按延遲優(yōu)先級(jí)排序,延遲時(shí)間短的排在前面。
和PriorityBlockingQueue相似,底層也是數(shù)組,采用一個(gè)ReentrantLock來(lái)控制并發(fā)。
由于是無(wú)界的,所以插入元素時(shí)不會(huì)阻塞,沒(méi)有隊(duì)列滿的狀態(tài)。
private finaltransient ReentrantLock lock = new ReentrantLock();
privatefinal PriorityQueue<E> q = new PriorityQueue<E>();//優(yōu)先級(jí)隊(duì)列
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); //插入元素到優(yōu)先級(jí)隊(duì)列
if (q.peek() == e) { //如果插入的元素在隊(duì)列頭部
leader = null;
available.signal(); //通知消費(fèi)線程
}
returntrue;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); //獲取頭部元素
if (first == null)
available.await(); //空隊(duì)列阻塞
else {
long delay = first.getDelay(NANOSECONDS); //檢查元素是否延遲到期
if (delay <= 0)
return q.poll(); //到期則彈出元素
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); //阻塞未到期的時(shí)間
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
SynchronousQueue
SynchronousQueue相比較之前的 4 個(gè)隊(duì)列就比較特殊了,它是一個(gè)沒(méi)有容量的隊(duì)列,也就是說(shuō)它內(nèi)部時(shí)不會(huì)對(duì)數(shù)據(jù)進(jìn)行存儲(chǔ),每進(jìn)行一次 put 之后必須要進(jìn)行一次 take,否則相同線程繼續(xù) put 會(huì)阻塞。
這種特性很適合做一些傳遞性的工作,一個(gè)線程生產(chǎn),一個(gè)線程消費(fèi)。
這里只對(duì)它的非公平實(shí)現(xiàn)下的 take 和 put 方法做下簡(jiǎn)單分析:
//非公平情況下調(diào)用內(nèi)部類TransferStack的transfer方法put
public void put(E e) throws InterruptedException {
if (e == null) thrownew NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
thrownew InterruptedException();
}
}
//非公平情況下調(diào)用內(nèi)部類TransferStack的transfer方法take
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
thrownew InterruptedException();
}
//具體的put以及take方法,只有E的區(qū)別,通過(guò)E來(lái)區(qū)別REQUEST還是DATA模式
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
//棧無(wú)元素或者元素和插入的元素模式相匹配,也就是說(shuō)都是插入元素
if (h == null || h.mode == mode) {
//有時(shí)間限制并且超時(shí)
if (timed && nanos <= 0) {
if (h != null && h.isCancelled())
casHead(h, h.next); // 重新設(shè)置頭節(jié)點(diǎn)
else
returnnull;
}
//未超時(shí)cas操作嘗試設(shè)置頭節(jié)點(diǎn)
elseif (casHead(h, s = snode(s, e, h, mode))) {
//自旋一段時(shí)間后未消費(fèi)元素則掛起put線程
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
returnnull;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
//棧不為空并且和頭節(jié)點(diǎn)模式不匹配,存在元素則消費(fèi)元素并重新設(shè)置head節(jié)點(diǎn)
elseif (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
elseif (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
}
//節(jié)點(diǎn)正在匹配階段
else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
//先自旋后掛起的核心方法
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
finallong deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//計(jì)算自旋的次數(shù)
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
//匹配成功過(guò)返回節(jié)點(diǎn)
if (m != null)
return m;
//超時(shí)控制
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
//自旋檢查,是否進(jìn)行下一次自旋
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
elseif (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
elseif (!timed)
LockSupport.park(this); //在這里掛起線程
elseif (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
代碼非常復(fù)雜,這里說(shuō)下我所理解的核心邏輯。
代碼中可以看到put以及take方法都是通過(guò)調(diào)用transfer方法來(lái)實(shí)現(xiàn)的,然后通過(guò)參數(shù)mode來(lái)區(qū)別,在生產(chǎn)元素時(shí)如果是同一個(gè)線程多次put則會(huì)采取自旋的方式多次嘗試put元素,可能自旋過(guò)程中元素會(huì)被消費(fèi),這樣可以及時(shí)put,降低線程掛起的性能損耗,高吞吐量的核心也在這里。
消費(fèi)線程一樣,空棧時(shí)也會(huì)先自旋,自旋失敗然后通過(guò)線程的LockSupport.park方法掛起。
LinkedTransferQueue
LinkedTransferQueue 是一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞 TransferQueue 隊(duì)列。
LinkedTransferQueue采用一種預(yù)占模式。意思就是消費(fèi)者線程取元素時(shí),如果隊(duì)列不為空,則直接取走數(shù)據(jù),若隊(duì)列為空,那就生成一個(gè)節(jié)點(diǎn)(節(jié)點(diǎn)元素為 null)入隊(duì),然后消費(fèi)者線程被等待在這個(gè)節(jié)點(diǎn)上,后面生產(chǎn)者線程入隊(duì)時(shí)發(fā)現(xiàn)有一個(gè)元素為 null 的節(jié)點(diǎn),生產(chǎn)者線程就不入隊(duì)了,直接就將元素填充到該節(jié)點(diǎn),并喚醒該節(jié)點(diǎn)等待的線程,被喚醒的消費(fèi)者線程取走元素,從調(diào)用的方法返回。我們稱這種節(jié)點(diǎn)操作為“匹配”方式。
隊(duì)列實(shí)現(xiàn)了 TransferQueue 接口重寫了 tryTransfer 和transfer方法,這組方法和SynchronousQueue` 公平模式的隊(duì)列類似,具有匹配的功能.。
LinkedBlockingDeque
LinkedBlockingDeque 是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
所謂雙向隊(duì)列指的你可以從隊(duì)列的兩端插入和移出元素。雙端隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)。
相比其他的阻塞隊(duì)列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結(jié)尾的方法,表示插入,獲?。╬eek)或移除雙端隊(duì)列的第一個(gè)元素。
以 Last 單詞結(jié)尾的方法,表示插入,獲取或移除雙端隊(duì)列的最后一個(gè)元素。
另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。
在初始化 LinkedBlockingDeque 時(shí)可以設(shè)置容量防止其過(guò)渡膨脹,默認(rèn)容量也是 Integer.MAX_VALUE。
另外雙向阻塞隊(duì)列可以運(yùn)用在“工作竊取”模式中。