偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Concurrent In Java

開發(fā) 后端
本篇文章一系列只是對(duì)JUC各個(gè)部分做了說明和介紹,希望對(duì)大家有所啟發(fā)。

這一系列只是對(duì)JUC各個(gè)部分做了說明和介紹,沒人深入原理!

concurrent并發(fā)包,讓你易于編寫并發(fā)程序。并發(fā)下我們經(jīng)常需要使用的基礎(chǔ)設(shè)施和解決的問題有ThreadPool、Lock、管道、集合點(diǎn)、線程之間等待和喚醒、線程間數(shù)據(jù)傳輸、共享資源訪問控制、并發(fā)線程之間的相互等待,等待。

concurrent提供的工具能夠解決絕大部分的場(chǎng)景,還能提高程序吞吐量。

現(xiàn)代的服務(wù)器多采用多核CPU,從而不同線程之間有可能真正地在同時(shí)運(yùn)行而不是cpu時(shí)間切片。在處理大計(jì)算量的程序上要盡可能利用CPU多核特性,提高系統(tǒng)吞吐量。

并發(fā)編程主要面臨三個(gè)問題:

1.如何讓多個(gè)線程同時(shí)為同一個(gè)任務(wù)工作(并發(fā)編程設(shè)計(jì))

2.多個(gè)線程之間對(duì)共享資源的爭(zhēng)用。

3.多個(gè)線程之間如何相互合作、傳遞數(shù)據(jù)。

1. concurrent包提供的集合

concurrent包直接提供了標(biāo)準(zhǔn)集合的一些實(shí)現(xiàn),在下面做簡(jiǎn)單介紹。在大部分情況下可以使用它們提供高并發(fā)環(huán)境下對(duì)集合訪問的吞吐量。

1.1 ConcurrentHashMap

Map的一個(gè)并發(fā)實(shí)現(xiàn)。在多線程環(huán)境下,它具有很高的吞吐量和具備可靠的數(shù)據(jù)一致性。它支持并發(fā)讀和一定程度的并發(fā)修改(默認(rèn)16個(gè)并發(fā),可以通過構(gòu)造函數(shù)修改)。

HashMap的實(shí)現(xiàn)是非線程安全的,高并發(fā)下會(huì)get方法常會(huì)死鎖,有的時(shí)候會(huì)表現(xiàn)為CPU居高不下。

  1. public V get(Object key) {  
  2. if (key == null)  
  3. return getForNullKey();  
  4. int hash = hash(key.hashCode());  
  5. for (Entry e = table[indexFor(hash, table.length)];  
  6. e != null;  
  7. e = e.next) {  
  8. Object k;  
  9. if (e.hash == hash && ((k = e.key) == key || key.equals(k)))  
  10. return e.value;  
  11. }  
  12. return null;  
  13. }  
  14.  

在get操作里面for循環(huán)取對(duì)象的操作,由于高并發(fā)同時(shí)讀寫,for循環(huán)的結(jié)果變得不可預(yù)知,所以有可能一直循環(huán)。

所以高并發(fā)環(huán)境下盡量不要直接使用HashMap,對(duì)系統(tǒng)造成的影響很難排除。

和Collections.synchronizedMap(new HashMap(...))相比,外ConcurrentHashMap在高并發(fā)的環(huán)境下有著更優(yōu)秀的吞吐量。因?yàn)镃oncurrentHashMap可以支持寫并發(fā),基本原理是內(nèi)部分段,分段的數(shù)量決定著并發(fā)程度。通過concurrencyLevel參數(shù)可以設(shè)置。如果你能預(yù)期并發(fā)數(shù)量那么設(shè)置該參數(shù)可以獲取更優(yōu)吞吐量。

另外為ConcurrentHashMap還實(shí)現(xiàn)了:

V putIfAbsent(K key, V value);

boolean remove(Object key, Object value);

boolean replace(K key, V oldValue, V newValue);

V replace(K key, V value);

這四個(gè)一致性的操作方法。

1.2 BlockingQueue

BlockingQueue定義了一個(gè)接口,繼承了Queue接口。Queue是一種數(shù)據(jù)結(jié)構(gòu),意思是它的項(xiàng)以先入先出(FIFO)順序存儲(chǔ)。

BlockingQueue為我們提供了一些多線程阻塞語義的方法,新增和重定義了一些方法插入:

 

 

BlockingQueue是線程安全的,非常適合多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者線程之間傳遞數(shù)據(jù)。

形象地理解,BlockingQueue好比有很多格子的傳輸帶系統(tǒng),不過當(dāng)你(生產(chǎn)者)調(diào)用put方法的時(shí)候,如果有空閑的格子那么放入物體后立刻返回,如果沒有空閑格子那么一直處于等待狀態(tài)。add方法意味著如果沒有空閑格子系統(tǒng)就會(huì)報(bào)警,然后如果處理該報(bào)警則按照你的意愿。offer方法優(yōu)先于add方法,它通過返回true 或 flase來告訴你是否放入成功。offer超時(shí)方法,如果不空閑的情況下,嘗試等待一段時(shí)間。

BlockingQueue有很多實(shí)現(xiàn)ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue

補(bǔ)充Dueue是個(gè)雙向隊(duì)列,可以當(dāng)做堆棧來使用。

BlockingQueue在ThreadPool中,作為任務(wù)隊(duì)列來使用,用來保存沒有立刻執(zhí)行的工作任務(wù)對(duì)象。

1.3 SynchronousQueue

SychronousQueue是BlockingQueue的一個(gè)實(shí)現(xiàn),它看起來是一個(gè)隊(duì)列,但是其實(shí)沒有容量,是特定條件下的一個(gè)精簡(jiǎn)實(shí)現(xiàn)。

做個(gè)比喻,SychronousQueue對(duì)象就像一個(gè)接力棒,現(xiàn)在有兩個(gè)運(yùn)動(dòng)員交棒者和接棒者(線程)要做交接。在交接點(diǎn),交棒者沒有交出之前是不能松開的(一種等待狀態(tài)),接棒者在接到棒之前是必須等待。換一句話說不管誰先到交接點(diǎn),必須處于等待狀態(tài)。

在生產(chǎn)者和消費(fèi)者模型中。如果生產(chǎn)者向SychronousQueue進(jìn)行put操作,直到有另外的消費(fèi)者線程進(jìn)行take操作時(shí)才能返回。對(duì)消費(fèi)者也是一樣,take操作會(huì)被阻塞,直到生產(chǎn)者put。

在這種生產(chǎn)者-消費(fèi)者模型下,生產(chǎn)者和消費(fèi)者是進(jìn)行手對(duì)手傳遞產(chǎn)品,在消費(fèi)者消費(fèi)一個(gè)產(chǎn)品之前,生產(chǎn)者必須處于等待狀態(tài)。它給我們提供了在線程之間交換單一元素的極輕量級(jí)方法,并且具有阻塞語義。

提示:上面舉例中有寫局限性。其實(shí)生產(chǎn)者和消費(fèi)者進(jìn)程是可以任意數(shù)量的。M:N。生產(chǎn)線程之間會(huì)對(duì)SychronousQueue進(jìn)行爭(zhēng)用,消費(fèi)者也是一樣。

對(duì)SychronousQueue類似于其他語境中“會(huì)合通道”或 “連接”點(diǎn)問題。它非常適合于傳遞性設(shè)計(jì),在這種設(shè)計(jì)中,在一個(gè)線程中運(yùn)行的對(duì)象要將某些信息、事件或任務(wù)傳遞給在另一個(gè)線程中運(yùn)行的對(duì)象,它就必須與該對(duì)象同步。

1.4Exchanger

是SychronousQueue的雙向?qū)崿F(xiàn)。用來伙伴線程間交互對(duì)象。Exchanger 可能在比如遺傳算法和管道設(shè)計(jì)中很有用。

形象地說,就是兩個(gè)人在預(yù)定的地方交互物品,任何一方?jīng)]到之前都處于等待狀態(tài)。

1.5 CopyOnWriteArrayList 和 CopyOnWriteArraySet

它們分別是List接口和Set接口的實(shí)現(xiàn)。正如類名所描述的那樣,當(dāng)數(shù)據(jù)結(jié)構(gòu)發(fā)生變化的時(shí)候,會(huì)復(fù)制自身的內(nèi)容,來保證一致性。大家都知道復(fù)制全部副本是非常昂貴的操作,看來這是一個(gè)非常不好的實(shí)現(xiàn)。事實(shí)上沒有最好和最差的方案,只有最合適的方案。一般情況下,處理多線程同步問題,我們傾向使用同步的 ArrayList,但同步也有其成本。

那么在什么情況下使用CopyOnWriteArrayList 或者CopyOnWriteArraySet呢?

數(shù)據(jù)量小。

對(duì)數(shù)據(jù)結(jié)構(gòu)的修改是偶然發(fā)生的,相對(duì)于讀操作。

舉例來說,如果我們實(shí)現(xiàn)觀察者模式的話,作為監(jiān)聽器集合是非常合適的。

1.6 TimeUnit

雖然是個(gè)時(shí)間單位,但是它也是concurrent包里面的。也許你以前的代碼里面經(jīng)常出現(xiàn)1*60*1000來表示一分鐘,代碼可讀性很差?,F(xiàn)在你可以通過TimeUnit來編寫可讀性更好的代碼,concurrent的api里面涉及到時(shí)間的地方都會(huì)使用該對(duì)象。

我之所以先進(jìn)并發(fā)框架常用的集合,是因?yàn)榫€程池的實(shí)現(xiàn)特性都利用了BlockingQueue的一些特性。

#p#

2. ThreadPool

雖然線程和進(jìn)程相比是輕量級(jí)許多,但是線程的創(chuàng)建成本還是不可忽律,所以就有了線程池化的設(shè)計(jì)。線程池的創(chuàng)建、管理、回收、任務(wù)隊(duì)列管理、任務(wù)分配等細(xì)節(jié)問題依然負(fù)責(zé),沒有必要重復(fù)發(fā)明輪子,concurrent包已經(jīng)為我們準(zhǔn)備了一些優(yōu)秀線程池的實(shí)現(xiàn)。

2.1 認(rèn)識(shí)ExecutorService 接口

ExecutorService 接口,它能提供的功能就是用來在將來某一個(gè)時(shí)刻異步地執(zhí)行一系列任務(wù)。雖然簡(jiǎn)單一句話,但是包含了很多需求點(diǎn)。它的實(shí)現(xiàn)至少包含了線程池和任務(wù)隊(duì)列兩個(gè)方面,其實(shí)還包括了任務(wù)失敗處理策略等。

 

 

經(jīng)常使用submit方法,用來提交任務(wù)對(duì)象。

簡(jiǎn)單的例子:

ExecutorService es = Executors.newCachedThreadPool();

es.submit(new Runnable(){

@Override

public void run() {

System.out.println("do some thing");

}

});

es.shutdown();

上面的例子只是完成了提交了一個(gè)任務(wù),異步地去執(zhí)行它。但是有些使用場(chǎng)景更為復(fù)雜,比如等待獲得異步任務(wù)的返回結(jié)果,或者最多等上固定的時(shí)間。

submit 方法返回一個(gè)對(duì)象,F(xiàn)uture??雌饋碛悬c(diǎn)別扭,代表將來的對(duì)象。其實(shí)看一下Future的方法就明白了。

 

其實(shí)Future對(duì)象代表了一個(gè)異步任務(wù)的結(jié)果,可以用來取消任務(wù)、查詢?nèi)蝿?wù)狀態(tài),還有通過get方法獲得異步任務(wù)返回的結(jié)果。當(dāng)調(diào)用get方法的時(shí)候,當(dāng)前線程被阻塞直到任務(wù)被處理完成或者出現(xiàn)異常。

 

我們可以通過保存Future對(duì)象來跟蹤查詢異步任務(wù)的執(zhí)行情況。

顯然Runnable接口中定義的 public void run();方法并不能返回結(jié)果對(duì)象,所以concurrent包提供了Callable接口,它可以被用來返回結(jié)果對(duì)象。

2.2 ThreadPoolExecutor

ThreadPoolExecutor實(shí)現(xiàn)了ExecutorService 接口,也是我們最主要使用的實(shí)現(xiàn)類。

首先非常有必要看一些類的最完整的構(gòu)造函數(shù)

 

  1. ThreadPoolExecutor(int corePoolSize,  
  2.  
  3.    int maximumPoolSize,  
  4.  
  5. long keepAliveTime,  
  6.  
  7. TimeUnit unit,  
  8.  
  9. BlockingQueue workQueue,  
  10.  
  11. ThreadFactory threadFactory,  
  12.  
  13. RejectedExecutionHandler handler)  
  14.  

 

ThreadPoolExecutor對(duì)象中有個(gè)poolSize變量表示當(dāng)前線程池中正在運(yùn)行的線程數(shù)量。

注意:這個(gè)有關(guān)非常重要的關(guān)系,常常被誤解。poolSize變量和corePoolSize、maximumPoolSize以及workQueue的關(guān)系。

首先線程池被創(chuàng)建初期,還沒有執(zhí)行任何任務(wù)的時(shí)候,poolSize等于0;

每次向線程池提交任務(wù)的時(shí)候,線程池處理過程如下:

1. 如果poolSize少于 corePoolSize,則首選添加新的線程,而不進(jìn)行排隊(duì)。

2. 如果poolSize等于或多于 corePoolSize,則首選將請(qǐng)求加入隊(duì)列workQueue,而不添加新的線程。

3. 如果第二步執(zhí)行失敗(隊(duì)已滿),則創(chuàng)建新的線程執(zhí)行任務(wù),但是如果果poolSize已經(jīng)達(dá)到maximumPoolSize,那么就拒絕該任務(wù)。如果處理被拒絕的任務(wù)就取決于RejectedExecutionHandler handler的設(shè)置了,默認(rèn)情況下會(huì)拋出異常。

系統(tǒng)存在四種任務(wù)拒絕策略:

在默認(rèn)的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運(yùn)行時(shí) RejectedExecutionException。

在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調(diào)用運(yùn)行該任務(wù)的 execute 本身。此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。

在 ThreadPoolExecutor.DiscardPolicy 中,不能執(zhí)行的任務(wù)將被刪除。

在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過程)。

keepAliveTime活動(dòng)線程如果空閑一段時(shí)間是否可以回收,通常只作用于超出corePoolSize的線程。corePoolSize的線程創(chuàng)建了就不會(huì)被回收。但是到j(luò)ava 6 之后增加了public void allowCoreThreadTimeOut(boolean value)方法,允許core進(jìn)程也可以根據(jù)keepAliveTime來回收,默認(rèn)為false。

決定線程池特性的還有workQueue的實(shí)現(xiàn)類,有三種類SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue,分別對(duì)應(yīng)同步隊(duì)列、無界隊(duì)列、有界隊(duì)列。

(摘自JavaDoc)

類SynchronousQueue,直接提交。工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。在此,如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗,因此會(huì)構(gòu)造一個(gè)新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請(qǐng)求集時(shí)出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)(設(shè)置maximumPoolSizes 為Integer.MAX_VALUE)。當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長(zhǎng)的可能性。

LinkedBlockingQueue,無界隊(duì)列。使用無界隊(duì)列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待。這樣,創(chuàng)建的線程就不會(huì)超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時(shí),適合于使用無界隊(duì)列;例如,在 Web 頁服務(wù)器中。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請(qǐng)求,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長(zhǎng)的可能性。

ArrayBlockingQueue,有界隊(duì)列。當(dāng)使用有限的 maximumPoolSizes 時(shí),有界隊(duì)列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過您許可的更多線程安排時(shí)間。使用小型隊(duì)列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會(huì)降低吞吐量。

綜上:構(gòu)造參數(shù)的設(shè)置是互相制約和影響的。只有當(dāng)你重復(fù)了解其相互關(guān)系的時(shí)候、或有特殊需求的時(shí)候,才可以自己構(gòu)造ThreadPoolExecutor對(duì)象,否則可以使用Executores是個(gè)工廠類。

提示使用線程池是注意處理shutdown,確保你系統(tǒng)關(guān)閉的時(shí)候主動(dòng)關(guān)閉shutdown。

2.3 ScheduledExecutorService

擴(kuò)展了ExecutorService接口,提供時(shí)間排程的功能。

 

 

schedule方法被用來延遲指定時(shí)間來執(zhí)行某個(gè)指定任務(wù)。如果你需要周期性重復(fù)執(zhí)行定時(shí)任務(wù)可以使用scheduleAtFixedRate或者scheduleWithFixedDelay方法,它們不同的是前者以固定頻率執(zhí)行,后者以相對(duì)固定頻率執(zhí)行。

(感謝wenbois2000 提出原先的錯(cuò)誤,我在這里重新描述!對(duì)于原先的錯(cuò)誤,實(shí)在不好意思啊,再次感謝!)不管任務(wù)執(zhí)行耗時(shí)是否大于間隔時(shí)間,scheduleAtFixedRate和scheduleWithFixedDelay都不會(huì)導(dǎo)致同一個(gè)任務(wù)并發(fā)地被執(zhí)行。唯一不同的是scheduleWithFixedDelay是當(dāng)前一個(gè)任務(wù)結(jié)束的時(shí)刻,開始結(jié)算間隔時(shí)間,如0秒開始執(zhí)行第一次任務(wù),任務(wù)耗時(shí)5秒,任務(wù)間隔時(shí)間3秒,那么第二次任務(wù)執(zhí)行的時(shí)間是在第8秒開始。

ScheduledExecutorService的實(shí)現(xiàn)類,是ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor對(duì)象包含的線程數(shù)量是沒有可伸縮性的,只會(huì)有固定數(shù)量的線程。不過你可以通過其構(gòu)造函數(shù)來設(shè)定線程的優(yōu)先級(jí),來降低定時(shí)任務(wù)線程的系統(tǒng)占用。

特別提示:通過ScheduledExecutorService執(zhí)行的周期任務(wù),如果任務(wù)執(zhí)行過程中拋出了異常,那么過ScheduledExecutorService就會(huì)停止執(zhí)行任務(wù),且也不會(huì)再周期地執(zhí)行該任務(wù)了。所以你如果想保住任務(wù)都一直被周期執(zhí)行,那么catch一切可能的異常。

2.4 Executors

Executores是個(gè)工廠類,用來生成ThreadPoolExecutor對(duì)象,它提供了一些常用的線程池配置方案,滿足我們大部分場(chǎng)景。

1. newCachedThreadPool

 

  1. public static ExecutorService newCachedThreadPool() {  
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
  3. 60L, TimeUnit.SECONDS,  
  4. new SynchronousQueue());  
  5. }  
  6.  

 

分析下出這個(gè)線程池配置的工作模式,當(dāng)沒有空閑進(jìn)程時(shí)就新建線程執(zhí)行,當(dāng)有空閑線程時(shí)就使用空閑線程執(zhí)行。當(dāng)線程空閑大60秒時(shí),系統(tǒng)自動(dòng)回收線程。

該線程池非常適合執(zhí)行短小異步任務(wù)時(shí)吞吐量非常高,會(huì)重復(fù)利用CPU的能力。但是如果任務(wù)處理IO邊界任務(wù),那么會(huì)消耗大量線程切換,降低系統(tǒng)吞吐量。所以執(zhí)行短小的計(jì)算任務(wù)非常高效,且當(dāng)沒有任務(wù)時(shí)不會(huì)消耗系統(tǒng)資源。

注意:線程池中沒有變量表示線程是否空閑。那么程序是如何控制的呢?不得不贊嘆concurrent實(shí)現(xiàn)的非常精巧。當(dāng)創(chuàng)建出來的線程完成原來的任務(wù)后,會(huì)調(diào)用BlockingQueue的Poll方法,對(duì)于SynchronousQueue實(shí)現(xiàn)而言會(huì)阻塞調(diào)用線程,直到另外的線程offer調(diào)用。

然而ThreadPool在分配任務(wù)的時(shí)候總是先去嘗試調(diào)用offer方法,所以就會(huì)觸發(fā)空閑線程再次調(diào)用。

精妙的是ThreadPoolExecutor的處理邏輯一樣,但是用BlockingQueue實(shí)現(xiàn)變了就產(chǎn)生不同的行為。

2. newFixedThreadPool

 

  1. public static ExecutorService newFixedThreadPool(int nThreads) {  
  2.  
  3. return new ThreadPoolExecutor(nThreads, nThreads,  
  4.  
  5. 0L, TimeUnit.MILLISECONDS,  
  6.  
  7. new LinkedBlockingQueue());  
  8.  
  9. }  
  10.  

 

創(chuàng)建固定線程數(shù)量的線程池,采用無界隊(duì)列,當(dāng)有更多任務(wù)的時(shí)候?qū)⒈环湃牍ぷ麝?duì)列中排隊(duì)。如果線程池不經(jīng)常執(zhí)行任務(wù)時(shí),你可以調(diào)用allowCoreThreadTimeOut(boolean value)的方法讓系統(tǒng)自動(dòng)回收core的進(jìn)程,以節(jié)約系統(tǒng)資源。

3. newSingleThreadExecutor

 

  1. public static ExecutorService newSingleThreadExecutor() {  
  2.  
  3. return new FinalizableDelegatedExecutorService  
  4.  
  5. (new ThreadPoolExecutor(11,  
  6.  
  7. 0L, TimeUnit.MILLISECONDS,  
  8.  
  9. new LinkedBlockingQueue()));  
  10.  
  11. }  
  12.  

只有一個(gè)工作線程的線程池。和newFixedThreadPool(1)相比,不同之處有兩點(diǎn):

1. 不可以重新配置newSingleThreadExecutor創(chuàng)建出來的線程池。

2. 當(dāng)創(chuàng)建出來的線程池對(duì)象被GC回收時(shí),會(huì)自動(dòng)調(diào)用shutdown方法。

4.newScheduledThreadPool

 

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
  2.        return new ScheduledThreadPoolExecutor(corePoolSize);  
  3.    }  

 

生成一個(gè)可以執(zhí)行時(shí)間調(diào)度的線程池。其實(shí)內(nèi)部使用無界工作隊(duì)列,線程數(shù)量最多能達(dá)到corePoolSize。

2.5 ExecutorCompletionService

這是個(gè)巧妙的設(shè)計(jì),內(nèi)部維護(hù)了一已經(jīng)完成了任務(wù)結(jié)果隊(duì)列,通過take方法可以同步地等待一個(gè)個(gè)結(jié)果對(duì)象。

詳情見http://www.oschina.net/uploads/doc/javase-6-doc-api-zh_CN/java/util/concurrent/ExecutorCompletionService.html

#p#

3. java.util.concurrent.locks

java 早期內(nèi)置synchronized關(guān)鍵字解決多線程對(duì)共享資源訪問的一些問題,和其還配套了Object的notify 和 wait方法,用來控制線程之間的同步。

concurrent軟件包提供了更為高級(jí)和抽象的Lock工具,能解決更多的問題。

Lock是控制多個(gè)線程對(duì)共享資源進(jìn)行訪問的工具。通常Lock限定多線程對(duì)同一個(gè)共享資源訪問的限制,一次只允許一個(gè)線程獲得Lock,即獲得對(duì)共享資源的訪問權(quán)限,線程間是互斥的。但是也有一些鎖如果ReadWriteLock是允許部分線程同時(shí)訪問共享資源的。

幾個(gè)術(shù)語:

爭(zhēng)用:當(dāng)多個(gè)Thread在同一時(shí)間內(nèi)(相對(duì)概念)想要占有同一個(gè)Lock對(duì)象。那么JVM會(huì)調(diào)度解決爭(zhēng)用。

獲取順序:當(dāng)多個(gè)線程爭(zhēng)用同一個(gè)Lock對(duì)象,那么JVM就要決定哪個(gè)線程將會(huì)獲得鎖權(quán)限。存在兩種模式:公平和不公平。 默認(rèn)都是不公平模式,包括synchronized關(guān)鍵字,jvm決定順序的時(shí)候也是采用不公平策略。因?yàn)楣讲呗孕枰到y(tǒng)記錄大量輔助信息來判斷分配順序,而不公平策略由JVM決定一直快速高效的算法來分配Lock。所以不公平策略的系統(tǒng)吞吐量會(huì)比較高(花費(fèi)更少的空間和計(jì)算在分配上),如果沒有特殊需要?jiǎng)t默認(rèn)采用不公平策略。

重入:當(dāng)前線程獲取指定的鎖對(duì)象權(quán)限后,還可以再次獲取該鎖。Lock內(nèi)部會(huì)有一個(gè)計(jì)數(shù)器來表明當(dāng)前線程獲取了該鎖的數(shù)量。如果一個(gè)線程獲取了一個(gè)鎖兩次,那么線程必須釋放鎖兩次,才能被看作完全釋放了該鎖,所以編程的時(shí)候一定要注意使用重入。synchronized關(guān)鍵字也是支持重入語義的。

3.1 Lock & ReentrantLock

ReentrantLock實(shí)現(xiàn)了Lock接口,一個(gè)可重入(reentrant)的互斥鎖 Lock,它具有與使用 synchronized 方法和語句所訪問的隱式監(jiān)視器鎖相同的一些基本行為和語義,但功能更強(qiáng)大。

摘自JavaDoc的一段獲取規(guī)則 “當(dāng)鎖沒有被另一個(gè)線程所擁有時(shí),調(diào)用 lock 的線程將成功獲取該鎖并返回。如果當(dāng)前線程已經(jīng)擁有該鎖,此方法將立即返回。ReentrantLock 將由最近成功獲得鎖,并且還沒有釋放該鎖的線程所擁有?!?/P>

經(jīng)典使用方法。

 

  1. public void m() {   
  2.     lock.lock();  // block until condition holds  
  3.     try {  
  4.       // ... method body  
  5.     } finally {  
  6.       lock.unlock()  
  7.     }  
  8.   }  
  9.  

 

ReentrantLock除了實(shí)現(xiàn)了Lock規(guī)定的方法外,還實(shí)現(xiàn)了tryLock、isLocked 等方法,幫助你實(shí)現(xiàn)更多的場(chǎng)景。

Condition

和Object的wait和notify方法類似。ReentrantLock對(duì)象附加了Conditon對(duì)象,用來完成掛起和喚醒操作,使用lock.newCondition() 方法生成。

一個(gè)來自JKD的例子:

 

  1. class BoundedBuffer {  
  2.   final Lock lock = new ReentrantLock();  
  3.   final Condition notFull  = lock.newCondition();   
  4.   final Condition notEmpty = lock.newCondition();   
  5.  
  6.   final Object[] items = new Object[100];  
  7.   int putptr, takeptr, count;  
  8.  
  9.   public void put(Object x) throws InterruptedException {  
  10.     lock.lock();  
  11.     try {  
  12.       while (count == items.length)   
  13.         notFull.await();  
  14.       items[putptr] = x;   
  15.       if (++putptr == items.length) putptr = 0;  
  16.       ++count;  
  17.       notEmpty.signal();  
  18.     } finally {  
  19.       lock.unlock();  
  20.     }  
  21.   }  
  22.  
  23.   public Object take() throws InterruptedException {  
  24.     lock.lock();  
  25.     try {  
  26.       while (count == 0)   
  27.         notEmpty.await();  
  28.       Object x = items[takeptr];   
  29.       if (++takeptr == items.length) takeptr = 0;  
  30.       --count;  
  31.       notFull.signal();  
  32.       return x;  
  33.     } finally {  
  34.       lock.unlock();  
  35.     }  
  36.   }   
  37. }  

 

利用Conditon對(duì)象可以讓所有對(duì)同一個(gè)鎖對(duì)象進(jìn)行爭(zhēng)用的Thread之間進(jìn)行同步。

Lock VS synchronized

除非你有明確的需求或者并發(fā)遇到瓶頸的時(shí)候再?zèng)Q定使用ReentrantLock。synchronized在大部分時(shí)候還是可以工作的很好,jvm會(huì)自動(dòng)處理和回收鎖。

ReentrantLock提供了更多的選擇和狀態(tài)信息。和

3.2 ReadWriteLock & ReentrantReadWriteLock

列舉一個(gè)場(chǎng)景對(duì)象X,擁有方法a、b、c。a和b方法不改表X的內(nèi)部狀態(tài),c改變內(nèi)部狀態(tài)。在多線程環(huán)境下,我們要求只讀和寫(變更狀態(tài))是不能同時(shí)進(jìn)行的,而只讀操作是可以同時(shí)并發(fā)的,且實(shí)際運(yùn)行過程中讀操作數(shù)量遠(yuǎn)遠(yuǎn)大于寫操作的數(shù)量。

如果用synchronized關(guān)鍵字的話,兩個(gè)只讀方法a、b也會(huì)互斥,并發(fā)性能收到限制。

那么這個(gè)情況下ReadWriteLock就非常有用,使用也非常簡(jiǎn)單。

 

  1. class RWDictionary {  
  2.    private final Map m = new TreeMap();  
  3.    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();  
  4.    private final Lock r = rwl.readLock();  
  5.    private final Lock w = rwl.writeLock();  
  6.  
  7.    public Data get(String key) {  
  8.        r.lock();  
  9.        try { return m.get(key); }  
  10.        finally { r.unlock(); }  
  11.    }  
  12.    public String[] allKeys() {  
  13.        r.lock();  
  14.        try { return m.keySet().toArray(); }  
  15.        finally { r.unlock(); }  
  16.    }  
  17.    public Data put(String key, Data value) {  
  18.        w.lock();  
  19.        try { return m.put(key, value); }  
  20.        finally { w.unlock(); }  
  21.    }  
  22.    public void clear() {  
  23.        w.lock();  
  24.        try { m.clear(); }  
  25.        finally { w.unlock(); }  
  26.    }  
  27. }  
  28.  

 

要記得write鎖是獨(dú)占的,它一樣可以使用ReentrantLock的Condition功能。

使用任何的鎖都要通過try catch 或者 finally 來處理異常,避免忘記unlock。

#p#

4. 同步輔助類

你提交了一些任務(wù),但你想等它們都完成了再做另外一些事情;你提交了一些任務(wù),但是不想讓它們立刻執(zhí)行,等你喊123開始的時(shí)候,它們才開始執(zhí)行;等等這些場(chǎng)景,線程之間需要相互配合,或者等待某一個(gè)條件成熟執(zhí)行。這些場(chǎng)景想你就需要用到同步輔助類。

4.1 CountDownLatch

CountDownLatch 內(nèi)部有個(gè)計(jì)數(shù)器,通過構(gòu)造函數(shù)來指定。這個(gè)類就好比是倒計(jì)時(shí)的電子牌,當(dāng)?shù)褂?jì)時(shí)為0的時(shí)候就可以一起做一些事情。

摘自JavaDoc的方法介紹

 

 

摘自JavaDoc的例子

 

  1. class Driver { // ...  
  2.   void main() throws InterruptedException {  
  3.     CountDownLatch startSignal = new CountDownLatch(1);  
  4.     CountDownLatch doneSignal = new CountDownLatch(N);  
  5.  
  6.     for (int i = 0; i < N; ++i) // create and start threads  
  7.       new Thread(new Worker(startSignal, doneSignal)).start();  
  8.  
  9.     doSomethingElse();            // don't let run yet  
  10.     startSignal.countDown();      // let all threads proceed  
  11.     doSomethingElse();  
  12.     doneSignal.await();           // wait for all to finish  
  13.   }  
  14. }  
  15.  
  16. class Worker implements Runnable {  
  17.   private final CountDownLatch startSignal;  
  18.   private final CountDownLatch doneSignal;  
  19.   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {  
  20.      this.startSignal = startSignal;  
  21.      this.doneSignal = doneSignal;  
  22.   }  
  23.   public void run() {  
  24.      try {  
  25.        startSignal.await();  
  26.        doWork();  
  27.        doneSignal.countDown();  
  28. catch (InterruptedException ex) {} // return;  
  29.   }  
  30.  
  31.   void doWork() { ... }  
  32. }  
  33.  

 

當(dāng)CountDownLatch(1)的時(shí)候,它就好比是個(gè)信號(hào)槍了。

4.2 CyclicBarrier

  1. new CyclicBarrier(N,   
  2.             new Runnable() {  
  3.               public void run() {   
  4.                mergeRows(...);   
  5.     }  
  6. }); 

這個(gè)同步輔助類,它讓多個(gè)線程可以在多個(gè)屏障點(diǎn)進(jìn)行等待,所以叫cyclic,而且有個(gè)附加選擇你可以在線程到達(dá)屏障點(diǎn)后執(zhí)行一個(gè)任務(wù)(在釋放其他線程之前)

 

 

為了幫助你理解,假設(shè)一個(gè)場(chǎng)景。

有一個(gè)任務(wù),A、B、C分別從三個(gè)倉庫(甲乙丙)搬運(yùn)不同3個(gè)不同的零件到客戶X的公司,然后再一起組裝機(jī)器,完成后一起坐車去公司總部。

這個(gè)任務(wù)需要ABC三個(gè)線程同時(shí)進(jìn)行,但是由于從倉庫到客戶X那邊距離不等、交通狀態(tài)未知的情況下,所花費(fèi)的時(shí)間也不等。同時(shí)由于三個(gè)人負(fù)責(zé)的零件不同,所以安裝機(jī)器的時(shí)候花費(fèi)時(shí)間也不一樣。這個(gè)場(chǎng)景中有兩個(gè)需要線程間等待的地方。CyclicBarrier就可以閃亮登場(chǎng)了。

 

  1. public class Main3 {  
  2.  
  3. public static void main(String[] args) {  
  4.  
  5. CyclicBarrier barrier = new CyclicBarrier(3,new Runnable() {  
  6.  
  7. @Override 
  8.  
  9. public void run() {  
  10.  
  11. System.out.println("到達(dá)公共屏障點(diǎn)");  
  12.  
  13. }  
  14.  
  15. });  
  16.  
  17. ExecutorService es = Executors.newCachedThreadPool();  
  18.  
  19. es.submit(new Worker("A"50008000, barrier));  
  20.  
  21. es.submit(new Worker("B"200016000, barrier));  
  22.  
  23. es.submit(new Worker("C"90002000, barrier));  
  24.  
  25. es.shutdown();  
  26.  
  27. }  
  28.  
  29. static class Worker implements Runnable {  
  30.  
  31. String name;  
  32.  
  33. int t1;// 搬運(yùn)零件所需要的時(shí)間  
  34.  
  35. int t2;// 參與組裝工作需要的時(shí)間  
  36.  
  37. CyclicBarrier barrier;  
  38.  
  39. public Worker(String name, int t1, int t2, CyclicBarrier barrier) {  
  40.  
  41. super();  
  42.  
  43. this.name = name;  
  44.  
  45. this.t1 = t1;  
  46.  
  47. this.t2 = t2;  
  48.  
  49. this.barrier = barrier;  
  50.  
  51. }  
  52.  
  53. @Override 
  54.  
  55. public void run() {  
  56.  
  57. try {  
  58.  
  59. print(name + " 開始搬運(yùn)零件");  
  60.  
  61. Thread.sleep(t1);// 模擬搬運(yùn)時(shí)間  
  62.  
  63. print(name + " 到達(dá)目的地");  
  64.  
  65. int a = barrier.await(); // 等待其他人  
  66.  
  67. if(a==0){  
  68.  
  69. //說明是最后一個(gè)到的可以執(zhí)行特殊操作  
  70.  
  71. }  
  72.  
  73. print(name + " 開始組裝機(jī)器");  
  74.  
  75. Thread.sleep(t2);// 模擬組裝時(shí)間.  
  76.  
  77. print(name + " 完成組裝機(jī)器");  
  78.  
  79. barrier.await(); // 等待其他人組裝完畢  
  80.  
  81. print(name + " 一起回總公司");  
  82.  
  83. catch (Exception e) {  
  84.  
  85. e.printStackTrace();  
  86.  
  87. }  
  88.  
  89. }  
  90.  
  91. }  
  92.  
  93. static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
  94.  
  95. static void print(String x) {  
  96.  
  97. System.out.println( sdf.format(new Date()) + ": "+x);  
  98.  
  99. }  
  100.  
  101. }  

 

4.3 Semaphore

一個(gè)經(jīng)典的信號(hào)量計(jì)數(shù)器。一般被用來控制對(duì)共享資源同時(shí)訪問線程數(shù)量的控制。

特殊情況下信號(hào)量設(shè)置為1,那么就類似互斥鎖的功能。

此類的構(gòu)造方法可選地接受一個(gè)公平 參數(shù)。當(dāng)設(shè)置為 false 時(shí),此類不對(duì)線程獲取鎖的順序做任何保證。和之前提到的爭(zhēng)用獲取順序一樣,在非公平模式下,系統(tǒng)將獲得更好的吞吐量,jvm也會(huì)保證在非公平模式下讓所有線程得到訪問機(jī)會(huì)。

【編輯推薦】

  1. java.util.concurrent 您不知道的 5 件事
  2. util.concurrent移植到C#
責(zé)任編輯:金賀 來源: JavaEye博客
相關(guān)推薦

2013-02-26 09:23:16

JavaJava類接口

2021-11-17 07:44:29

React 前端 組件

2010-07-12 10:03:50

ibmdwjava

2009-08-14 14:50:41

util.concur

2021-01-13 11:29:43

Python多線程異步

2024-01-17 12:44:23

Python并發(fā)編程

2024-12-24 08:03:56

2013-03-22 09:56:29

大數(shù)據(jù)應(yīng)用框架MapReduce

2020-11-06 13:25:38

React Concu

2024-10-31 09:30:05

線程池工具Java

2022-06-15 07:32:35

Lock線程Java

2025-03-28 04:00:00

互聯(lián)網(wǎng)Java讀操作

2009-07-09 10:28:19

線程池JDK5

2022-12-15 19:27:33

多線程代碼性能

2021-06-07 14:04:13

并發(fā)編程Future

2013-04-03 11:07:46

JavaJava線程

2020-07-08 12:05:55

Java線程池策略

2009-06-18 08:51:03

Spring3.0 M

2021-01-20 08:36:15

工具AtomicRefer JDK

2025-04-03 07:41:55

API阻塞隊(duì)列數(shù)據(jù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)