偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

多線(xiàn)程必考的「生產(chǎn)者 - 消費(fèi)者」模型,看這篇文章就夠了

開(kāi)發(fā) 前端
生產(chǎn)者 - 消費(fèi)者模型 Producer-consumer problem 是一個(gè)非常經(jīng)典的多線(xiàn)程并發(fā)協(xié)作的模型,在分布式系統(tǒng)里非常常見(jiàn)。也是面試中無(wú)論中美大廠(chǎng)都非常愛(ài)考的一個(gè)問(wèn)題,對(duì)應(yīng)屆生問(wèn)的要少一些,但是對(duì)于有工作經(jīng)驗(yàn)的工程師來(lái)說(shuō),非常愛(ài)考。

[[341897]]

 生產(chǎn)者 - 消費(fèi)者模型 Producer-consumer problem 是一個(gè)非常經(jīng)典的多線(xiàn)程并發(fā)協(xié)作的模型,在分布式系統(tǒng)里非常常見(jiàn)。也是面試中無(wú)論中美大廠(chǎng)都非常愛(ài)考的一個(gè)問(wèn)題,對(duì)應(yīng)屆生問(wèn)的要少一些,但是對(duì)于有工作經(jīng)驗(yàn)的工程師來(lái)說(shuō),非常愛(ài)考。

這個(gè)問(wèn)題有非常多的版本和解決方式,在本文我重點(diǎn)是和大家壹齊理清思路,由淺入深的思考問(wèn)題,保證大家看完了都能有所收獲。

問(wèn)題背景

簡(jiǎn)單來(lái)說(shuō),這個(gè)模型是由兩類(lèi)線(xiàn)程構(gòu)成:

  • 生產(chǎn)者線(xiàn)程:“生產(chǎn)”產(chǎn)品,并把產(chǎn)品放到一個(gè)隊(duì)列里;
  • 消費(fèi)者線(xiàn)程:“消費(fèi)”產(chǎn)品。
  • 隊(duì)列:數(shù)據(jù)緩存區(qū)。

有了這個(gè)隊(duì)列,生產(chǎn)者就只需要關(guān)注生產(chǎn),而不用管消費(fèi)者的消費(fèi)行為,更不用等待消費(fèi)者線(xiàn)程執(zhí)行完;消費(fèi)者也只管消費(fèi),不用管生產(chǎn)者是怎么生產(chǎn)的,更不用等著生產(chǎn)者生產(chǎn)。

所以該模型實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者之間的解藕和異步。

什么是異步呢?

比如說(shuō)你和你女朋友打電話(huà),就得等她接了電話(huà)你們才能說(shuō)話(huà),這是同步。

但是如果你跟她發(fā)微信,并不需要等她回復(fù),她也不需要立刻回復(fù),而是等她有空了再回,這就是異步。

但是呢,生產(chǎn)者和消費(fèi)者之間也不能完全沒(méi)有聯(lián)系的。

  • 如果隊(duì)列里的產(chǎn)品已經(jīng)滿(mǎn)了,生產(chǎn)者就不能繼續(xù)生產(chǎn);
  • 如果隊(duì)列里的產(chǎn)品從無(wú)到有,生產(chǎn)者就得通知一下消費(fèi)者,告訴它可以來(lái)消費(fèi)了;
  • 如果隊(duì)列里已經(jīng)沒(méi)有產(chǎn)品了,消費(fèi)者也無(wú)法繼續(xù)消費(fèi);
  • 如果隊(duì)列里的產(chǎn)品從滿(mǎn)到不滿(mǎn),消費(fèi)者也得去通知下生產(chǎn)者,說(shuō)你可以來(lái)生產(chǎn)了。

所以它們之間還需要有協(xié)作,最經(jīng)典的就是使用 Object 類(lèi)里自帶的 wait() 和 notify() 或者 notifyAll() 的消息通知機(jī)制。

上述描述中的等著,其實(shí)就是用 wait() 來(lái)實(shí)現(xiàn)的;

而通知,就是 notify() 或者 notifyAll() 。

那么基于這種消息通知機(jī)制,我們還能夠平衡生產(chǎn)者和消費(fèi)者之間的速度差異。

如果生產(chǎn)者的生產(chǎn)速度很慢,但是消費(fèi)者消費(fèi)的很快,就像是我們每月工資就發(fā)兩次,但是每天都要花錢(qián),也就是 1:15.

那么我們就需要調(diào)整生產(chǎn)者(發(fā)工資)為 15 個(gè)線(xiàn)程,消費(fèi)者保持 1 個(gè)線(xiàn)程,這樣是不是很爽~

總結(jié)下該模型的三大優(yōu)點(diǎn):

解藕,異步,平衡速度差異。

wait()/notify()

接下來(lái)我們需要重點(diǎn)看下這個(gè)通知機(jī)制。

wait() 和 notify() 都是 Java 中的 Object 類(lèi)自帶的方法,可以用來(lái)實(shí)現(xiàn)線(xiàn)程間的通信。

在上一節(jié)講的 11 個(gè) APIs 里我也提到了它,我們這里再展開(kāi)講一下。

wait() 方法是用來(lái)讓當(dāng)前線(xiàn)程等待,直到有別的線(xiàn)程調(diào)用 notify() 將它喚醒,或者我們可以設(shè)定一個(gè)時(shí)間讓它自動(dòng)蘇醒。

調(diào)用該方法之前,線(xiàn)程必須要獲得該對(duì)象的對(duì)象監(jiān)視器鎖,也就是只能用在加鎖的方法下。

而調(diào)用該方法之后,當(dāng)前線(xiàn)程會(huì)釋放鎖。(提示:這里很重要,也是下文代碼中用 while 而非 if 的原因。)

notify() 方法只能通知一個(gè)線(xiàn)程,如果多個(gè)線(xiàn)程在等待,那就喚醒任意一個(gè)。

notifyAll() 方法是可以喚醒所有等待線(xiàn)程,然后加入同步隊(duì)列。

這里我們用到了 2 個(gè)隊(duì)列:

  • 同步隊(duì)列:對(duì)應(yīng)于我們上一節(jié)講的線(xiàn)程狀態(tài)中的 Runnable,也就是線(xiàn)程準(zhǔn)備就緒,就等著搶資源了。
  • 等待隊(duì)列:對(duì)應(yīng)于我們上一節(jié)講的線(xiàn)程狀態(tài)中的 Waiting,也就是等待狀態(tài)。

這里需要注意,從等待狀態(tài)線(xiàn)程無(wú)法直接進(jìn)入 Q2,而是要先重新加入同步隊(duì)列,再次等待拿鎖,拿到了鎖才能進(jìn)去 Q2;一旦出了 Q2,鎖就丟了。

在 Q2 里,其實(shí)只有一個(gè)線(xiàn)程,因?yàn)檫@里我們必須要加鎖才能進(jìn)行操作。

實(shí)現(xiàn)

這里我首先建了一個(gè)簡(jiǎn)單的 Product 類(lèi),用來(lái)表示生產(chǎn)和消費(fèi)的產(chǎn)品,大家可以自行添加更多的 fields。

  1. public class Product  { 
  2.     private String name
  3.  
  4.     public Product(String name) { 
  5.         this.name = name
  6.     } 
  7.  
  8.     public String getName() { 
  9.         return name
  10.     } 
  11.  
  12.     public void setName(String name) { 
  13.         this.name = name
  14.     } 

主函數(shù)里我設(shè)定了兩類(lèi)線(xiàn)程,并且這里選擇用普通的 ArrayDeque 來(lái)實(shí)現(xiàn) Queue,更簡(jiǎn)單的方式是直接用 Java 中的 BlockingQueue 來(lái)實(shí)現(xiàn)。

BlockingQueue 是阻塞隊(duì)列,它有一系列的方法可以讓線(xiàn)程實(shí)現(xiàn)自動(dòng)阻塞,常用的 BlockingQueue 有很多,后面會(huì)單獨(dú)出一篇文章來(lái)講。

這里為了更好的理解并發(fā)協(xié)同的這個(gè)過(guò)程,我們先自己處理。

  1. public class Test { 
  2.     public static void main(String[] args) { 
  3.         Queue<Product> queue = new ArrayDeque<>(); 
  4.  
  5.         for (int i = 0; i < 100; i++) { 
  6.             new Thread(new Producer(queue, 100)).start(); 
  7.             new Thread(new Consumer(queue, 100)).start(); 
  8.         } 
  9.     } 

然后就是 Producer 和 Consumer 了。

  1. public class Producer implements Runnable{ 
  2.     private Queue<Product> queue; 
  3.     private int maxCapacity; 
  4.  
  5.     public Producer(Queue queue, int maxCapacity) { 
  6.         this.queue = queue; 
  7.         this.maxCapacity = maxCapacity; 
  8.     } 
  9.  
  10.     @Override 
  11.     public void run() { 
  12.         synchronized (queue) { 
  13.             while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解釋 
  14.                 try { 
  15.                     System.out.println("生產(chǎn)者" + Thread.currentThread().getName() + "等待中... Queue 已達(dá)到最大容量,無(wú)法生產(chǎn)"); 
  16.                     wait(); 
  17.                     System.out.println("生產(chǎn)者" + Thread.currentThread().getName() + "退出等待"); 
  18.                 } catch (InterruptedException e) { 
  19.                     e.printStackTrace(); 
  20.                 } 
  21.             } 
  22.             if (queue.size() == 0) { //隊(duì)列里的產(chǎn)品從無(wú)到有,需要通知在等待的消費(fèi)者 
  23.                 queue.notifyAll(); 
  24.             } 
  25.             Random random = new Random(); 
  26.             Integer i = random.nextInt(); 
  27.             queue.offer(new Product("產(chǎn)品"  + i.toString())); 
  28.             System.out.println("生產(chǎn)者" + Thread.currentThread().getName() + "生產(chǎn)了產(chǎn)品:" + i.toString()); 
  29.         } 
  30.     } 

其實(shí)它的主邏輯很簡(jiǎn)單,我這里為了方便演示加了很多打印語(yǔ)句才顯得有點(diǎn)復(fù)雜。

我們把主要邏輯拎出來(lái)看:

  1. public void run() { 
  2.        synchronized (queue) { 
  3.            while (queue.size() == maxCapacity) { //一定要用 while,而不是 if,下文解釋 
  4.                try { 
  5.                    wait(); 
  6.                } catch (InterruptedException e) { 
  7.                    e.printStackTrace(); 
  8.                } 
  9.            } 
  10.            if (queue.size() == 0) { 
  11.                queue.notifyAll(); 
  12.            } 
  13.            queue.offer(new Product("產(chǎn)品"  + i.toString())); 
  14.        } 
  15.    } 

這里有 3 塊內(nèi)容,再對(duì)照這個(gè)過(guò)程來(lái)看:

  1. 生產(chǎn)者線(xiàn)程拿到鎖后,其實(shí)就是進(jìn)入了 Q2 階段。首先檢查隊(duì)列是否容量已滿(mǎn),如果滿(mǎn)了,那就要去 Q3 等待;
  2. 如果不滿(mǎn),先檢查一下隊(duì)列原本是否為空,如果原來(lái)是空的,那就需要通知消費(fèi)者;
  3. 最后生產(chǎn)產(chǎn)品。

這里有個(gè)問(wèn)題,為什么只能用 while 而不是 if?

其實(shí)在這一小段,生產(chǎn)者線(xiàn)程經(jīng)歷了幾個(gè)過(guò)程:

  1. 如果隊(duì)列已滿(mǎn),它就沒(méi)法生產(chǎn),那也不能占著位置不做事,所以要把鎖讓出來(lái),去 Q3 - 等待隊(duì)列 等著;
  2. 在等待隊(duì)列里被喚醒之后,不能直接奪過(guò)鎖來(lái),而是要先加入 Q1 - 同步隊(duì)列 等待資源;
  3. 一旦搶到資源,關(guān)門(mén)上鎖,才能來(lái)到 Q2 繼續(xù)執(zhí)行 wait() 之后的活,但是,此時(shí)這個(gè)隊(duì)列有可能又滿(mǎn)了,所以退出 wait() 之后,還需要再次檢查 queue.size() == maxCapacity 這個(gè)條件,所以要用 while。

那么為什么可能又滿(mǎn)了呢?

因?yàn)榫€(xiàn)程沒(méi)有一直拿著鎖,在被喚醒之后,到拿到鎖之間的這段時(shí)間里,有可能其他的生產(chǎn)者線(xiàn)程先拿到了鎖進(jìn)行了生產(chǎn),所以隊(duì)列又經(jīng)歷了一個(gè)從不滿(mǎn)到滿(mǎn)的過(guò)程。

總結(jié):在使用線(xiàn)程的等待通知機(jī)制時(shí),一般都要在 while 循環(huán)中調(diào)用 wait() 方法。

消費(fèi)者線(xiàn)程是完全對(duì)稱(chēng)的,我們來(lái)看代碼。

  1. public class Consumer implements Runnable{ 
  2.     private Queue<Product> queue; 
  3.     private int maxCapacity; 
  4.  
  5.     public Consumer(Queue queue, int maxCapacity) { 
  6.         this.queue = queue; 
  7.         this.maxCapacity = maxCapacity; 
  8.     } 
  9.  
  10.     @Override 
  11.     public void run() { 
  12.         synchronized (queue) { 
  13.             while (queue.isEmpty()) { 
  14.                 try { 
  15.                     System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "等待中... Queue 已缺貨,無(wú)法消費(fèi)"); 
  16.                     wait(); 
  17.                     System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "退出等待"); 
  18.                 } catch (InterruptedException e) { 
  19.                     e.printStackTrace(); 
  20.                 } 
  21.             } 
  22.             if (queue.size() == maxCapacity) { 
  23.                 queue.notifyAll(); 
  24.             } 
  25.  
  26.             Product product = queue.poll(); 
  27.             System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "消費(fèi)了:" + product.getName()); 
  28.         } 
  29.     } 

結(jié)果如下:

小結(jié)

生產(chǎn)者 - 消費(fèi)者問(wèn)題是面試中經(jīng)常會(huì)遇到的題目,本文首先講了該模型的三大優(yōu)點(diǎn):解藕,異步,平衡速度差異,然后講解了等待/通知的消息機(jī)制以及在該模型中的應(yīng)用,最后進(jìn)行了代碼實(shí)現(xiàn)。

文中所有代碼已經(jīng)放到了我的 Github 上:https://github.com/xiaoqi6666/NYCSDE。

這個(gè) Github 匯總了我所有的文章和資料,之后也會(huì)一直更新和維護(hù),還希望大家給小齊點(diǎn)個(gè) Star,你們的支持和認(rèn)可,就是我創(chuàng)作的最大動(dòng)力,我們下篇文章見(jiàn)!

本文轉(zhuǎn)載自微信公眾號(hào)「碼農(nóng)田小齊」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系碼農(nóng)田小齊公眾號(hào)。

 

責(zé)任編輯:武曉燕 來(lái)源: 碼農(nóng)田小齊
相關(guān)推薦

2012-02-14 12:31:27

Java

2017-05-16 12:30:21

Python多線(xiàn)程生產(chǎn)者消費(fèi)者模式

2017-03-30 22:41:55

虛擬化操作系統(tǒng)軟件

2021-12-22 11:00:05

模型Golang語(yǔ)言

2015-08-26 09:39:30

java消費(fèi)者

2024-07-05 11:01:13

2021-11-10 07:47:48

Traefik邊緣網(wǎng)關(guān)

2009-08-13 13:14:31

C#生產(chǎn)者和消費(fèi)者

2024-10-11 09:27:52

2019-09-25 09:17:43

物聯(lián)網(wǎng)技術(shù)信息安全

2022-05-27 08:18:00

HashMapHash哈希表

2024-03-26 00:00:06

RedisZSet排行榜

2018-10-31 17:22:25

AI人工智能芯片

2024-03-14 11:58:43

2019-10-31 09:48:53

MySQL數(shù)據(jù)庫(kù)事務(wù)

2024-09-27 11:51:33

Redis多線(xiàn)程單線(xiàn)程

2018-08-17 09:14:43

餓了么容器演進(jìn)

2021-08-31 10:26:24

存儲(chǔ)

2024-02-28 08:59:47

2020-10-13 07:44:40

緩存雪崩 穿透
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)