【技術(shù)革命】JDK21虛擬線(xiàn)程來(lái)襲,讓系統(tǒng)的吞吐量翻倍!
1. 虛擬線(xiàn)程簡(jiǎn)介
虛擬線(xiàn)程是一種輕量級(jí)線(xiàn)程,可大大減少編寫(xiě)、維護(hù)和觀察高吞吐量并發(fā)應(yīng)用程序的工作量。從JDK19開(kāi)始發(fā)布了虛擬線(xiàn)程的預(yù)覽功能,直到JDK21最終確定虛擬線(xiàn)程。
虛擬線(xiàn)程既廉價(jià)(相比平臺(tái)線(xiàn)程)又可以創(chuàng)建非常的多,因此絕不應(yīng)池化:每個(gè)應(yīng)用任務(wù)都應(yīng)創(chuàng)建一個(gè)新的虛擬線(xiàn)程。因此,大多數(shù)虛擬線(xiàn)程的壽命都很短,調(diào)用堆棧也很淺,只需執(zhí)行一次 HTTP 客戶(hù)端調(diào)用或一次 JDBC 查詢(xún)。相比之下,平臺(tái)線(xiàn)程重量級(jí)、成本高,因此通常必須池化。這些線(xiàn)程的壽命往往較長(zhǎng),具有較深的調(diào)用堆棧,可在多個(gè)任務(wù)之間共享。
總之,虛擬線(xiàn)程保留了可靠的每請(qǐng)求線(xiàn)程風(fēng)格,這種風(fēng)格與 Java 平臺(tái)的設(shè)計(jì)相協(xié)調(diào),同時(shí)還能優(yōu)化利用可用硬件。使用虛擬線(xiàn)程不需要學(xué)習(xí)新的概念,但可能需要放棄為應(yīng)對(duì)當(dāng)前線(xiàn)程的高成本而養(yǎng)成的習(xí)慣。虛擬線(xiàn)程不僅能幫助應(yīng)用程序開(kāi)發(fā)人員,還能幫助框架設(shè)計(jì)人員提供易于使用的 API,這些 API 與平臺(tái)設(shè)計(jì)兼容,同時(shí)又不影響可擴(kuò)展性。
虛擬線(xiàn)程是 java.lang.Thread 的一個(gè)實(shí)例,它在底層操作系統(tǒng)線(xiàn)程上運(yùn)行 Java 代碼,但在代碼的整個(gè)生命周期中不會(huì)捕獲操作系統(tǒng)線(xiàn)程。這意味著許多虛擬線(xiàn)程可以在同一個(gè)操作系統(tǒng)線(xiàn)程上運(yùn)行 Java 代碼,從而有效地共享操作系統(tǒng)線(xiàn)程。平臺(tái)線(xiàn)程會(huì)壟斷寶貴的操作系統(tǒng)線(xiàn)程,而虛擬線(xiàn)程不會(huì)。虛擬線(xiàn)程的數(shù)量可能遠(yuǎn)遠(yuǎn)大于操作系統(tǒng)線(xiàn)程的數(shù)量。
虛擬線(xiàn)程是線(xiàn)程的一種輕量級(jí)實(shí)現(xiàn),由 JDK 而不是操作系統(tǒng)提供。它們是用戶(hù)模式線(xiàn)程的一種形式,在其他多線(xiàn)程語(yǔ)言(如 Go 中的 goroutines(協(xié)程(輕量級(jí)線(xiàn)程)) 和 Erlang 中的進(jìn)程)中取得了成功。用戶(hù)模式線(xiàn)程在 Java 早期版本中甚至被稱(chēng)為 "綠色線(xiàn)程",當(dāng)時(shí)操作系統(tǒng)線(xiàn)程尚未成熟和普及。然而,Java 的綠色線(xiàn)程都共享一個(gè)操作系統(tǒng)線(xiàn)程(M:1 調(diào)度),最終被作為操作系統(tǒng)線(xiàn)程包裝器(1:1 調(diào)度)實(shí)現(xiàn)的平臺(tái)線(xiàn)程所超越。虛擬線(xiàn)程采用 M:N 調(diào)度,即大量(M)虛擬線(xiàn)程被安排運(yùn)行在較少數(shù)量(N)的操作系統(tǒng)線(xiàn)程上。
虛擬線(xiàn)程是 java.lang.Thread 的一個(gè)實(shí)例,與特定操作系統(tǒng)線(xiàn)程無(wú)關(guān)。相比之下,平臺(tái)線(xiàn)程是以傳統(tǒng)方式實(shí)現(xiàn)的 java.lang.Thread 實(shí)例,是操作系統(tǒng)線(xiàn)程的薄包裝。
2. 傳統(tǒng)請(qǐng)求線(xiàn)程模型
通常服務(wù)器應(yīng)用程序處理相互獨(dú)立的并發(fā)請(qǐng)求時(shí),在請(qǐng)求的整個(gè)持續(xù)聲明周期內(nèi)為該請(qǐng)求指定一個(gè)線(xiàn)程來(lái)處理該請(qǐng)求。這種按請(qǐng)求線(xiàn)程的風(fēng)格易于理解、易于編程、易于調(diào)試和配置。
對(duì)于一個(gè)請(qǐng)求處理的處理時(shí)間,應(yīng)用程序同時(shí)處理的請(qǐng)求數(shù)(即并發(fā)數(shù))必須與吞吐量成比例增長(zhǎng)。例如,假設(shè)一個(gè)平均延遲為 50 毫秒的請(qǐng)求并發(fā)處理 10 個(gè)請(qǐng)求,實(shí)現(xiàn)了每秒 200 個(gè)請(qǐng)求的吞吐量。若要將該應(yīng)用的吞吐量提高到到每秒 2000 個(gè)請(qǐng)求,則需要并發(fā)處理 100 個(gè)請(qǐng)求。如果每個(gè)請(qǐng)求在請(qǐng)求持續(xù)時(shí)間內(nèi)都由一個(gè)線(xiàn)程處理,那么要使應(yīng)用程序跟上進(jìn)度,線(xiàn)程數(shù)必須隨著吞吐量的增加而增加。
由于 JDK 將線(xiàn)程作為操作系統(tǒng)(OS)線(xiàn)程的包裝器來(lái)實(shí)現(xiàn)。操作系統(tǒng)線(xiàn)程的成本很高,所以我們不能擁有太多的線(xiàn)程,這就使得線(xiàn)程的實(shí)現(xiàn)不適合按請(qǐng)求執(zhí)行的方式。如果每個(gè)請(qǐng)求在其生命周期內(nèi)都要使用一個(gè)線(xiàn)程,也就是一個(gè)操作系統(tǒng)線(xiàn)程,那么在 CPU 或網(wǎng)絡(luò)連接等其他資源耗盡之前,線(xiàn)程數(shù)量往往就已經(jīng)成為限制因素了。JDK 當(dāng)前的線(xiàn)程實(shí)現(xiàn)將應(yīng)用程序的吞吐量限制在遠(yuǎn)低于硬件支持的水平。即使對(duì)線(xiàn)程進(jìn)行了池化,也會(huì)出現(xiàn)這種情況,因?yàn)槌鼗兄诒苊鈫?dòng)新線(xiàn)程的高昂成本,但不會(huì)增加線(xiàn)程總數(shù)。
3. 虛擬線(xiàn)程使用
使用方式1:
// 創(chuàng)建一個(gè)執(zhí)行器,為每個(gè)任務(wù)啟動(dòng)一個(gè)新的虛擬線(xiàn)程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 10_000).forEach(i -> {
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
return i;
});
});
}
本例中的任務(wù)是簡(jiǎn)單的代碼--休眠1秒--現(xiàn)代硬件可以輕松支持 10,000 個(gè)虛擬線(xiàn)程同時(shí)運(yùn)行此類(lèi)代碼。而實(shí)際上,JDK 只在少量操作系統(tǒng)線(xiàn)程(可能只有一個(gè))上運(yùn)行此代碼代碼。
如果該程序使用 ExecutorService(例如 Executors.newCachedThreadPool())為每個(gè)任務(wù)創(chuàng)建一個(gè)新的平臺(tái)線(xiàn)程,情況就會(huì)截然不同。ExecutorService 會(huì)嘗試創(chuàng)建 10,000 個(gè)平臺(tái)線(xiàn)程,從而創(chuàng)建 10,000 個(gè)操作系統(tǒng)線(xiàn)程,根據(jù)機(jī)器和操作系統(tǒng)的不同,程序可能會(huì)崩潰。
即便使用Executors.newFixedThreadPool(200)創(chuàng)建固定數(shù)量的線(xiàn)程,情況也不會(huì)好到哪里去。ExecutorService 將創(chuàng)建 200 個(gè)平臺(tái)線(xiàn)程,供所有 10,000 個(gè)任務(wù)共享,因此許多任務(wù)將順序運(yùn)行而非并發(fā)運(yùn)行,程序?qū)⑿枰荛L(zhǎng)時(shí)間才能完成。對(duì)于該程序而言,擁有 200 個(gè)平臺(tái)線(xiàn)程的池每秒只能完成 200 個(gè)任務(wù),而虛擬線(xiàn)程每秒可完成約 10,000 個(gè)任務(wù)(經(jīng)過(guò)充分預(yù)熱后)。此外,如果將示例程序中的 10_000 改為 1_000_000,那么程序?qū)⑻峤?1,000,000 個(gè)任務(wù),創(chuàng)建 1,000,000 個(gè)虛擬線(xiàn)程并發(fā)運(yùn)行,(充分預(yù)熱后)吞吐量將達(dá)到每秒約 1,000,000 個(gè)任務(wù)。
注意:如果程序中的任務(wù)在一秒鐘內(nèi)執(zhí)行計(jì)算(例如對(duì)一個(gè)巨大的數(shù)組進(jìn)行排序),而不僅僅是休眠,那么增加線(xiàn)程數(shù)超過(guò)處理器內(nèi)核數(shù)將無(wú)濟(jì)于事,無(wú)論它們是虛擬線(xiàn)程還是平臺(tái)線(xiàn)程。虛擬線(xiàn)程不是更快的線(xiàn)程--它們運(yùn)行代碼的速度并不比平臺(tái)線(xiàn)程快。它們的存在是為了提供規(guī)模(更高的吞吐量),而不是速度(更低的延遲)。虛擬線(xiàn)程的數(shù)量可能比平臺(tái)線(xiàn)程多得多,因此根據(jù)利特爾定律,虛擬線(xiàn)程可以提供更高吞吐量所需的更高并發(fā)性。
使用方式2:
手動(dòng)創(chuàng)建虛擬線(xiàn)程
// 創(chuàng)建虛擬線(xiàn)程
OfVirtual virtual = Thread.ofVirtual().name("pack") ;
virtual.start(() -> {
System.out.printf("%s - 任務(wù)執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 創(chuàng)建不自動(dòng)啟動(dòng)的線(xiàn)程
Thread thread = virtual.unstarted(() -> {
System.out.printf("%s - 任務(wù)執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 手動(dòng)啟動(dòng)虛擬線(xiàn)程
thread.start() ;
// 打印線(xiàn)程對(duì)象:VirtualThread[#21,pack]/runnable
System.out.println(thread) ;
// 創(chuàng)建普通線(xiàn)程
OfPlatform platform = Thread.ofPlatform().name("pack") ;
Thread thread = platform.start(() -> {
System.out.printf("%s - 任務(wù)執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 這里輸出:Thread[#21,pack,5,main]
System.out.println(thread) ;
在上面的代碼中,打印thread輸出的不是對(duì)應(yīng)的平臺(tái)線(xiàn)程,而是虛擬線(xiàn)程
VirtualThread[#21,pack]/runnable
在執(zhí)行的任務(wù)中通過(guò)Thread.currentThread().getName()方法是沒(méi)有任何信息,我們可以通過(guò)上面的name()方法來(lái)設(shè)置線(xiàn)程的名稱(chēng)及相關(guān)的前綴。如下:
Thread.ofPlatform().name("pack") ;
Thread.ofVirtual().name("pack", 0) ;
使用方式3:
通過(guò)ThreadFactory工廠創(chuàng)建
ThreadFactory threadFactory = Thread.ofVirtual().factory() ;
threadFactory.newThread(() -> {
System.out.printf("%s - 任務(wù)執(zhí)行完成", Thread.currentThread().getName()) ;
}).start() ;
使用方式4:
直接通過(guò)Thread靜態(tài)方法
Thread.startVirtualThread(() -> {
System.out.printf("%s - 任務(wù)執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
4. 虛擬線(xiàn)程與傳統(tǒng)線(xiàn)程池對(duì)比
使用虛擬線(xiàn)程
public class Demo06 {
static class Task implements Runnable {
@Override
public void run() {
System.err.printf("start - %d%n", System.currentTimeMillis()) ;
try {
Thread.sleep(Duration.ofSeconds(1));
} catch (InterruptedException e) {}
System.err.printf(" end - %d%n", System.currentTimeMillis()) ;
}
}
public static void main(String[] args) throws Exception {
ExecutorService es= Executors.newVirtualThreadPerTaskExecutor() ;
es.submit(new Task()) ;
es.submit(new Task()) ;
es.submit(new Task()) ;
System.in.read() ;
}
}
輸出結(jié)果:
start - 1698827467289
start - 1698827467289
start - 1698827467291
end - 1698827468317
end - 1698827468317
end - 1698827468317
從結(jié)果看出,基本是同時(shí)開(kāi)始,結(jié)束也是基本一起結(jié)束,總耗時(shí)1s。
使用傳統(tǒng)線(xiàn)程
任務(wù)都一樣,只是創(chuàng)建線(xiàn)程池的類(lèi)型修改
public static void main(String[] args) throws Exception {
ExecutorService es= Executors.newFixedThreadPool(1) ;
es.submit(new Task()) ;
es.submit(new Task()) ;
es.submit(new Task()) ;
}
輸出結(jié)果:
start - 1698827686133
end - 1698827687165
start - 1698827687165
end - 1698827688177
start - 1698827688177
end - 1698827689178
從結(jié)果知道這里是一個(gè)任務(wù)一個(gè)任務(wù)的執(zhí)行串行化,但是你注意觀察,其實(shí)每個(gè)任務(wù)的的開(kāi)始start 的輸出都是要等前一個(gè)線(xiàn)程執(zhí)行完了后才能執(zhí)行。結(jié)合上面的虛擬線(xiàn)程對(duì)比,start是同時(shí)輸出的,這也是虛擬線(xiàn)程的有點(diǎn)了。
5. 使用案例
這是一個(gè)遠(yuǎn)程接口調(diào)用的示例:
遠(yuǎn)程3個(gè)接口,如下:
@GetMapping("/userinfo")
public Object queryUserInfo() {
try {
TimeUnit.SECONDS.sleep(2) ;
} catch (InterruptedException e) {e.printStackTrace();}
return "查詢(xún)用戶(hù)信息" ;
}
@GetMapping("/stock")
public Object queryStock() {
try {
TimeUnit.SECONDS.sleep(3) ;
} catch (InterruptedException e) {e.printStackTrace();}
return "查詢(xún)庫(kù)存信息" ;
}
@GetMapping("/order")
public Object queryOrder() {
try {
TimeUnit.SECONDS.sleep(4) ;
} catch (InterruptedException e) {e.printStackTrace();}
return "查詢(xún)訂單信息" ;
}
接口調(diào)用服務(wù),如下:
@Resource
private RestTemplate restTemplate ;
public Map<String, Object> rpc() {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var start = System.currentTimeMillis() ;
// 1.查詢(xún)用戶(hù)信息
var userinfo = executor.submit(() -> query("http://localhost:8080/demos/userinfo"));
// 2.查詢(xún)庫(kù)存信息
var stock = executor.submit(() -> query("http://localhost:8080/demos/stock"));
// 3.查詢(xún)訂單信息
var order = executor.submit(() -> query("http://localhost:8080/demos/order"));
Map<String, Object> res = Map.of("userinfo", userinfo.get(), "stock", stock.get(), "order", order.get()) ;
System.out.printf("總計(jì)耗時(shí):%d毫秒%n", (System.currentTimeMillis() - start)) ;
return res ;
} catch (Exception e) {
return Map.of() ;
}
}
private Object query(String url) {
return this.restTemplate.getForObject(url, String.class) ;
}
在這個(gè)案例中,如果使用傳統(tǒng)的線(xiàn)程池,如果并發(fā)量大,那么很可能很多的任務(wù)都要排隊(duì)等待,或者你需要?jiǎng)?chuàng)建更多的平臺(tái)線(xiàn)程來(lái)滿(mǎn)足吞吐量問(wèn)題。但是現(xiàn)在有了虛擬線(xiàn)程你可以不用再考慮線(xiàn)程不夠用的情況了,每個(gè)任務(wù)的執(zhí)行都會(huì)被一個(gè)虛擬的線(xiàn)程執(zhí)行(不是平臺(tái)線(xiàn)程,可能這些虛擬線(xiàn)程只會(huì)對(duì)應(yīng)到一個(gè)平臺(tái)線(xiàn)程)。
虛擬線(xiàn)程可在以下情況顯著提高應(yīng)用吞吐量:
- 并發(fā)任務(wù)的數(shù)量很高(超過(guò)幾千)
- 工作負(fù)載不受cpu限制,因?yàn)樵谶@種情況下,線(xiàn)程比處理器內(nèi)核多并不能提高吞吐量
6. 結(jié)構(gòu)化并發(fā)(預(yù)覽功能)
結(jié)構(gòu)化并發(fā)目前還是預(yù)覽功能,并沒(méi)有在JDK21中正式發(fā)布,不過(guò)我們可以先來(lái)看看什么是結(jié)構(gòu)化并發(fā)。
結(jié)構(gòu)化并發(fā) API 是來(lái)簡(jiǎn)化并發(fā)編程。結(jié)構(gòu)化并發(fā)將在不同線(xiàn)程中運(yùn)行的一組相關(guān)任務(wù)視為一個(gè)工作單元,從而簡(jiǎn)化了錯(cuò)誤處理和取消,提高了可靠性,并增強(qiáng)了可觀察性。
結(jié)構(gòu)化并發(fā)的目標(biāo)是:
- 推廣一種并發(fā)編程風(fēng)格,消除因取消和關(guān)閉而產(chǎn)生的常見(jiàn)風(fēng)險(xiǎn),如線(xiàn)程泄漏和取消延遲。
- 提高并發(fā)代碼的可觀察性。
通過(guò)示例來(lái)理解結(jié)構(gòu)化并發(fā)。
如下示例是通過(guò)傳統(tǒng)線(xiàn)程池的方式并發(fā)的從遠(yuǎn)程獲取信息,代碼如下:
static RestTemplate restTemplate = new RestTemplate() ;
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(2) ;
Future<Object> userInfo = es.submit(UnstructuredConcurrentDemo::queryUserInfo) ;
Future<Object> stock = es.submit(UnstructuredConcurrentDemo::queryStock) ;
Object userInfoRet = userInfo.get() ;
System.out.printf("執(zhí)行結(jié)果:用戶(hù)信息:%s%n", userInfoRet.toString()) ;
Object stockRet = stock.get() ;
System.out.printf("執(zhí)行結(jié)果:庫(kù)存信息:%s%n", stockRet.toString()) ;
}
public static Object queryUserInfo() {
return restTemplate.getForObject("http://localhost:8080/demos/userinfo", String.class) ;
}
public static Object queryStock() {
return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;
}
上面的代碼中沒(méi)有什么問(wèn)題,程序都能夠運(yùn)行的正常,結(jié)果如下:
08:49:53.502 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
08:49:53.504 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結(jié)果:用戶(hù)信息:查詢(xún)用戶(hù)信息
08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結(jié)果:庫(kù)存信息:查詢(xún)庫(kù)存信息
但是如果其中一個(gè)任務(wù)執(zhí)行失敗了后會(huì)如何呢?將其中一個(gè)任務(wù)拋出異常,如下代碼:
public static Object queryStock() {
System.out.println(1 / 0) ;
return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;
}
再次執(zhí)行代碼,結(jié)果如下:
發(fā)生異常:java.lang.ArithmeticException: / by zero
09:06:05.938 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock
09:06:05.948 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]
09:06:08.972 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
09:06:08.974 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結(jié)果:庫(kù)存信息:查詢(xún)庫(kù)存信息
從結(jié)果看出,獲取用戶(hù)信息子任務(wù)發(fā)生異常后,并不會(huì)影響到獲取庫(kù)存子任務(wù)的執(zhí)行。
通過(guò)結(jié)構(gòu)化并發(fā)方式
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<Object> userInfo = scope.fork(UnstructuredConcurrentDemo::queryUserInfo) ;
Supplier<Object> stock = scope.fork(UnstructuredConcurrentDemo::queryStock) ;
// 等待在此任務(wù)范圍內(nèi)啟動(dòng)的所有子任務(wù)完成或某個(gè)子任務(wù)失敗。
scope.join() ;
Object userInfoRet = userInfo.get() ;
System.out.printf("執(zhí)行結(jié)果:用戶(hù)信息:%s%n", userInfoRet.toString()) ;
Object stockRet = stock.get() ;
System.out.printf("執(zhí)行結(jié)果:庫(kù)存信息:%s%n", stockRet.toString()) ;
}
當(dāng)一個(gè)子任務(wù)發(fā)生錯(cuò)誤時(shí),其它的子任務(wù)會(huì)在未完成的情況下取消,執(zhí)行結(jié)果如下:
08:59:51.951 [] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock
08:59:51.961 [] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]
Exception in thread "main" java.lang.IllegalStateException: Subtask not completed or did not complete successfully
at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.get(StructuredTaskScope.java:936)
at com.pack.rpc.UnstructuredConcurrentDemo.structured(UnstructuredConcurrentDemo.java:26)
at com.pack.rpc.UnstructuredConcurrentDemo.main(UnstructuredConcurrentDemo.java:17)
從控制臺(tái)的輸出看出,獲取庫(kù)存的調(diào)用被取消了。
完畢?。?!