偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

詳解 Java 并發(fā)流程控制工具

開發(fā)
本文將針對JUC包下幾個(gè)常見的工具類進(jìn)行深入剖析和演示,通過針對本文的閱讀,讀者將會對JUC包下的工具有一個(gè)全面的了解和運(yùn)用。

本文將針對JUC包下幾個(gè)常見的工具類進(jìn)行深入剖析和演示,通過針對本文的閱讀,讀者將會對JUC包下的工具有一個(gè)全面的了解和運(yùn)用。

一、CountDownLatch(倒計(jì)時(shí)門閂)

1. CountDownLatch簡介

在并發(fā)編程的禪意中,CountDownLatch本質(zhì)上就是一種閉鎖,而閉鎖的語義則是等待所有其他活動都完成了,才會繼續(xù)執(zhí)行后續(xù)的操作。

筆者一般稱CountDownLatch為倒計(jì)時(shí)門閂,它主要用于需要某些條件下才能喚醒的需求場景,例如我們線程1必須等到線程2做完某些事,那么就可以設(shè)置一個(gè)CountDownLatch并將數(shù)值設(shè)置為1,一旦線程2完成業(yè)務(wù)邏輯后,將數(shù)值修改為0,此時(shí)線程1就會被喚醒:

2. 基于CountDownLatch實(shí)現(xiàn)等待多線程就緒

通過上述的描述可能有點(diǎn)抽象,我們直接通過幾個(gè)例子演示一下,我們現(xiàn)在有這樣一個(gè)需求,希望等待5個(gè)線程完成之后,打印輸出一句工作完成:

對應(yīng)的代碼示例如下,可以看到我們創(chuàng)建了數(shù)值為5的CountDownLatch ,一旦線程池里的線程完成工作后就調(diào)用countDown進(jìn)行扣減,一旦數(shù)值變?yōu)?,主線程await就會放行,執(zhí)行后續(xù)輸出:

int workerSize = 5;
        CountDownLatch workCount = new CountDownLatch(workerSize);
        ExecutorService threadPool = Executors.newFixedThreadPool(workerSize);

        for (int i = 0; i < workerSize; i++) {
            final int workerNum = i;
            //5個(gè)工人輸出完成工作后,扣減倒計(jì)時(shí)門閂數(shù)
            threadPool.submit(() -> {
                log.info("worker[{}]完成手頭的工作", workerNum);
                workCount.countDown();
            });
        }

        try {
            //阻塞當(dāng)前線程(主線程)往后走,只有倒計(jì)時(shí)門閂變?yōu)?之后才能繼續(xù)后續(xù)邏輯
            log.info("等待worker工作完成");
            workCount.await();
        } catch (InterruptedException e) {
            log.info("倒計(jì)時(shí)門閂阻塞失敗,失敗原因[{}]", e.getMessage(), e);
        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        log.info("所有工人都完成手頭的工作了");

對應(yīng)的我們也給出輸出結(jié)果,可以看到主線程在線程池線程完成后才輸出:

3. 基于CountDownLatch實(shí)現(xiàn)運(yùn)動員賽跑

實(shí)際上CountDownLatch可以讓多個(gè)線程進(jìn)行等待,我們不妨用線程模擬一下所有運(yùn)動員就緒后,等待槍響后起跑的場景:

代碼如下,每當(dāng)運(yùn)動員即線程池的線程準(zhǔn)備就緒,則調(diào)用await等待槍響,一旦所有運(yùn)動員就緒之后,主線程調(diào)用countDown模擬槍響,然后運(yùn)動員起跑:

public static void main(String[] args) {
        log.info("百米跑比賽開始");

        int playerNum = 3;
        CountDownLatch gun = new CountDownLatch(1);
        ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
        
        for (int i = 0; i < playerNum; i++) {
            final int playNo = i;
            
            threadPool.submit(() -> {
                log.info("[{}]號運(yùn)動員已就緒", playNo);
                try {
                    gun.await();
                } catch (InterruptedException e) {
                    log.info("[{}]號運(yùn)動員線程阻塞失敗,失敗原因[{}]", playNo, e.getMessage(), e);
                }
                log.info("[{}]號運(yùn)動員已經(jīng)到達(dá)重點(diǎn)", playNo);
            });
        }

        //按下槍 所有運(yùn)動員起跑
        gun.countDown();

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        log.info("百米賽跑已結(jié)束");
    }

對應(yīng)的我們也給出相應(yīng)的輸出結(jié)果:

4. 從源碼角度分析CountDownLatch工作流程

我們以等待所有工人完成工作的例子進(jìn)行解析,實(shí)際上在CountDownLatch是通過state和一個(gè)抽象隊(duì)列即aqs完成多線程之間的流程調(diào)度,主線程調(diào)用await方法等待其他worker線程,如果其它worker線程沒有完成工作,那么CountDownLatch就會將其存入抽象隊(duì)列中。

一旦其他線程將state設(shè)置為0時(shí),await對應(yīng)的線程就會從抽象隊(duì)列中釋放并喚醒:

對應(yīng)我們給出countDown的實(shí)現(xiàn),可以看到該方法底層就是將aqs隊(duì)列中的state進(jìn)行扣減:

public void countDown() {
        sync.releaseShared(1);
    }

//releaseShared內(nèi)部核心邏輯就是將state扣減1
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                //扣減state并通過cas修改賦值
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

而countDown本質(zhì)上就是查看這個(gè)state,如果state被扣減為0,則調(diào)用aqs底層doReleaseShared方法將隊(duì)列中等待線程喚醒:

public void countDown() {
        sync.releaseShared(1);
    }


public final boolean releaseShared(int arg) {
  //查看是否扣減為0
        if (tryReleaseShared(arg)) {
        //如果是0則將當(dāng)前等待線程喚醒
            doReleaseShared();
            return true;
        }
        return false;
    }

上文講解countDown涉及一些關(guān)于AQS的實(shí)用理解和設(shè)計(jì),關(guān)于更多AQS的知識點(diǎn),感興趣的讀者可以閱讀一下筆者的這篇文章:《AQS 源碼解析:原理與實(shí)踐》。

二、Semaphore(信號量)

1. 詳解Semaphore

信號量多用于限流的場景,例如我們希望單位時(shí)間內(nèi)只能有一個(gè)線程工作,我們就可以使用信號量,只有拿到線程的信號量才能工作,工作完成后釋放信號量,其余線程才能爭搶這個(gè)信號量并進(jìn)行進(jìn)一步的操作。

對應(yīng)我們給出下面這段代碼,可以看到筆者聲明信號量數(shù)值為6,每當(dāng)線程拿到3個(gè)信號量之后就會執(zhí)行業(yè)務(wù)操作,完成后調(diào)用release釋放3個(gè)令牌,讓其他線程繼續(xù)爭搶:

//設(shè)置可復(fù)用的信號量,令牌數(shù)為3
        Semaphore semaphore = new Semaphore(6, true);
        //創(chuàng)建5個(gè)線程
        int workSize = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(workSize);


        for (int i = 0; i < workSize; i++) {
            executorService.submit(() -> {
                try {
                    //拿3個(gè)令牌
                    semaphore.acquire(3);

                    log.info("進(jìn)行業(yè)務(wù)邏輯處理.......");
                    ThreadUtil.sleep(1000);

                    //釋放3個(gè)令牌
                    semaphore.release(3);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executorService.shutdown();
        while (!executorService.isTerminated()) {

        }

對應(yīng)輸出結(jié)果如下,可以看到每個(gè)線程拿到令牌后都會休眠1秒,從輸出結(jié)果來看每秒只有兩個(gè)線程才工作,符合我們的限流需求:

2. 詳解Semaphore工作原理

Semaphore底層也是用到的aqs隊(duì)列,線程進(jìn)行資源獲取時(shí)也是通過查看state是否足夠,在明確足夠的情況下進(jìn)行state扣減,然后進(jìn)行工作。如果線程發(fā)現(xiàn)state數(shù)量不夠,那么就會被Semaphore存入aqs底層的抽象隊(duì)列中,直到state數(shù)量足夠后被喚醒:

對此我們給出Semaphore底層的acquire的邏輯可以看到,它會讀取state數(shù)值然后進(jìn)行扣減,如果剩余數(shù)量大于0則說明令牌獲取成功線程可以執(zhí)行后續(xù)邏輯,反之說明當(dāng)前令牌數(shù)不夠,外部邏輯會將該線程掛到等待隊(duì)列中,等待令牌足夠后將其喚醒:

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                //讀取可用的state    
                int available = getState();
                //計(jì)算剩余的state
                int remaining = available - acquires;
                //如果小于0說明令牌數(shù)不足直接返回出去,讓外部將線程掛起,反之通過cas修改剩余數(shù),返回大于0的結(jié)果讓持有令牌的線程執(zhí)行后續(xù)邏輯
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

3. 基于Semaphore實(shí)現(xiàn)一個(gè)有界容器

利用Semaphore信號量并發(fā)獲取且資源循環(huán)可復(fù)用的特性,我們可以通過實(shí)例封閉技術(shù)落地一個(gè)有界的容器,如下代碼所示,只有得到信號量且添加成功了信號量才會成功扣減,如果沒有拿到信號量就阻塞無法添加,除非其他線程釋放自己的資源。

如下圖,筆者利用信號量實(shí)現(xiàn)一個(gè)列表容器的限流設(shè)置,可以看到當(dāng)前容器還剩一個(gè)空間,所以信號量數(shù)也是1,當(dāng)線程0獲得信號量成功后將元素24添加至容器中。隨后的線程1看到信號量為0,即知曉容器沒有可用空間就會被阻塞等待:

一旦線程1刪除一個(gè)元素成功后,就會歸還一個(gè)令牌,此時(shí)線程1就會被信號量喚醒,嘗試獲取令牌并添加元素,這就是我們有界容器實(shí)現(xiàn)的核心思路:

對應(yīng)的我們給出有界容器的落地代碼示例:

public class BoundedList<E> {


    private final List<E> list;
    private final Semaphore semaphore;

    /*
    初始化一個(gè)并發(fā)的有界容器
     */
    public BoundedList(int bound) {
        this.list = Collections.synchronizedList(new ArrayList<>());
        this.semaphore = new Semaphore(bound);
    }


    public boolean add(E element) {
        boolean wasAdded = false;
        try {
            //獲取令牌,成功后才會添加容器
            semaphore.acquire();
            wasAdded = list.add(element);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            //添加失敗則釋放令牌,讓其他線程可以嘗試到該有界容器中添加
            if (!wasAdded)
                semaphore.release();
            return wasAdded;
        }
    }

    public void remove(E element) {
        boolean remove = false;
        try {

            remove = list.remove(element);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            //只有明確元素移除成功,才會釋放令牌
            if (remove)
                semaphore.release();
        }
    }

    @Override
    public String toString() {
        return JSONUtil.toJsonStr(list);
    }
}

對應(yīng)測試代碼如下,大體思路為:

  • 嘗試讓線程0填滿容器使線程1阻塞
  • 隨后線程0移除一個(gè)元素
  • 線程1被喚醒,并成功獲取令牌,將元素5成功添加
BoundedList<Integer> list = new BoundedList<>(5);
        CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(() -> {
            //添加5個(gè)元素填滿容器
            Console.log("線程1添加5個(gè)元素");
            for (int i = 0; i < 5; i++) {
                list.add(i);
            }

            ThreadUtil.sleep(5000);
            //移除元素2,讓線程2添加元素5成功
            Console.log("線程1移除元素2");
            list.remove(2);

            countDownLatch.countDown();

        }).start();


        new Thread(() -> {
            ThreadUtil.sleep(1000);
            Console.log("線程2添加元素5");
            list.add(5);
            Console.log("線程2添加元素5成功");
            countDownLatch.countDown();
        }).start();


        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Console.log("線程1和線程2執(zhí)行完畢,有界容器元素:{}", list);

輸出結(jié)果如下,符合我們對有界容器預(yù)期:

線程1添加5個(gè)元素
線程2添加元素5
線程1移除元素2
線程2添加元素5成功
線程1和線程2執(zhí)行完畢,有界容器元素:[0,1,3,4,5]

4. Semaphore使用注意事項(xiàng)

  • 獲取和釋放的時(shí)候都可以指定數(shù)量,但是要保持一致。
  • 公平性設(shè)置為true會更加合理
  • 并不必須由獲取許可證的線程釋放許可證。可以是A獲取,B釋放。

三、Condition

1. 詳解Condition

Condition即條件對象,不是很常用或者直接用到的對象,常用于線程等待喚醒操作,例如A線程需要等待某個(gè)條件的時(shí)候,我們可以通過condition.await()方法,A線程就會進(jìn)入阻塞狀態(tài)。

線程B執(zhí)行condition.signal()方法,則JVM就會從被阻塞線程中找到等待該condition的線程。線程A收到可執(zhí)行信號的時(shí)候,他的線程狀態(tài)就會變成Runnable可執(zhí)行狀態(tài)。

對此我們給出代碼示例,可以看到我們從ReentrantLock 中拿到一個(gè)Condition 對象,讓創(chuàng)建的線程進(jìn)入等待狀態(tài),隨后讓主線程調(diào)用condition 的signal將其喚醒:

private ReentrantLock lock = new ReentrantLock();
    //條件對象,操控線程的等待和通知
    private Condition condition = lock.newCondition();

    public void waitCondition() throws InterruptedException {
        lock.lock();
        try {
            log.info("等待達(dá)到條件后通知");
            condition.await();
            log.info("收到通知,開始執(zhí)行業(yè)務(wù)邏輯");
        } finally {
            lock.unlock();
            log.info("執(zhí)行完成,釋放鎖");
        }
    }


    public void notifyCondition() throws InterruptedException {
        lock.lock();
        try {
            log.info("達(dá)到條件發(fā)起通知");
            condition.signal();
            log.info("發(fā)起通知結(jié)束");
        } finally {
            lock.unlock();
            log.info("發(fā)起通知執(zhí)行完成,釋放鎖");
        }
    }


    public static void main(String[] args) throws InterruptedException {
        Main obj = new Main();

        new Thread(() -> {
            try {
                obj.waitCondition();
                //讓出CPU時(shí)間片,交給主線程發(fā)起通知
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                log.error("等待條件通知設(shè)置失敗,失敗原因 [{}]", e.getMessage(), e);
            }
        }).start();

        //休眠3s喚醒等待線程
        Thread.sleep(3000);
        obj.notifyCondition();
    }

對應(yīng)的我們也給出輸出結(jié)果:

2. 基于條件對象完成生產(chǎn)者、消費(fèi)者模式

我們假設(shè)用一個(gè)隊(duì)列存放一波生產(chǎn)者生產(chǎn)的資源,當(dāng)資源滿了通知消費(fèi)者消費(fèi)。當(dāng)消費(fèi)者消費(fèi)空了,通知生產(chǎn)者生產(chǎn)。

所以這時(shí)候使用condition控制流程最合適(這也是阻塞的隊(duì)列內(nèi)部的實(shí)現(xiàn)),所以我們要定義兩個(gè)信號,分別為:

  • 當(dāng)資源被耗盡,我們就使用資源未滿條件(notFull): 調(diào)用signal通知生產(chǎn)者消費(fèi),消費(fèi)者調(diào)用await進(jìn)入等待。
  • 當(dāng)資源被填滿,使用資源為空條件(notEmpty):將生產(chǎn)者用await方法掛起,消費(fèi)者用signal喚醒消費(fèi)告知非空。

很明顯生產(chǎn)者和消費(fèi)者本質(zhì)上就是基于這兩個(gè)標(biāo)識分別標(biāo)志自己的等待時(shí)機(jī)和通知時(shí)機(jī),以生產(chǎn)者為例,即每生產(chǎn)一個(gè)資源后就可以調(diào)用notEmpty通知消費(fèi)者消費(fèi),當(dāng)生產(chǎn)者速度過快,則用await等待未滿notFull條件阻塞:

首先我們給出生產(chǎn)者和消費(fèi)者條件和資源隊(duì)列聲明,基于上述條件我們給出一個(gè)經(jīng)典的生產(chǎn)者和消費(fèi)者模式的示例,我們首先給出生產(chǎn)者代碼,可以看到資源滿的時(shí)候調(diào)用notFull.await();將自己掛起等待未滿,生產(chǎn)資源后調(diào)用 notEmpty.signal();通知消費(fèi)者消費(fèi)。

對應(yīng)消費(fèi)者示例代碼也是一樣,當(dāng)資源消費(fèi)完全,調(diào)用notEmpty.await();等待不空,一旦消費(fèi)定量資源調(diào)用notFull.signal();通知生產(chǎn)者生產(chǎn)。

最終代碼示例如下:

@Slf4j
public class ProducerMode {

    //鎖
    private static ReentrantLock lock = new ReentrantLock();
    // 資源未滿
    private Condition notFull = lock.newCondition();
    //資源為空
    private Condition notEmpty = lock.newCondition();

    private Queue<Integer> queue = new PriorityQueue<>(10);
    private int queueMaxSize = 10;

    /**
     * 生產(chǎn)者
     */
    private class Producer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (queueMaxSize == queue.size()) {
                        log.info("當(dāng)前隊(duì)列已滿,通知消費(fèi)者消費(fèi)");
                        //等待不滿條件觸發(fā)
                        notFull.await();

                    }

                    queue.offer(1);
                    log.info("生產(chǎn)者補(bǔ)貨,當(dāng)前隊(duì)列有 【{}】", queue.size());
                    //通知消費(fèi)者隊(duì)列不空,可以消費(fèi)
                    notEmpty.signal();
                } catch (Exception e) {
                    log.error("生產(chǎn)者報(bào)錯,失敗原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }



    }

    /**
     * 消費(fèi)者
     */
    private class Consumer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (0 == queue.size()) {
                        log.info("當(dāng)前隊(duì)列已空,通知生產(chǎn)者補(bǔ)貨");
                        //等待不空條件達(dá)到
                        notEmpty.await();

                    }

                    queue.poll();
                    //通知消費(fèi)者不滿了
                    notFull.signal();
                    log.info("消費(fèi)者完成消費(fèi),當(dāng)前隊(duì)列還剩余 【{}】個(gè)元素", queue.size());
                } catch (Exception e) {
                    log.error("生產(chǎn)者報(bào)錯,失敗原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }
    }


    public static void main(String[] args) {
        ProducerMode mode = new ProducerMode();
        Producer producer = mode.new Producer();
        ProducerMode.Consumer consumer = mode.new Consumer();
        producer.start();
        consumer.start();
    }
}

對應(yīng)的我們給出輸出結(jié)果:

四、CyclicBarrier

1. CyclicBarrier 原理和使用示例

CyclicBarrier 也就是循環(huán)柵欄對象,不是很常用,它主要用于等待線程數(shù)就緒后執(zhí)行公共邏輯的業(yè)務(wù)場景。 例如我們希望每湊齊5個(gè)線程后執(zhí)行后續(xù)邏輯,我們就可以說明CyclicBarrier 數(shù)值為5,然后每個(gè)線程到期后調(diào)用await等待其他線程就緒。

一旦到齊5個(gè),CyclicBarrier 就會通知這些線程開始工作,對應(yīng)的代碼如下所示:

public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                System.out.println("線程 " + Thread.currentThread().getName() + " 開始執(zhí)行任務(wù)");
                try {
                    // 模擬執(zhí)行任務(wù)
                    Thread.sleep(1000);
                    System.out.println("線程 " + Thread.currentThread().getName() + " 到達(dá)屏障");
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }

                System.out.println("所有線程都到達(dá)屏障,一起繼續(xù)執(zhí)行");
            }).start();
        }
    }

對應(yīng)的我們給出相應(yīng)輸出示例:

2. CyclicBarrier 下的多核并發(fā)運(yùn)算技巧

利用循環(huán)柵欄的特點(diǎn),我們可以很好基于計(jì)算機(jī)核心數(shù)完成所有的耗時(shí)運(yùn)算,等待所有計(jì)算完成之后,通過柵欄來匯聚計(jì)算結(jié)果打印輸出:

對應(yīng)我們給出主線程的實(shí)現(xiàn),可以看到該處理器會得到一個(gè)與核心數(shù)一致的列表,并將列表中的每個(gè)子列表交由worker線程處理,每當(dāng)worker完成列表中一個(gè)元素運(yùn)算后,就會觸發(fā)柵欄的方法打印結(jié)果:

public class ArraySquareCalculator {


    private final List<List<Integer>> taskList;

    private final Worker[] workers;


    private final CyclicBarrier barrier;

    public ArraySquareCalculator(List<List<Integer>> taskList) {

        if (taskList == null || taskList.isEmpty()) {
            throw new RuntimeException("任務(wù)列表不能為空");
        }
        if (taskList.size() != Runtime.getRuntime().availableProcessors()) {
            throw new RuntimeException("任務(wù)列表數(shù)量必須等于CPU數(shù)量");
        }

        this.taskList = taskList;

        barrier = new CyclicBarrier(taskList.size(), () -> {
            Console.log("所有線程都到達(dá)屏障,執(zhí)行結(jié)束");
            Console.log("執(zhí)行結(jié)果:{}", JSONUtil.toJsonStr(taskList));
        });

        workers = new Worker[taskList.size()];
        for (int i = 0; i < taskList.size(); i++) {
            workers[i] = new Worker(i, taskList, barrier);
        }


    }


    //啟動核心數(shù)對應(yīng)的工作線程執(zhí)行運(yùn)算
    public synchronized void start() {
        for (Worker worker : workers) {
            new Thread(worker).start();
        }
    }


    
}

對應(yīng)的我們也給出worker子線程代碼,可以看到核心數(shù)對應(yīng)的子線程worker完成各自負(fù)責(zé)列表的元素運(yùn)算后,就會通過柵欄提交給主線程告知完成:

public class Worker implements Runnable {


    private final int elementIdx;
    private final List<List<Integer>> list;
    private final CyclicBarrier barrier;

    Worker(int elementIdx, List<List<Integer>> list, CyclicBarrier cyclicBarrier) {
        this.elementIdx = elementIdx;
        this.list = list;
        this.barrier = cyclicBarrier;
    }

    @SneakyThrows
    @Override
    public void run() {
        //每個(gè)核心對應(yīng)的線程處理各自索引列表
        List<Integer> workList = list.get(elementIdx);
        for (int i = 0; i < workList.size(); i++) {
            //完成負(fù)責(zé)列表元素計(jì)算后,通過屏障等待所有線程完成
            workList.set(i, workList.get(i) << 1);
            barrier.await();
        }


    }
}

對應(yīng)的我們也給出測試代碼:

//創(chuàng)建一個(gè)與核心數(shù)一樣的列表
        int size = Runtime.getRuntime().availableProcessors();
        List<List<Integer>> list = new ArrayList<>();
        //添加元素到列表中
        for (int i = 0; i < size; i++) {
            ArrayList<Integer> arrayList = new ArrayList<>();
            for (int j = 1; j <= 3; j++) {
                arrayList.add(j);
            }
            list.add(arrayList);

        }
        //啟動并行運(yùn)算處理器
        ArraySquareCalculator calculator = new ArraySquareCalculator(list);
        calculator.start();

輸出結(jié)果如下,與預(yù)期一致:

3. CyclicBarrier如何控制并發(fā)

以上面并行核心線程運(yùn)算邏輯為例,本質(zhì)上await方法調(diào)用后底層就會完成count扣減,當(dāng)count為0后就會觸發(fā)一次主線程邏輯調(diào)用,也就是我們的打印輸出,即通過count來完成線程之間的循環(huán)并發(fā)流程阻塞和通知:

對應(yīng)的我們也給出await的源碼,可以看到其內(nèi)部是通過調(diào)用dowait執(zhí)行上述所說邏輯:

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

查看dowait即可印證我們的邏輯:

  • 所有線程調(diào)用await執(zhí)行count扣減
  • count為0調(diào)用barrierCommand也就是我們初始化時(shí)設(shè)置的打印輸出方法
  • 完成barrierCommand任務(wù)執(zhí)行后調(diào)用nextGeneration將count重置為初始化時(shí)的數(shù)值,對應(yīng)的我們的代碼就是CPU核心數(shù)
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

          //......

            int index = --count;
            //count扣減為0 步入執(zhí)行邏輯
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                 //調(diào)用barrierCommand執(zhí)行歸并邏輯運(yùn)算
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //將count重置為初始值
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
//......
        } finally {
            lock.unlock();
        }
    }

4. CyclicBarrier 與CountDownLatch區(qū)別(重點(diǎn))

CountDownLatch用戶事件即主要是業(yè)務(wù)流程上的控制并不是針對線程,CyclicBarrier 循環(huán)柵欄作用于線程,如上代碼必須等待線程到齊后觸發(fā)。

循環(huán)柵欄可重復(fù)使用,CountDownLatch則不能。

責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2025-07-04 09:05:35

2025-02-07 14:42:59

2017-05-31 17:09:52

LinuxShell命令

2024-06-06 09:09:41

SQL循環(huán)控制命令

2010-05-11 12:53:58

Unix awk

2011-08-23 13:36:11

T-SQL查詢流程控制語句

2009-12-15 09:56:51

Ruby流程控制

2021-05-27 09:30:51

Java流程控制

2021-05-27 05:27:22

流程控制Rust

2009-09-04 10:42:56

C#流程控制語句

2024-11-01 16:05:26

2010-07-19 10:11:58

Perl流程控制語句

2021-02-03 06:15:26

工具postManHttp

2021-08-05 06:54:05

流程控制default

2024-11-05 12:59:42

while 循環(huán)迭代字節(jié)碼

2011-09-08 13:53:31

Node.js

2013-12-13 15:48:52

Lua腳本語言

2015-07-23 15:17:37

JavaScript循環(huán)語句

2010-03-18 16:37:13

Python 程序流程

2011-08-24 16:36:00

T-SQL
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號