Concurrent In Java
這一系列只是對(duì)JUC各個(gè)部分做了說明和介紹,沒人深入原理!
concurrent并發(fā)包,讓你易于編寫并發(fā)程序。并發(fā)下我們經(jīng)常需要使用的基礎(chǔ)設(shè)施和解決的問題有ThreadPool、Lock、管道、集合點(diǎn)、線程之間等待和喚醒、線程間數(shù)據(jù)傳輸、共享資源訪問控制、并發(fā)線程之間的相互等待,等待。
concurrent提供的工具能夠解決絕大部分的場(chǎng)景,還能提高程序吞吐量。
現(xiàn)代的服務(wù)器多采用多核CPU,從而不同線程之間有可能真正地在同時(shí)運(yùn)行而不是cpu時(shí)間切片。在處理大計(jì)算量的程序上要盡可能利用CPU多核特性,提高系統(tǒng)吞吐量。
并發(fā)編程主要面臨三個(gè)問題:
1.如何讓多個(gè)線程同時(shí)為同一個(gè)任務(wù)工作(并發(fā)編程設(shè)計(jì))
2.多個(gè)線程之間對(duì)共享資源的爭(zhēng)用。
3.多個(gè)線程之間如何相互合作、傳遞數(shù)據(jù)。
1. concurrent包提供的集合
concurrent包直接提供了標(biāo)準(zhǔn)集合的一些實(shí)現(xiàn),在下面做簡(jiǎn)單介紹。在大部分情況下可以使用它們提供高并發(fā)環(huán)境下對(duì)集合訪問的吞吐量。
1.1 ConcurrentHashMap
Map的一個(gè)并發(fā)實(shí)現(xiàn)。在多線程環(huán)境下,它具有很高的吞吐量和具備可靠的數(shù)據(jù)一致性。它支持并發(fā)讀和一定程度的并發(fā)修改(默認(rèn)16個(gè)并發(fā),可以通過構(gòu)造函數(shù)修改)。
HashMap的實(shí)現(xiàn)是非線程安全的,高并發(fā)下會(huì)get方法常會(huì)死鎖,有的時(shí)候會(huì)表現(xiàn)為CPU居高不下。
- public V get(Object key) {
- if (key == null)
- return getForNullKey();
- int hash = hash(key.hashCode());
- for (Entry e = table[indexFor(hash, table.length)];
- e != null;
- e = e.next) {
- Object k;
- if (e.hash == hash && ((k = e.key) == key || key.equals(k)))
- return e.value;
- }
- return null;
- }
在get操作里面for循環(huán)取對(duì)象的操作,由于高并發(fā)同時(shí)讀寫,for循環(huán)的結(jié)果變得不可預(yù)知,所以有可能一直循環(huán)。
所以高并發(fā)環(huán)境下盡量不要直接使用HashMap,對(duì)系統(tǒng)造成的影響很難排除。
和Collections.synchronizedMap(new HashMap(...))相比,外ConcurrentHashMap在高并發(fā)的環(huán)境下有著更優(yōu)秀的吞吐量。因?yàn)镃oncurrentHashMap可以支持寫并發(fā),基本原理是內(nèi)部分段,分段的數(shù)量決定著并發(fā)程度。通過concurrencyLevel參數(shù)可以設(shè)置。如果你能預(yù)期并發(fā)數(shù)量那么設(shè)置該參數(shù)可以獲取更優(yōu)吞吐量。
另外為ConcurrentHashMap還實(shí)現(xiàn)了:
V putIfAbsent(K key, V value);
boolean remove(Object key, Object value);
boolean replace(K key, V oldValue, V newValue);
V replace(K key, V value);
這四個(gè)一致性的操作方法。
1.2 BlockingQueue
BlockingQueue定義了一個(gè)接口,繼承了Queue接口。Queue是一種數(shù)據(jù)結(jié)構(gòu),意思是它的項(xiàng)以先入先出(FIFO)順序存儲(chǔ)。
BlockingQueue為我們提供了一些多線程阻塞語義的方法,新增和重定義了一些方法插入:
BlockingQueue是線程安全的,非常適合多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者線程之間傳遞數(shù)據(jù)。
形象地理解,BlockingQueue好比有很多格子的傳輸帶系統(tǒng),不過當(dāng)你(生產(chǎn)者)調(diào)用put方法的時(shí)候,如果有空閑的格子那么放入物體后立刻返回,如果沒有空閑格子那么一直處于等待狀態(tài)。add方法意味著如果沒有空閑格子系統(tǒng)就會(huì)報(bào)警,然后如果處理該報(bào)警則按照你的意愿。offer方法優(yōu)先于add方法,它通過返回true 或 flase來告訴你是否放入成功。offer超時(shí)方法,如果不空閑的情況下,嘗試等待一段時(shí)間。
BlockingQueue有很多實(shí)現(xiàn)ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue
補(bǔ)充Dueue是個(gè)雙向隊(duì)列,可以當(dāng)做堆棧來使用。
BlockingQueue在ThreadPool中,作為任務(wù)隊(duì)列來使用,用來保存沒有立刻執(zhí)行的工作任務(wù)對(duì)象。
1.3 SynchronousQueue
SychronousQueue是BlockingQueue的一個(gè)實(shí)現(xiàn),它看起來是一個(gè)隊(duì)列,但是其實(shí)沒有容量,是特定條件下的一個(gè)精簡(jiǎn)實(shí)現(xiàn)。
做個(gè)比喻,SychronousQueue對(duì)象就像一個(gè)接力棒,現(xiàn)在有兩個(gè)運(yùn)動(dòng)員交棒者和接棒者(線程)要做交接。在交接點(diǎn),交棒者沒有交出之前是不能松開的(一種等待狀態(tài)),接棒者在接到棒之前是必須等待。換一句話說不管誰先到交接點(diǎn),必須處于等待狀態(tài)。
在生產(chǎn)者和消費(fèi)者模型中。如果生產(chǎn)者向SychronousQueue進(jìn)行put操作,直到有另外的消費(fèi)者線程進(jìn)行take操作時(shí)才能返回。對(duì)消費(fèi)者也是一樣,take操作會(huì)被阻塞,直到生產(chǎn)者put。
在這種生產(chǎn)者-消費(fèi)者模型下,生產(chǎn)者和消費(fèi)者是進(jìn)行手對(duì)手傳遞產(chǎn)品,在消費(fèi)者消費(fèi)一個(gè)產(chǎn)品之前,生產(chǎn)者必須處于等待狀態(tài)。它給我們提供了在線程之間交換單一元素的極輕量級(jí)方法,并且具有阻塞語義。
提示:上面舉例中有寫局限性。其實(shí)生產(chǎn)者和消費(fèi)者進(jìn)程是可以任意數(shù)量的。M:N。生產(chǎn)線程之間會(huì)對(duì)SychronousQueue進(jìn)行爭(zhēng)用,消費(fèi)者也是一樣。
對(duì)SychronousQueue類似于其他語境中“會(huì)合通道”或 “連接”點(diǎn)問題。它非常適合于傳遞性設(shè)計(jì),在這種設(shè)計(jì)中,在一個(gè)線程中運(yùn)行的對(duì)象要將某些信息、事件或任務(wù)傳遞給在另一個(gè)線程中運(yùn)行的對(duì)象,它就必須與該對(duì)象同步。
1.4Exchanger
是SychronousQueue的雙向?qū)崿F(xiàn)。用來伙伴線程間交互對(duì)象。Exchanger 可能在比如遺傳算法和管道設(shè)計(jì)中很有用。
形象地說,就是兩個(gè)人在預(yù)定的地方交互物品,任何一方?jīng)]到之前都處于等待狀態(tài)。
1.5 CopyOnWriteArrayList 和 CopyOnWriteArraySet
它們分別是List接口和Set接口的實(shí)現(xiàn)。正如類名所描述的那樣,當(dāng)數(shù)據(jù)結(jié)構(gòu)發(fā)生變化的時(shí)候,會(huì)復(fù)制自身的內(nèi)容,來保證一致性。大家都知道復(fù)制全部副本是非常昂貴的操作,看來這是一個(gè)非常不好的實(shí)現(xiàn)。事實(shí)上沒有最好和最差的方案,只有最合適的方案。一般情況下,處理多線程同步問題,我們傾向使用同步的 ArrayList,但同步也有其成本。
那么在什么情況下使用CopyOnWriteArrayList 或者CopyOnWriteArraySet呢?
數(shù)據(jù)量小。
對(duì)數(shù)據(jù)結(jié)構(gòu)的修改是偶然發(fā)生的,相對(duì)于讀操作。
舉例來說,如果我們實(shí)現(xiàn)觀察者模式的話,作為監(jiān)聽器集合是非常合適的。
1.6 TimeUnit
雖然是個(gè)時(shí)間單位,但是它也是concurrent包里面的。也許你以前的代碼里面經(jīng)常出現(xiàn)1*60*1000來表示一分鐘,代碼可讀性很差?,F(xiàn)在你可以通過TimeUnit來編寫可讀性更好的代碼,concurrent的api里面涉及到時(shí)間的地方都會(huì)使用該對(duì)象。
我之所以先進(jìn)并發(fā)框架常用的集合,是因?yàn)榫€程池的實(shí)現(xiàn)特性都利用了BlockingQueue的一些特性。
#p#
2. ThreadPool
雖然線程和進(jìn)程相比是輕量級(jí)許多,但是線程的創(chuàng)建成本還是不可忽律,所以就有了線程池化的設(shè)計(jì)。線程池的創(chuàng)建、管理、回收、任務(wù)隊(duì)列管理、任務(wù)分配等細(xì)節(jié)問題依然負(fù)責(zé),沒有必要重復(fù)發(fā)明輪子,concurrent包已經(jīng)為我們準(zhǔn)備了一些優(yōu)秀線程池的實(shí)現(xiàn)。
2.1 認(rèn)識(shí)ExecutorService 接口
ExecutorService 接口,它能提供的功能就是用來在將來某一個(gè)時(shí)刻異步地執(zhí)行一系列任務(wù)。雖然簡(jiǎn)單一句話,但是包含了很多需求點(diǎn)。它的實(shí)現(xiàn)至少包含了線程池和任務(wù)隊(duì)列兩個(gè)方面,其實(shí)還包括了任務(wù)失敗處理策略等。

經(jīng)常使用submit方法,用來提交任務(wù)對(duì)象。
簡(jiǎn)單的例子:
ExecutorService es = Executors.newCachedThreadPool();
es.submit(new Runnable(){
@Override
public void run() {
System.out.println("do some thing");
}
});
es.shutdown();
上面的例子只是完成了提交了一個(gè)任務(wù),異步地去執(zhí)行它。但是有些使用場(chǎng)景更為復(fù)雜,比如等待獲得異步任務(wù)的返回結(jié)果,或者最多等上固定的時(shí)間。
submit 方法返回一個(gè)對(duì)象,F(xiàn)uture??雌饋碛悬c(diǎn)別扭,代表將來的對(duì)象。其實(shí)看一下Future的方法就明白了。

其實(shí)Future對(duì)象代表了一個(gè)異步任務(wù)的結(jié)果,可以用來取消任務(wù)、查詢?nèi)蝿?wù)狀態(tài),還有通過get方法獲得異步任務(wù)返回的結(jié)果。當(dāng)調(diào)用get方法的時(shí)候,當(dāng)前線程被阻塞直到任務(wù)被處理完成或者出現(xiàn)異常。
我們可以通過保存Future對(duì)象來跟蹤查詢異步任務(wù)的執(zhí)行情況。
顯然Runnable接口中定義的 public void run();方法并不能返回結(jié)果對(duì)象,所以concurrent包提供了Callable接口,它可以被用來返回結(jié)果對(duì)象。
2.2 ThreadPoolExecutor
ThreadPoolExecutor實(shí)現(xiàn)了ExecutorService 接口,也是我們最主要使用的實(shí)現(xiàn)類。
首先非常有必要看一些類的最完整的構(gòu)造函數(shù)
- ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue
workQueue, - ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
ThreadPoolExecutor對(duì)象中有個(gè)poolSize變量表示當(dāng)前線程池中正在運(yùn)行的線程數(shù)量。
注意:這個(gè)有關(guān)非常重要的關(guān)系,常常被誤解。poolSize變量和corePoolSize、maximumPoolSize以及workQueue的關(guān)系。
首先線程池被創(chuàng)建初期,還沒有執(zhí)行任何任務(wù)的時(shí)候,poolSize等于0;
每次向線程池提交任務(wù)的時(shí)候,線程池處理過程如下:
1. 如果poolSize少于 corePoolSize,則首選添加新的線程,而不進(jìn)行排隊(duì)。
2. 如果poolSize等于或多于 corePoolSize,則首選將請(qǐng)求加入隊(duì)列workQueue,而不添加新的線程。
3. 如果第二步執(zhí)行失敗(隊(duì)已滿),則創(chuàng)建新的線程執(zhí)行任務(wù),但是如果果poolSize已經(jīng)達(dá)到maximumPoolSize,那么就拒絕該任務(wù)。如果處理被拒絕的任務(wù)就取決于RejectedExecutionHandler handler的設(shè)置了,默認(rèn)情況下會(huì)拋出異常。
系統(tǒng)存在四種任務(wù)拒絕策略:
在默認(rèn)的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運(yùn)行時(shí) RejectedExecutionException。
在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調(diào)用運(yùn)行該任務(wù)的 execute 本身。此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。
在 ThreadPoolExecutor.DiscardPolicy 中,不能執(zhí)行的任務(wù)將被刪除。
在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過程)。
keepAliveTime活動(dòng)線程如果空閑一段時(shí)間是否可以回收,通常只作用于超出corePoolSize的線程。corePoolSize的線程創(chuàng)建了就不會(huì)被回收。但是到j(luò)ava 6 之后增加了public void allowCoreThreadTimeOut(boolean value)方法,允許core進(jìn)程也可以根據(jù)keepAliveTime來回收,默認(rèn)為false。
決定線程池特性的還有workQueue的實(shí)現(xiàn)類,有三種類SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue,分別對(duì)應(yīng)同步隊(duì)列、無界隊(duì)列、有界隊(duì)列。
(摘自JavaDoc)
類SynchronousQueue,直接提交。工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。在此,如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗,因此會(huì)構(gòu)造一個(gè)新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請(qǐng)求集時(shí)出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)(設(shè)置maximumPoolSizes 為Integer.MAX_VALUE)。當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長(zhǎng)的可能性。
LinkedBlockingQueue,無界隊(duì)列。使用無界隊(duì)列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待。這樣,創(chuàng)建的線程就不會(huì)超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時(shí),適合于使用無界隊(duì)列;例如,在 Web 頁服務(wù)器中。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請(qǐng)求,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長(zhǎng)的可能性。
ArrayBlockingQueue,有界隊(duì)列。當(dāng)使用有限的 maximumPoolSizes 時(shí),有界隊(duì)列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過您許可的更多線程安排時(shí)間。使用小型隊(duì)列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會(huì)降低吞吐量。
綜上:構(gòu)造參數(shù)的設(shè)置是互相制約和影響的。只有當(dāng)你重復(fù)了解其相互關(guān)系的時(shí)候、或有特殊需求的時(shí)候,才可以自己構(gòu)造ThreadPoolExecutor對(duì)象,否則可以使用Executores是個(gè)工廠類。
提示使用線程池是注意處理shutdown,確保你系統(tǒng)關(guān)閉的時(shí)候主動(dòng)關(guān)閉shutdown。
2.3 ScheduledExecutorService
擴(kuò)展了ExecutorService接口,提供時(shí)間排程的功能。
schedule方法被用來延遲指定時(shí)間來執(zhí)行某個(gè)指定任務(wù)。如果你需要周期性重復(fù)執(zhí)行定時(shí)任務(wù)可以使用scheduleAtFixedRate或者scheduleWithFixedDelay方法,它們不同的是前者以固定頻率執(zhí)行,后者以相對(duì)固定頻率執(zhí)行。
(感謝wenbois2000 提出原先的錯(cuò)誤,我在這里重新描述!對(duì)于原先的錯(cuò)誤,實(shí)在不好意思啊,再次感謝!)不管任務(wù)執(zhí)行耗時(shí)是否大于間隔時(shí)間,scheduleAtFixedRate和scheduleWithFixedDelay都不會(huì)導(dǎo)致同一個(gè)任務(wù)并發(fā)地被執(zhí)行。唯一不同的是scheduleWithFixedDelay是當(dāng)前一個(gè)任務(wù)結(jié)束的時(shí)刻,開始結(jié)算間隔時(shí)間,如0秒開始執(zhí)行第一次任務(wù),任務(wù)耗時(shí)5秒,任務(wù)間隔時(shí)間3秒,那么第二次任務(wù)執(zhí)行的時(shí)間是在第8秒開始。
ScheduledExecutorService的實(shí)現(xiàn)類,是ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor對(duì)象包含的線程數(shù)量是沒有可伸縮性的,只會(huì)有固定數(shù)量的線程。不過你可以通過其構(gòu)造函數(shù)來設(shè)定線程的優(yōu)先級(jí),來降低定時(shí)任務(wù)線程的系統(tǒng)占用。
特別提示:通過ScheduledExecutorService執(zhí)行的周期任務(wù),如果任務(wù)執(zhí)行過程中拋出了異常,那么過ScheduledExecutorService就會(huì)停止執(zhí)行任務(wù),且也不會(huì)再周期地執(zhí)行該任務(wù)了。所以你如果想保住任務(wù)都一直被周期執(zhí)行,那么catch一切可能的異常。
2.4 Executors
Executores是個(gè)工廠類,用來生成ThreadPoolExecutor對(duì)象,它提供了一些常用的線程池配置方案,滿足我們大部分場(chǎng)景。
1. newCachedThreadPool
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue());
- }
分析下出這個(gè)線程池配置的工作模式,當(dāng)沒有空閑進(jìn)程時(shí)就新建線程執(zhí)行,當(dāng)有空閑線程時(shí)就使用空閑線程執(zhí)行。當(dāng)線程空閑大60秒時(shí),系統(tǒng)自動(dòng)回收線程。
該線程池非常適合執(zhí)行短小異步任務(wù)時(shí)吞吐量非常高,會(huì)重復(fù)利用CPU的能力。但是如果任務(wù)處理IO邊界任務(wù),那么會(huì)消耗大量線程切換,降低系統(tǒng)吞吐量。所以執(zhí)行短小的計(jì)算任務(wù)非常高效,且當(dāng)沒有任務(wù)時(shí)不會(huì)消耗系統(tǒng)資源。
注意:線程池中沒有變量表示線程是否空閑。那么程序是如何控制的呢?不得不贊嘆concurrent實(shí)現(xiàn)的非常精巧。當(dāng)創(chuàng)建出來的線程完成原來的任務(wù)后,會(huì)調(diào)用BlockingQueue的Poll方法,對(duì)于SynchronousQueue實(shí)現(xiàn)而言會(huì)阻塞調(diào)用線程,直到另外的線程offer調(diào)用。
然而ThreadPool在分配任務(wù)的時(shí)候總是先去嘗試調(diào)用offer方法,所以就會(huì)觸發(fā)空閑線程再次調(diào)用。
精妙的是ThreadPoolExecutor的處理邏輯一樣,但是用BlockingQueue實(shí)現(xiàn)變了就產(chǎn)生不同的行為。
2. newFixedThreadPool
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue());
- }
創(chuàng)建固定線程數(shù)量的線程池,采用無界隊(duì)列,當(dāng)有更多任務(wù)的時(shí)候?qū)⒈环湃牍ぷ麝?duì)列中排隊(duì)。如果線程池不經(jīng)常執(zhí)行任務(wù)時(shí),你可以調(diào)用allowCoreThreadTimeOut(boolean value)的方法讓系統(tǒng)自動(dòng)回收core的進(jìn)程,以節(jié)約系統(tǒng)資源。
3. newSingleThreadExecutor
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue()));
- }
只有一個(gè)工作線程的線程池。和newFixedThreadPool(1)相比,不同之處有兩點(diǎn):
1. 不可以重新配置newSingleThreadExecutor創(chuàng)建出來的線程池。
2. 當(dāng)創(chuàng)建出來的線程池對(duì)象被GC回收時(shí),會(huì)自動(dòng)調(diào)用shutdown方法。
4.newScheduledThreadPool
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
- return new ScheduledThreadPoolExecutor(corePoolSize);
- }
生成一個(gè)可以執(zhí)行時(shí)間調(diào)度的線程池。其實(shí)內(nèi)部使用無界工作隊(duì)列,線程數(shù)量最多能達(dá)到corePoolSize。
2.5 ExecutorCompletionService
這是個(gè)巧妙的設(shè)計(jì),內(nèi)部維護(hù)了一已經(jīng)完成了任務(wù)結(jié)果隊(duì)列,通過take方法可以同步地等待一個(gè)個(gè)結(jié)果對(duì)象。
詳情見http://www.oschina.net/uploads/doc/javase-6-doc-api-zh_CN/java/util/concurrent/ExecutorCompletionService.html
#p#
3. java.util.concurrent.locks
java 早期內(nèi)置synchronized關(guān)鍵字解決多線程對(duì)共享資源訪問的一些問題,和其還配套了Object的notify 和 wait方法,用來控制線程之間的同步。
concurrent軟件包提供了更為高級(jí)和抽象的Lock工具,能解決更多的問題。
Lock是控制多個(gè)線程對(duì)共享資源進(jìn)行訪問的工具。通常Lock限定多線程對(duì)同一個(gè)共享資源訪問的限制,一次只允許一個(gè)線程獲得Lock,即獲得對(duì)共享資源的訪問權(quán)限,線程間是互斥的。但是也有一些鎖如果ReadWriteLock是允許部分線程同時(shí)訪問共享資源的。
幾個(gè)術(shù)語:
爭(zhēng)用:當(dāng)多個(gè)Thread在同一時(shí)間內(nèi)(相對(duì)概念)想要占有同一個(gè)Lock對(duì)象。那么JVM會(huì)調(diào)度解決爭(zhēng)用。
獲取順序:當(dāng)多個(gè)線程爭(zhēng)用同一個(gè)Lock對(duì)象,那么JVM就要決定哪個(gè)線程將會(huì)獲得鎖權(quán)限。存在兩種模式:公平和不公平。 默認(rèn)都是不公平模式,包括synchronized關(guān)鍵字,jvm決定順序的時(shí)候也是采用不公平策略。因?yàn)楣讲呗孕枰到y(tǒng)記錄大量輔助信息來判斷分配順序,而不公平策略由JVM決定一直快速高效的算法來分配Lock。所以不公平策略的系統(tǒng)吞吐量會(huì)比較高(花費(fèi)更少的空間和計(jì)算在分配上),如果沒有特殊需要?jiǎng)t默認(rèn)采用不公平策略。
重入:當(dāng)前線程獲取指定的鎖對(duì)象權(quán)限后,還可以再次獲取該鎖。Lock內(nèi)部會(huì)有一個(gè)計(jì)數(shù)器來表明當(dāng)前線程獲取了該鎖的數(shù)量。如果一個(gè)線程獲取了一個(gè)鎖兩次,那么線程必須釋放鎖兩次,才能被看作完全釋放了該鎖,所以編程的時(shí)候一定要注意使用重入。synchronized關(guān)鍵字也是支持重入語義的。
3.1 Lock & ReentrantLock
ReentrantLock實(shí)現(xiàn)了Lock接口,一個(gè)可重入(reentrant)的互斥鎖 Lock,它具有與使用 synchronized 方法和語句所訪問的隱式監(jiān)視器鎖相同的一些基本行為和語義,但功能更強(qiáng)大。
摘自JavaDoc的一段獲取規(guī)則 “當(dāng)鎖沒有被另一個(gè)線程所擁有時(shí),調(diào)用 lock 的線程將成功獲取該鎖并返回。如果當(dāng)前線程已經(jīng)擁有該鎖,此方法將立即返回。ReentrantLock 將由最近成功獲得鎖,并且還沒有釋放該鎖的線程所擁有?!?/P>
經(jīng)典使用方法。
- public void m() {
- lock.lock(); // block until condition holds
- try {
- // ... method body
- } finally {
- lock.unlock()
- }
- }
ReentrantLock除了實(shí)現(xiàn)了Lock規(guī)定的方法外,還實(shí)現(xiàn)了tryLock、isLocked 等方法,幫助你實(shí)現(xiàn)更多的場(chǎng)景。
Condition
和Object的wait和notify方法類似。ReentrantLock對(duì)象附加了Conditon對(duì)象,用來完成掛起和喚醒操作,使用lock.newCondition() 方法生成。
一個(gè)來自JKD的例子:
- class BoundedBuffer {
- final Lock lock = new ReentrantLock();
- final Condition notFull = lock.newCondition();
- final Condition notEmpty = lock.newCondition();
- final Object[] items = new Object[100];
- int putptr, takeptr, count;
- public void put(Object x) throws InterruptedException {
- lock.lock();
- try {
- while (count == items.length)
- notFull.await();
- items[putptr] = x;
- if (++putptr == items.length) putptr = 0;
- ++count;
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
- }
- public Object take() throws InterruptedException {
- lock.lock();
- try {
- while (count == 0)
- notEmpty.await();
- Object x = items[takeptr];
- if (++takeptr == items.length) takeptr = 0;
- --count;
- notFull.signal();
- return x;
- } finally {
- lock.unlock();
- }
- }
- }
利用Conditon對(duì)象可以讓所有對(duì)同一個(gè)鎖對(duì)象進(jìn)行爭(zhēng)用的Thread之間進(jìn)行同步。
Lock VS synchronized
除非你有明確的需求或者并發(fā)遇到瓶頸的時(shí)候再?zèng)Q定使用ReentrantLock。synchronized在大部分時(shí)候還是可以工作的很好,jvm會(huì)自動(dòng)處理和回收鎖。
ReentrantLock提供了更多的選擇和狀態(tài)信息。和
3.2 ReadWriteLock & ReentrantReadWriteLock
列舉一個(gè)場(chǎng)景對(duì)象X,擁有方法a、b、c。a和b方法不改表X的內(nèi)部狀態(tài),c改變內(nèi)部狀態(tài)。在多線程環(huán)境下,我們要求只讀和寫(變更狀態(tài))是不能同時(shí)進(jìn)行的,而只讀操作是可以同時(shí)并發(fā)的,且實(shí)際運(yùn)行過程中讀操作數(shù)量遠(yuǎn)遠(yuǎn)大于寫操作的數(shù)量。
如果用synchronized關(guān)鍵字的話,兩個(gè)只讀方法a、b也會(huì)互斥,并發(fā)性能收到限制。
那么這個(gè)情況下ReadWriteLock就非常有用,使用也非常簡(jiǎn)單。
- class RWDictionary {
- private final Map
m = new TreeMap(); - private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- private final Lock r = rwl.readLock();
- private final Lock w = rwl.writeLock();
- public Data get(String key) {
- r.lock();
- try { return m.get(key); }
- finally { r.unlock(); }
- }
- public String[] allKeys() {
- r.lock();
- try { return m.keySet().toArray(); }
- finally { r.unlock(); }
- }
- public Data put(String key, Data value) {
- w.lock();
- try { return m.put(key, value); }
- finally { w.unlock(); }
- }
- public void clear() {
- w.lock();
- try { m.clear(); }
- finally { w.unlock(); }
- }
- }
要記得write鎖是獨(dú)占的,它一樣可以使用ReentrantLock的Condition功能。
使用任何的鎖都要通過try catch 或者 finally 來處理異常,避免忘記unlock。
#p#
4. 同步輔助類
你提交了一些任務(wù),但你想等它們都完成了再做另外一些事情;你提交了一些任務(wù),但是不想讓它們立刻執(zhí)行,等你喊123開始的時(shí)候,它們才開始執(zhí)行;等等這些場(chǎng)景,線程之間需要相互配合,或者等待某一個(gè)條件成熟執(zhí)行。這些場(chǎng)景想你就需要用到同步輔助類。
4.1 CountDownLatch
CountDownLatch 內(nèi)部有個(gè)計(jì)數(shù)器,通過構(gòu)造函數(shù)來指定。這個(gè)類就好比是倒計(jì)時(shí)的電子牌,當(dāng)?shù)褂?jì)時(shí)為0的時(shí)候就可以一起做一些事情。
摘自JavaDoc的方法介紹
摘自JavaDoc的例子
- class Driver { // ...
- void main() throws InterruptedException {
- CountDownLatch startSignal = new CountDownLatch(1);
- CountDownLatch doneSignal = new CountDownLatch(N);
- for (int i = 0; i < N; ++i) // create and start threads
- new Thread(new Worker(startSignal, doneSignal)).start();
- doSomethingElse(); // don't let run yet
- startSignal.countDown(); // let all threads proceed
- doSomethingElse();
- doneSignal.await(); // wait for all to finish
- }
- }
- class Worker implements Runnable {
- private final CountDownLatch startSignal;
- private final CountDownLatch doneSignal;
- Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
- this.startSignal = startSignal;
- this.doneSignal = doneSignal;
- }
- public void run() {
- try {
- startSignal.await();
- doWork();
- doneSignal.countDown();
- } catch (InterruptedException ex) {} // return;
- }
- void doWork() { ... }
- }
當(dāng)CountDownLatch(1)的時(shí)候,它就好比是個(gè)信號(hào)槍了。
4.2 CyclicBarrier
- new CyclicBarrier(N,
- new Runnable() {
- public void run() {
- mergeRows(...);
- }
- });
這個(gè)同步輔助類,它讓多個(gè)線程可以在多個(gè)屏障點(diǎn)進(jìn)行等待,所以叫cyclic,而且有個(gè)附加選擇你可以在線程到達(dá)屏障點(diǎn)后執(zhí)行一個(gè)任務(wù)(在釋放其他線程之前)
為了幫助你理解,假設(shè)一個(gè)場(chǎng)景。
有一個(gè)任務(wù),A、B、C分別從三個(gè)倉庫(甲乙丙)搬運(yùn)不同3個(gè)不同的零件到客戶X的公司,然后再一起組裝機(jī)器,完成后一起坐車去公司總部。
這個(gè)任務(wù)需要ABC三個(gè)線程同時(shí)進(jìn)行,但是由于從倉庫到客戶X那邊距離不等、交通狀態(tài)未知的情況下,所花費(fèi)的時(shí)間也不等。同時(shí)由于三個(gè)人負(fù)責(zé)的零件不同,所以安裝機(jī)器的時(shí)候花費(fèi)時(shí)間也不一樣。這個(gè)場(chǎng)景中有兩個(gè)需要線程間等待的地方。CyclicBarrier就可以閃亮登場(chǎng)了。
- public class Main3 {
- public static void main(String[] args) {
- CyclicBarrier barrier = new CyclicBarrier(3,new Runnable() {
- @Override
- public void run() {
- System.out.println("到達(dá)公共屏障點(diǎn)");
- }
- });
- ExecutorService es = Executors.newCachedThreadPool();
- es.submit(new Worker("A", 5000, 8000, barrier));
- es.submit(new Worker("B", 2000, 16000, barrier));
- es.submit(new Worker("C", 9000, 2000, barrier));
- es.shutdown();
- }
- static class Worker implements Runnable {
- String name;
- int t1;// 搬運(yùn)零件所需要的時(shí)間
- int t2;// 參與組裝工作需要的時(shí)間
- CyclicBarrier barrier;
- public Worker(String name, int t1, int t2, CyclicBarrier barrier) {
- super();
- this.name = name;
- this.t1 = t1;
- this.t2 = t2;
- this.barrier = barrier;
- }
- @Override
- public void run() {
- try {
- print(name + " 開始搬運(yùn)零件");
- Thread.sleep(t1);// 模擬搬運(yùn)時(shí)間
- print(name + " 到達(dá)目的地");
- int a = barrier.await(); // 等待其他人
- if(a==0){
- //說明是最后一個(gè)到的可以執(zhí)行特殊操作
- }
- print(name + " 開始組裝機(jī)器");
- Thread.sleep(t2);// 模擬組裝時(shí)間.
- print(name + " 完成組裝機(jī)器");
- barrier.await(); // 等待其他人組裝完畢
- print(name + " 一起回總公司");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
- static void print(String x) {
- System.out.println( sdf.format(new Date()) + ": "+x);
- }
- }
4.3 Semaphore
一個(gè)經(jīng)典的信號(hào)量計(jì)數(shù)器。一般被用來控制對(duì)共享資源同時(shí)訪問線程數(shù)量的控制。
特殊情況下信號(hào)量設(shè)置為1,那么就類似互斥鎖的功能。
此類的構(gòu)造方法可選地接受一個(gè)公平 參數(shù)。當(dāng)設(shè)置為 false 時(shí),此類不對(duì)線程獲取鎖的順序做任何保證。和之前提到的爭(zhēng)用獲取順序一樣,在非公平模式下,系統(tǒng)將獲得更好的吞吐量,jvm也會(huì)保證在非公平模式下讓所有線程得到訪問機(jī)會(huì)。
【編輯推薦】