深入理解Java線程池,剖析LinkedBlockingQueue源碼實(shí)現(xiàn)
引言
上篇文章我們講解了ArrayBlockingQueue源碼,這篇文章開(kāi)始講解LinkedBlockingQueue源碼。從名字上就能看到ArrayBlockingQueue是基于數(shù)組實(shí)現(xiàn)的,而LinkedBlockingQueue是基于鏈表實(shí)現(xiàn)。
那么,LinkedBlockingQueue底層源碼實(shí)現(xiàn)是什么樣的?跟ArrayBlockingQueue有何不同?
LinkedBlockingQueue的應(yīng)用場(chǎng)景跟ArrayBlockingQueue有什么不一樣?
看完這篇文章,可以輕松解答這些問(wèn)題。
由于LinkedBlockingQueue實(shí)現(xiàn)了BlockingQueue接口,而B(niǎo)lockingQueue接口中定義了幾組放數(shù)據(jù)和取數(shù)據(jù)的方法,來(lái)滿足不同的場(chǎng)景。
操作 | 拋出異常 | 返回特定值 | 一直阻塞 | 阻塞指定時(shí)間 |
放數(shù)據(jù) | add() | offer() | put() | offer(e, time, unit) |
取數(shù)據(jù)(同時(shí)刪除數(shù)據(jù)) | remove() | poll() | take() | poll(time, unit) |
取數(shù)據(jù)(不刪除) | element() | peek() | 不支持 | 不支持 |
這四組方法的區(qū)別是:
- 當(dāng)隊(duì)列滿的時(shí)候,再次添加數(shù)據(jù),add()會(huì)拋出異常,offer()會(huì)返回false,put()會(huì)一直阻塞,offer(e, time, unit)會(huì)阻塞指定時(shí)間,然后返回false。
- 當(dāng)隊(duì)列為空的時(shí)候,再次取數(shù)據(jù),remove()會(huì)拋出異常,poll()會(huì)返回null,take()會(huì)一直阻塞,poll(time, unit)會(huì)阻塞指定時(shí)間,然后返回null。
LinkedBlockingQueue也會(huì)有針對(duì)這幾組放數(shù)據(jù)和取數(shù)據(jù)方法的具體實(shí)現(xiàn)。 Java線程池中的固定大小線程池就是基于LinkedBlockingQueue實(shí)現(xiàn)的:
# 創(chuàng)建固定大小的線程池
ExecutorService executorService = Executors.newFixedThreadPool(10);對(duì)應(yīng)的源碼實(shí)現(xiàn):
# 底層使用LinkedBlockingQueue隊(duì)列存儲(chǔ)任務(wù)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}類結(jié)構(gòu)
先看一下LinkedBlockingQueue類里面有哪些屬性:
public class LinkedBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 容量大小
*/
private final int capacity;
/**
* 元素個(gè)數(shù)
*/
private final AtomicInteger count = new AtomicInteger();
/**
* 頭節(jié)點(diǎn)
*/
transient Node<E> head;
/**
* 尾節(jié)點(diǎn)
*/
private transient Node<E> last;
/**
* 取數(shù)據(jù)的鎖
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
* 取數(shù)據(jù)的條件(隊(duì)列非空)
*/
private final Condition notEmpty = takeLock.newCondition();
/**
* 放數(shù)據(jù)的鎖
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
* 放數(shù)據(jù)的條件(隊(duì)列非滿)
*/
private final Condition notFull = putLock.newCondition();
/**
* 鏈表節(jié)點(diǎn)類
*/
static class Node<E> {
/**
* 節(jié)點(diǎn)元素
*/
E item;
/**
* 后繼節(jié)點(diǎn)
*/
Node<E> next;
Node(E x) {
item = x;
}
}
}
圖片
可以看出LinkedBlockingQueue底層是基于鏈表實(shí)現(xiàn)的,定義了頭節(jié)點(diǎn)head和尾節(jié)點(diǎn)last,由鏈表節(jié)點(diǎn)類Node可以看出是個(gè)單鏈表。 發(fā)現(xiàn)個(gè)問(wèn)題,ArrayBlockingQueue中只使用了一把鎖,入隊(duì)出隊(duì)操作共用這把鎖。而LinkedBlockingQueue則使用了兩把鎖,分別是出隊(duì)鎖takeLock和入隊(duì)鎖putLock,為什么要這么設(shè)計(jì)呢?
LinkedBlockingQueue把兩把鎖分開(kāi),性能更好,為什么ArrayBlockingQueue不這樣設(shè)計(jì)呢?
原因是ArrayBlockingQueue是基于數(shù)組實(shí)現(xiàn)的,所有數(shù)據(jù)都存儲(chǔ)在同一個(gè)數(shù)組對(duì)象里面,對(duì)同一個(gè)對(duì)象沒(méi)辦法使用兩把鎖,會(huì)有數(shù)據(jù)可見(jiàn)性的問(wèn)題。而LinkedBlockingQueue底層是基于鏈表實(shí)現(xiàn)的,從頭節(jié)點(diǎn)刪除,尾節(jié)點(diǎn)插入,頭尾節(jié)點(diǎn)分別是兩個(gè)對(duì)象,可以分別使用兩把鎖,提升操作性能。
另外也定義了兩個(gè)條件notEmpty和notFull,當(dāng)條件滿足的時(shí)候才允許放數(shù)據(jù)或者取數(shù)據(jù),下面會(huì)詳細(xì)講。
初始化
LinkedBlockingQueue常用的初始化方法有兩個(gè):
- 無(wú)參構(gòu)造方法
- 指定容量大小的有參構(gòu)造方法
/**
* 無(wú)參構(gòu)造方法
*/
BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingQueue<>();
/**
* 指定容量大小的構(gòu)造方法
*/
BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingQueue<>(10);再看一下對(duì)應(yīng)的源碼實(shí)現(xiàn):
/**
* 無(wú)參構(gòu)造方法
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 指定容量大小的構(gòu)造方法
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
// 設(shè)置容量大小,初始化頭尾結(jié)點(diǎn)
this.capacity = capacity;
last = head = new Node<E>(null);
}可以看出LinkedBlockingQueue的無(wú)參構(gòu)造方法使用的鏈表容量是Integer的最大值,存儲(chǔ)大量數(shù)據(jù)的時(shí)候,會(huì)有內(nèi)存溢出的風(fēng)險(xiǎn),建議使用有參構(gòu)造方法,指定容量大小。
有參構(gòu)造方法還會(huì)初始化頭尾節(jié)點(diǎn),節(jié)點(diǎn)值為null。
LinkedBlockingQueue初始化的時(shí)候,不支持指定是否使用公平鎖,只能使用非公平鎖,而ArrayBlockingQueue是支持指定的。
放數(shù)據(jù)源碼
放數(shù)據(jù)的方法有四個(gè):
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時(shí)間 |
放數(shù)據(jù) | add() | offer() | put() | offer(e, time, unit) |
offer方法源碼
先看一下offer()方法源碼,其他放數(shù)據(jù)方法邏輯也是大同小異,都是在鏈表尾部插入。 offer()方法在隊(duì)列滿的時(shí)候,會(huì)直接返回false,表示插入失敗。
/**
* offer方法入口
*
* @param e 元素
* @return 是否插入成功
*/
public boolean offer(E e) {
// 1. 判空,傳參不允許為null
if (e == null) {
throw new NullPointerException();
}
// 2. 如果隊(duì)列已滿,則直接返回false,表示插入失敗
final AtomicInteger count = this.count;
if (count.get() == capacity) {
return false;
}
int c = -1;
Node<E> node = new Node<E>(e);
// 3. 獲取put鎖,并加鎖
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 4. 加鎖后,再次判斷隊(duì)列是否已滿,如果未滿,則入隊(duì)
if (count.get() < capacity) {
enqueue(node);
// 5. 隊(duì)列個(gè)數(shù)加一
c = count.getAndIncrement();
// 6. 如果隊(duì)列未滿,則喚醒因?yàn)殛?duì)列已滿而等待放數(shù)據(jù)的線程(用來(lái)補(bǔ)償,不加也行)
if (c + 1 < capacity) {
notFull.signal();
}
}
} finally {
// 7. 釋放鎖
putLock.unlock();
}
// 8. c等于0,表示插入前,隊(duì)列為空,是第一次插入,需要喚醒因?yàn)殛?duì)列為空而等待取數(shù)據(jù)的線程
if (c == 0) {
signalNotEmpty();
}
// 9. 返回是否插入成功
return c >= 0;
}
/**
* 入隊(duì)
*
* @param node 節(jié)點(diǎn)
*/
private void enqueue(LinkedBlockingQueue.Node<E> node) {
// 直接追加到鏈表末尾
last = last.next = node;
}
/**
* 喚醒因?yàn)殛?duì)列為空而等待取數(shù)據(jù)的線程
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}offer()方法邏輯也很簡(jiǎn)單,追加元素到鏈表末尾,如果是第一次添加元素,就喚醒因?yàn)殛?duì)列為空而等待取數(shù)據(jù)的線程。
再看一下另外三個(gè)添加元素方法源碼:
add方法源碼
add()方法在數(shù)組滿的時(shí)候,會(huì)拋出異常,底層基于offer()實(shí)現(xiàn)。
/**
* add方法入口
*
* @param e 元素
* @return 是否添加成功
*/
public boolean add(E e) {
if (offer(e)) {
return true;
} else {
throw new IllegalStateException("Queue full");
}
}put方法源碼
put()方法在數(shù)組滿的時(shí)候,會(huì)一直阻塞,直到有其他線程取走數(shù)據(jù),空出位置,才能添加成功。
/**
* put方法入口
*
* @param e 元素
*/
public void put(E e) throws InterruptedException {
// 1. 判空,傳參不允許為null
if (e == null) {
throw new NullPointerException();
}
int c = -1;
Node<E> node = new Node<E>(e);
// 2. 加可中斷的鎖,防止一直阻塞
final ReentrantLock putLock = this.putLock;
putLock.lockInterruptibly();
final AtomicInteger count = this.count;
try {
// 3. 如果隊(duì)列已滿,就一直阻塞,直到被喚醒
while (count.get() == capacity) {
notFull.await();
}
// 4. 如果隊(duì)列未滿,則直接入隊(duì)
enqueue(node);
c = count.getAndIncrement();
// 5. 如果隊(duì)列未滿,則喚醒因?yàn)殛?duì)列已滿而等待放數(shù)據(jù)的線程(用來(lái)補(bǔ)償,不加也行)
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
// 6. 釋放鎖
putLock.unlock();
}
// 7. c等于0,表示插入前,隊(duì)列為空,是第一次插入,需要喚醒因?yàn)殛?duì)列為空而等待取數(shù)據(jù)的線程
if (c == 0) {
signalNotEmpty();
}
}offer(e, time, unit)源碼
再看一下offer(e, time, unit)方法源碼,在數(shù)組滿的時(shí)候, offer(e, time, unit)方法會(huì)阻塞一段時(shí)間。
/**
* offer方法入口
*
* @param e 元素
* @param timeout 超時(shí)時(shí)間
* @param unit 時(shí)間單位
* @return 是否添加成功
*/
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 1. 判空,傳參不允許為null
if (e == null) {
throw new NullPointerException();
}
// 2. 把超時(shí)時(shí)間轉(zhuǎn)換為納秒
long nanos = unit.toNanos(timeout);
int c = -1;
final AtomicInteger count = this.count;
// 2. 加可中斷的鎖,防止一直阻塞
final ReentrantLock putLock = this.putLock;
putLock.lockInterruptibly();
try {
// 4. 循環(huán)判斷隊(duì)列是否已滿
while (count.get() == capacity) {
if (nanos <= 0) {
// 6. 如果隊(duì)列已滿,且超時(shí)時(shí)間已過(guò),則返回false
return false;
}
// 5. 如果隊(duì)列已滿,則等待指定時(shí)間
nanos = notFull.awaitNanos(nanos);
}
// 7. 如果隊(duì)列未滿,則入隊(duì)
enqueue(new Node<E>(e));
// 8. 如果隊(duì)列未滿,則喚醒因?yàn)殛?duì)列已滿而等待放數(shù)據(jù)的線程(用來(lái)補(bǔ)償,不加也行)
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
// 9. 釋放鎖
putLock.unlock();
}
// 10. c等于0,表示插入前,隊(duì)列為空,是第一次插入,需要喚醒因?yàn)殛?duì)列為空而等待取數(shù)據(jù)的線程
if (c == 0) {
signalNotEmpty();
}
return true;
}彈出數(shù)據(jù)源碼
彈出數(shù)據(jù)(取出數(shù)據(jù)并刪除)的方法有四個(gè):
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時(shí)間 |
取數(shù)據(jù)(同時(shí)刪除數(shù)據(jù)) | remove() | poll() | take() | poll(time, unit) |
poll方法源碼
看一下poll()方法源碼,其他方取數(shù)據(jù)法邏輯大同小異,都是從鏈表頭部彈出元素。 poll()方法在彈出元素的時(shí)候,如果隊(duì)列為空,直接返回null,表示彈出失敗。
/**
* poll方法入口
*/
public E poll() {
// 如果隊(duì)列為空,則返回null
final AtomicInteger count = this.count;
if (count.get() == 0) {
return null;
}
E x = null;
int c = -1;
// 2. 加鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 3. 如果隊(duì)列不為空,則取出隊(duì)頭元素
if (count.get() > 0) {
x = dequeue();
// 4. 元素個(gè)數(shù)減一
c = count.getAndDecrement();
// 5. 如果隊(duì)列不為空,則喚醒因?yàn)殛?duì)列為空而等待取數(shù)據(jù)的線程
if (c > 1) {
notEmpty.signal();
}
}
} finally {
// 6. 釋放鎖
takeLock.unlock();
}
// 7. 如果取數(shù)據(jù)之前,隊(duì)列已滿,取數(shù)據(jù)之后隊(duì)列肯定不滿了,則喚醒因?yàn)殛?duì)列已滿而等待放數(shù)據(jù)的線程
if (c == capacity) {
signalNotFull();
}
return x;
}
/**
* 取出隊(duì)頭元素
*/
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* 喚醒因?yàn)殛?duì)列已滿而等待放數(shù)據(jù)的線程
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}remove方法源碼
再看一下remove()方法源碼,如果隊(duì)列為空,remove()會(huì)拋出異常。
/**
* remove方法入口
*/
public E remove() {
// 1. 直接調(diào)用poll方法
E x = poll();
// 2. 如果取到數(shù)據(jù),直接返回,否則拋出異常
if (x != null) {
return x;
} else {
throw new NoSuchElementException();
}
}take方法源碼
再看一下take()方法源碼,如果隊(duì)列為空,take()方法就一直阻塞,直到被喚醒。
/**
* take方法入口
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// 1. 加可中斷的鎖,防止一直阻塞
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 2. 如果隊(duì)列為空,就一直阻塞,直到被喚醒
while (count.get() == 0) {
notEmpty.await();
}
// 3. 如果隊(duì)列不為空,則取出隊(duì)頭元素
x = dequeue();
// 4. 隊(duì)列元素個(gè)數(shù)減一
c = count.getAndDecrement();
// 5. 如果隊(duì)列不為空,則喚醒因?yàn)殛?duì)列為空而等待取數(shù)據(jù)的線程
if (c > 1) {
notEmpty.signal();
}
} finally {
// 6. 釋放鎖
takeLock.unlock();
}
// 7. 如果取數(shù)據(jù)之前,隊(duì)列已滿,取數(shù)據(jù)之后隊(duì)列肯定不滿了,則喚醒因?yàn)殛?duì)列已滿而等待放數(shù)據(jù)的線程
if (c == capacity) {
signalNotFull();
}
return x;
}poll(time, unit)源碼
再看一下poll(time, unit)方法源碼,在隊(duì)列滿的時(shí)候, poll(time, unit)方法會(huì)阻塞指定時(shí)間,然后然后null。
/**
* poll方法入口
*
* @param timeout 超時(shí)時(shí)間
* @param unit 時(shí)間單位
* @return 元素
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
// 1. 把超時(shí)時(shí)間轉(zhuǎn)換成納秒
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
// 2. 加可中斷的鎖,防止一直阻塞
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 3. 循環(huán)判斷隊(duì)列是否為空
while (count.get() == 0) {
if (nanos <= 0) {
// 5. 如果隊(duì)列為空,且超時(shí)時(shí)間已過(guò),則返回null
return null;
}
// 4. 阻塞到到指定時(shí)間
nanos = notEmpty.awaitNanos(nanos);
}
// 6. 如果隊(duì)列不為空,則取出隊(duì)頭元素
x = dequeue();
// 7. 隊(duì)列元素個(gè)數(shù)減一
c = count.getAndDecrement();
// 8. 如果隊(duì)列不為空,則喚醒因?yàn)殛?duì)列為空而等待取數(shù)據(jù)的線程
if (c > 1) {
notEmpty.signal();
}
} finally {
// 9. 釋放鎖
takeLock.unlock();
}
// 7. 如果取數(shù)據(jù)之前,隊(duì)列已滿,取數(shù)據(jù)之后隊(duì)列肯定不滿了,則喚醒因?yàn)殛?duì)列已滿而等待放數(shù)據(jù)的線程
if (c == capacity) {
signalNotFull();
}
return x;
}查看數(shù)據(jù)源碼
再看一下查看數(shù)據(jù)源碼,查看數(shù)據(jù),并不刪除數(shù)據(jù)。
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時(shí)間 |
取數(shù)據(jù)(不刪除) | element() | peek() | 不支持 | 不支持 |
peek方法源碼
先看一下peek()方法源碼,如果數(shù)組為空,直接返回null。
/**
* peek方法入口
*/
public E peek() {
// 1. 如果隊(duì)列為空,則返回null
if (count.get() == 0) {
return null;
}
// 2. 加鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 3. 取出隊(duì)頭元素
Node<E> first = head.next;
if (first == null) {
return null;
} else {
return first.item;
}
} finally {
// 4. 釋放鎖
takeLock.unlock();
}
}element方法源碼
再看一下element()方法源碼,如果隊(duì)列為空,則拋出異常。
/**
* element方法入口
*/
public E element() {
// 1. 調(diào)用peek方法查詢數(shù)據(jù)
E x = peek();
// 2. 如果查到數(shù)據(jù),直接返回
if (x != null) {
return x;
} else {
// 3. 如果沒(méi)找到,則拋出異常
throw new NoSuchElementException();
}
}總結(jié)
這篇文章講解了LinkedBlockingQueue阻塞隊(duì)列的核心源碼,了解到LinkedBlockingQueue隊(duì)列具有以下特點(diǎn):
- LinkedBlockingQueue實(shí)現(xiàn)了BlockingQueue接口,提供了四組放數(shù)據(jù)和讀數(shù)據(jù)的方法,來(lái)滿足不同的場(chǎng)景。
- LinkedBlockingQueue底層基于鏈表實(shí)現(xiàn),支持從頭部彈出數(shù)據(jù),從尾部添加數(shù)據(jù)。
- LinkedBlockingQueue初始化的時(shí)候,如果不指定隊(duì)列長(zhǎng)度,默認(rèn)長(zhǎng)度是Integer最大值,有內(nèi)存溢出風(fēng)險(xiǎn),建議初始化的時(shí)候指定隊(duì)列長(zhǎng)度。
- LinkedBlockingQueue的方法是線程安全的,分別使用了讀寫(xiě)兩把鎖,比ArrayBlockingQueue性能更好。
那么ArrayBlockingQueue與LinkedBlockingQueue區(qū)別是什么?相同點(diǎn):
- 都是繼承自AbstractQueue抽象類,并實(shí)現(xiàn)了BlockingQueue接口,所以兩者擁有相同的讀寫(xiě)方法,出現(xiàn)的地方可以相互替換。
不同點(diǎn):
- 底層結(jié)構(gòu)不同,ArrayBlockingQueue底層基于數(shù)組實(shí)現(xiàn),初始化的時(shí)候必須指定數(shù)組長(zhǎng)度,無(wú)法擴(kuò)容。LinkedBlockingQueue底層基于鏈表實(shí)現(xiàn),鏈表最大長(zhǎng)度是Integer最大值。
- 占用內(nèi)存大小不同,ArrayBlockingQueue一旦初始化,數(shù)組長(zhǎng)度就確定了,不會(huì)隨著元素增加而改變。LinkedBlockingQueue會(huì)隨著元素越多,鏈表越長(zhǎng),占用內(nèi)存越大。
- 性能不同,ArrayBlockingQueue的入隊(duì)和出隊(duì)共用一把鎖,并發(fā)較低。LinkedBlockingQueue入隊(duì)和出隊(duì)使用兩把獨(dú)立的鎖,并發(fā)情況下性能更高。
- 公平鎖選項(xiàng),ArrayBlockingQueue初始化的時(shí)候,可以指定使用公平鎖或者非公平鎖,公平鎖模式下,可以按照線程等待的順序來(lái)操作隊(duì)列。LinkedBlockingQueue只支持非公平鎖。
- 適用場(chǎng)景不同,ArrayBlockingQueue適用于明確限制隊(duì)列大小的場(chǎng)景,防止生產(chǎn)速度大于消費(fèi)速度的時(shí)候,造成內(nèi)存溢出、資源耗盡。LinkedBlockingQueue適用于業(yè)務(wù)高峰期可以自動(dòng)擴(kuò)展消費(fèi)速度的場(chǎng)景。
今天一起分析了LinkedBlockingQueue隊(duì)列的源碼,可以看到LinkedBlockingQueue的源碼非常簡(jiǎn)單,沒(méi)有什么神秘復(fù)雜的東西,下篇文章再一起接著分析其他的阻塞隊(duì)列源碼。




























