偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

沒看過ArrayBlockingQueue源碼,就別說精通線程池

開發(fā) 前端
今天一起分析了ArrayBlockingQueue?隊(duì)列的源碼,可以看到ArrayBlockingQueue的源碼非常簡(jiǎn)單,沒有什么神秘復(fù)雜的東西,下篇文章再一起接著分析其他的阻塞隊(duì)列源碼。

引言

在日常開發(fā)中,我們好像很少用到BlockingQueue(阻塞隊(duì)列),BlockingQueue到底有什么作用?應(yīng)用場(chǎng)景是什么樣的?

如果使用過線程池或者閱讀過線程池源碼,就會(huì)知道線程池的核心功能都是基于BlockingQueue實(shí)現(xiàn)的。

大家用過消息隊(duì)列(MessageQueue),就知道消息隊(duì)列作用是解耦、異步、削峰。同樣BlockingQueue的作用也是這三種,區(qū)別是BlockingQueue只作用于本機(jī)器,而消息隊(duì)列相當(dāng)于分布式BlockingQueue。

BlockingQueue作為阻塞隊(duì)列,主要應(yīng)用于生產(chǎn)者-消費(fèi)者模式的場(chǎng)景,在并發(fā)多線程中尤其常用。

  1. 比如像線程池中的任務(wù)調(diào)度場(chǎng)景,提交任務(wù)和拉取并執(zhí)行任務(wù)。
  2. 生產(chǎn)者與消費(fèi)者解耦的場(chǎng)景,生產(chǎn)者把數(shù)據(jù)放到隊(duì)列中,消費(fèi)者從隊(duì)列中取數(shù)據(jù)進(jìn)行消費(fèi)。兩者進(jìn)行解耦,不用感知對(duì)方的存在。
  3. 應(yīng)對(duì)突發(fā)流量的場(chǎng)景,業(yè)務(wù)高峰期突然來了很多請(qǐng)求,可以放到隊(duì)列中緩存起來,消費(fèi)者以正常的頻率從隊(duì)列中拉取并消費(fèi)數(shù)據(jù),起到削峰的作用。

BlockingQueue是個(gè)接口,定義了幾組放數(shù)據(jù)和取數(shù)據(jù)的方法,來滿足不同的場(chǎng)景。

操作

拋出異常

返回特定值

阻塞

阻塞一段時(shí)間

放數(shù)據(jù)

add()

offer()

put()

offer(e, time, unit)

取數(shù)據(jù)(同時(shí)刪除數(shù)據(jù))

remove()

poll()

take()

poll(time, unit)

取數(shù)據(jù)(不刪除)

element()

peek()

不支持

不支持

BlockingQueue有5個(gè)常見的實(shí)現(xiàn)類,應(yīng)用場(chǎng)景不同。

  • ArrayBlockingQueue

基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列,創(chuàng)建隊(duì)列時(shí)需指定容量大小,是有界隊(duì)列。

  • LinkedBlockingQueue

基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列,默認(rèn)是無界隊(duì)列,創(chuàng)建可以指定容量大小

  • SynchronousQueue

一種沒有緩沖的阻塞隊(duì)列,生產(chǎn)出的數(shù)據(jù)需要立刻被消費(fèi)

  • PriorityBlockingQueue

實(shí)現(xiàn)了優(yōu)先級(jí)的阻塞隊(duì)列,基于數(shù)據(jù)顯示,是無界隊(duì)列

  • DelayQueue

實(shí)現(xiàn)了延遲功能的阻塞隊(duì)列,基于PriorityQueue實(shí)現(xiàn)的,是無界隊(duì)列

今天重點(diǎn)講一下ArrayBlockingQueue的底層實(shí)現(xiàn)原理,在接下來的文章中再講一下其他隊(duì)列實(shí)現(xiàn)。

ArrayBlockingQueue類結(jié)構(gòu)

先看一下ArrayBlockingQueue類里面有哪些屬性:

public class ArrayBlockingQueue<E>
        extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {


    /**
     * 用來存放數(shù)據(jù)的數(shù)組
     */
    final Object[] items;

    /**
     * 下次取數(shù)據(jù)的數(shù)組下標(biāo)位置
     */
    int takeIndex;

    /**
     * 下次放數(shù)據(jù)的數(shù)組下標(biāo)位置
     */
    int putIndex;

    /**
     * 元素個(gè)數(shù)
     */
    int count;

    /**
     * 獨(dú)占鎖,用來保證存取數(shù)據(jù)安全
     */
    final ReentrantLock lock;

    /**
     * 取數(shù)據(jù)的條件
     */
    private final Condition notEmpty;

    /**
     * 放數(shù)據(jù)的條件
     */
    private final Condition notFull;

}

可以看出ArrayBlockingQueue底層是基于數(shù)組實(shí)現(xiàn)的,使用對(duì)象數(shù)組items存儲(chǔ)元素。為了實(shí)現(xiàn)隊(duì)列特性(一端插入,另一端刪除),定義了兩個(gè)指針,takeIndex表示下次取數(shù)據(jù)的位置,putIndex表示下次放數(shù)據(jù)的位置。 另外ArrayBlockingQueue還使用ReentrantLock保證線程安全,并且定義了兩個(gè)條件,當(dāng)條件滿足的時(shí)候才允許放數(shù)據(jù)或者取數(shù)據(jù),下面會(huì)詳細(xì)講。

初始化

ArrayBlockingQueue常用的初始化方法有兩個(gè):

  1. 指定容量大小
  2. 指定容量大小和是否是公平鎖
/**
 * 指定容量大小的構(gòu)造方法
 */
BlockingQueue<Integer> blockingDeque1 = new ArrayBlockingQueue<>(1);
/**
 * 指定容量大小、公平鎖的構(gòu)造方法
 */
BlockingQueue<Integer> blockingDeque1 = new ArrayBlockingQueue<>(1, true);

再看一下對(duì)應(yīng)的源碼實(shí)現(xiàn):

/**
 * 指定容量大小的構(gòu)造方法(默認(rèn)是非公平鎖)
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}


/**
 * 指定容量大小、公平鎖的構(gòu)造方法
 *
 * @param capacity 數(shù)組容量
 * @param fair     是否是公平鎖
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0) {
        throw new IllegalArgumentException();
    }
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

放數(shù)據(jù)源碼

放數(shù)據(jù)的方法有四個(gè):

操作

拋出異常

返回特定值

阻塞

阻塞一段時(shí)間

放數(shù)據(jù)

add()

offer()

put()

offer(e, time, unit)

offer方法源碼

先看一下offer()方法源碼,其他方法邏輯也是大同小異。 無論是放數(shù)據(jù)還是取數(shù)據(jù),都是從隊(duì)頭開始,向隊(duì)尾移動(dòng)。

圖片圖片

/**
 * offer方法入口
 *
 * @param e 元素
 * @return 是否插入成功
 */
public boolean offer(E e) {
    // 1. 判空,傳參不允許為null
    checkNotNull(e);
    // 2. 加鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 3. 判斷數(shù)組是否已滿,如果滿了就直接返回false結(jié)束
        if (count == items.length) {
            return false;
        } else {
            // 4. 否則就插入
            enqueue(e);
            return true;
        }
    } finally {
        // 5. 釋放鎖
        lock.unlock();
    }
}

/**
 * 入隊(duì)
 *
 * @param x 元素
 */
private void enqueue(E x) {
    // 1. 獲取數(shù)組
    final Object[] items = this.items;
    // 2. 直接放入數(shù)組
    items[putIndex] = x;
    // 3. 移動(dòng)putIndex位置,如果到達(dá)數(shù)組的末尾就從頭開始
    if (++putIndex == items.length) {
        putIndex = 0;
    }
    // 4. 計(jì)數(shù)
    count++;
    // 5. 喚醒因?yàn)殛?duì)列為空,等待取數(shù)據(jù)的線程
    notEmpty.signal();
}

offer()在數(shù)組滿的時(shí)候,會(huì)返回false,表示添加失敗。 為了循環(huán)利用數(shù)組,添加元素的時(shí)候如果已經(jīng)到了隊(duì)尾,就從隊(duì)頭重新開始,相當(dāng)于一個(gè)循環(huán)隊(duì)列,像下面這樣:

圖片圖片

add方法源碼

再看一下另外三個(gè)添加元素方法源碼: add()方法在數(shù)組滿的時(shí)候,會(huì)拋出異常,底層基于offer()實(shí)現(xiàn)。

/**
 * add方法入口
 *
 * @param e 元素
 * @return 是否添加成功
 */
public boolean add(E e) {
    if (offer(e)) {
        return true;
    } else {
        throw new IllegalStateException("Queue full");
    }
}

put方法源碼

put()方法在數(shù)組滿的時(shí)候,會(huì)一直阻塞,直到有其他線程取走數(shù)據(jù),空出位置,才能添加成功。

/**
 * put方法入口
 *
 * @param e 元素
 */
public void put(E e) throws InterruptedException {
    // 1. 判空,傳參不允許為null
    checkNotNull(e);
    // 2. 加可中斷的鎖,防止一直阻塞
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 3. 如果隊(duì)列已滿,就一直阻塞,直到被喚醒
        while (count == items.length) {
            notFull.await();
        }
        // 4. 如果隊(duì)列未滿,直接入隊(duì)
        enqueue(e);
    } finally {
        // 5. 釋放鎖
        lock.unlock();
    }
}

offer(e, time, unit)源碼

再看一下offer(e, time, unit)方法源碼,在數(shù)組滿的時(shí)候, offer(e, time, unit)方法會(huì)阻塞一段時(shí)間。

/**
 * offer方法入口
 *
 * @param e       元素
 * @param timeout 超時(shí)時(shí)間
 * @param unit    時(shí)間單位
 * @return 是否添加成功
 */
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    // 1. 判空,傳參不允許為null
    checkNotNull(e);
    // 2. 把超時(shí)時(shí)間轉(zhuǎn)換為納秒
    long nanos = unit.toNanos(timeout);
    // 3. 加可中斷的鎖,防止一直阻塞
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 4. 循環(huán)判斷隊(duì)列是否已滿
        while (count == items.length) {
            if (nanos <= 0) {
                // 6. 如果隊(duì)列已滿,且超時(shí)時(shí)間已過,則返回false
                return false;
            }
            // 5. 如果隊(duì)列已滿,則等待指定時(shí)間
            nanos = notFull.awaitNanos(nanos);
        }
        // 7. 如果隊(duì)列未滿,則入隊(duì)
        enqueue(e);
        return true;
    } finally {
        // 8. 釋放鎖
        lock.unlock();
    }
}

彈出數(shù)據(jù)源碼

彈出數(shù)據(jù)(取出數(shù)據(jù)并刪除)的方法有四個(gè):

操作

拋出異常

返回特定值

阻塞

阻塞一段時(shí)間

取數(shù)據(jù)(同時(shí)刪除數(shù)據(jù))

remove()

poll()

take()

poll(time, unit)

poll方法源碼

看一下poll()方法源碼,其他方法邏輯大同小異。 poll()方法在彈出元素的時(shí)候,如果數(shù)組為空,則返回null,表示彈出失敗。

/**
 * poll方法入口
 */
public E poll() {
    // 1. 加鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 2. 如果數(shù)組為空,則返回null,否則返回隊(duì)列頭部元素
        return (count == 0) ? null : dequeue();
    } finally {
        // 3. 釋放鎖
        lock.unlock();
    }
}

/**
 * 出列
 */
private E dequeue() {
    // 1. 取出隊(duì)列頭部元素
    final Object[] items = this.items;
    E x = (E) items[takeIndex];
    // 2. 取出元素后,把該位置置空
    items[takeIndex] = null;
    // 3. 移動(dòng)takeIndex位置,如果到達(dá)數(shù)組的末尾就從頭開始
    if (++takeIndex == items.length) {
        takeIndex = 0;
    }
    // 4. 元素個(gè)數(shù)減一
    count--;
    if (itrs != null) {
        itrs.elementDequeued();
    }
    // 5. 喚醒因?yàn)殛?duì)列已滿,等待放數(shù)據(jù)的線程
    notFull.signal();
    return x;
}

可見取數(shù)據(jù)跟放數(shù)據(jù)一樣,都是循環(huán)遍歷數(shù)組。

remove方法源碼

再看一下remove()方法源碼,如果數(shù)組為空,remove()會(huì)拋出異常。

/**
 * remove方法入口
 */
public E remove() {
    // 1. 直接調(diào)用poll方法
    E x = poll();
    // 2. 如果取到數(shù)據(jù),直接返回,否則拋出異常
    if (x != null) {
        return x;
    } else {
        throw new NoSuchElementException();
    }
}

take方法源碼

再看一下take()方法源碼,如果數(shù)組為空,take()方法就一直阻塞,直到被喚醒。

/**
 * take方法入口
 */
public E take() throws InterruptedException {
    // 1. 加可中斷的鎖,防止一直阻塞
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 2. 如果數(shù)組為空,就一直阻塞,直到被喚醒
        while (count == 0) {
            notEmpty.await();
        }
        // 3. 如果數(shù)組不為空,就從數(shù)組中取數(shù)據(jù)
        return dequeue();
    } finally {
        // 4. 釋放鎖
        lock.unlock();
    }
}

poll(time, unit)源碼

再看一下poll(time, unit)方法源碼,在數(shù)組滿的時(shí)候, poll(time, unit)方法會(huì)阻塞一段時(shí)間。

/**
 * poll方法入口
 *
 * @param timeout 超時(shí)時(shí)間
 * @param unit    時(shí)間單位
 * @return 元素
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 1. 把超時(shí)時(shí)間轉(zhuǎn)換成納秒
    long nanos = unit.toNanos(timeout);
    // 2. 加可中斷的鎖,防止一直阻塞
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 3. 如果數(shù)組為空,就開始阻塞
        while (count == 0) {
            if (nanos <= 0) {
                // 5. 如果數(shù)組為空,且超時(shí)時(shí)間已過,則返回null
                return null;
            }
            // 4. 阻塞到到指定時(shí)間
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 6. 如果數(shù)組不為空,則出列
        return dequeue();
    } finally {
        // 7. 釋放鎖
        lock.unlock();
    }
}

查看數(shù)據(jù)源碼

再看一下查看數(shù)據(jù)源碼,查看數(shù)據(jù),并不刪除數(shù)據(jù)。

操作

拋出異常

返回特定值

阻塞

阻塞一段時(shí)間

取數(shù)據(jù)(不刪除)

element()

peek()

不支持

不支持

peek方法源碼

先看一下peek()方法源碼,如果數(shù)組為空,就返回null。

/**
 * peek方法入口
 */
public E peek() {
    // 1. 加鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 2. 返回?cái)?shù)組頭部元素,如果數(shù)組為空,則返回null
        return itemAt(takeIndex);
    } finally {
        // 3. 釋放鎖
        lock.unlock();
    }
}

/**
 * 返回當(dāng)前位置元素
 */
final E itemAt(int i) {
    return (E) items[i];
}

element方法源碼

再看一下element()方法源碼,如果數(shù)組為空,則拋出異常。

/**
 * element方法入口
 */
public E element() {
    // 1. 調(diào)用peek方法查詢數(shù)據(jù)
    E x = peek();
    // 2. 如果查到數(shù)據(jù),直接返回
    if (x != null) {
        return x;
    } else {
        // 3. 如果沒找到,則拋出異常
        throw new NoSuchElementException();
    }
}

總結(jié)

這篇文章講解了ArrayBlockingQueue隊(duì)列的核心源碼,了解到ArrayBlockingQueue隊(duì)列具有以下特點(diǎn):

  1. ArrayBlockingQueue實(shí)現(xiàn)了BlockingQueue接口,提供了四組放數(shù)據(jù)和讀數(shù)據(jù)的方法,來滿足不同的場(chǎng)景。
  2. ArrayBlockingQueue底層基于數(shù)組實(shí)現(xiàn),采用循環(huán)數(shù)組,提升了數(shù)組的空間利用率。
  3. ArrayBlockingQueue初始化的時(shí)候,必須指定隊(duì)列長(zhǎng)度,是有界的阻塞隊(duì)列,所以要預(yù)估好隊(duì)列長(zhǎng)度,保證生產(chǎn)者和消費(fèi)者速率相匹配。
  4. ArrayBlockingQueue的方法是線程安全的,使用ReentrantLock在操作前后加鎖來保證線程安全。

今天一起分析了ArrayBlockingQueue隊(duì)列的源碼,可以看到ArrayBlockingQueue的源碼非常簡(jiǎn)單,沒有什么神秘復(fù)雜的東西,下篇文章再一起接著分析其他的阻塞隊(duì)列源碼。

責(zé)任編輯:武曉燕 來源: 一燈架構(gòu)
相關(guān)推薦

2024-03-18 08:15:48

Java并發(fā)編程

2024-02-29 09:37:25

Java并發(fā)編程

2024-02-04 08:43:20

源碼線程池緩沖

2019-09-02 08:08:30

緩存HTTP數(shù)據(jù)庫

2013-05-28 13:57:12

MariaDB

2020-11-19 07:41:51

ArrayBlocki

2015-09-22 13:08:42

戴爾云計(jì)算

2022-12-16 08:31:37

調(diào)度線程池源碼

2025-01-03 08:40:53

Java并發(fā)編程Guava庫

2020-09-21 08:33:12

線程池調(diào)度Thread Pool

2023-05-23 08:54:43

SRESLO運(yùn)營(yíng)

2021-09-01 17:51:53

技術(shù)LinkedList 源碼

2018-10-31 15:54:47

Java線程池源碼

2013-06-08 10:11:31

Java線程池架構(gòu)

2015-10-10 09:39:42

Java線程池源碼解析

2024-01-29 15:54:41

Java線程池公平鎖

2021-05-26 11:30:24

Java線程池代碼

2021-09-02 09:53:42

開發(fā)Redis配置

2023-05-19 08:01:24

Key消費(fèi)場(chǎng)景

2020-01-22 16:29:52

機(jī)器學(xué)習(xí)人工智能計(jì)算機(jī)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)