Java并發(fā)編程中高效緩存設(shè)計的哲學(xué)
本文將基于并發(fā)編程和算法中經(jīng)典的哈希取模、鎖分段、 異步化、原子化。這幾個核心設(shè)計理念編寫逐步推演出一個相對高效的緩存工具,希望對你有所啟發(fā)。
基于緩存存儲運算結(jié)果
我們有一批數(shù)據(jù)需要通過運算才能獲得結(jié)果,而每一次運算大約耗時是500ms,所以為了避免重復(fù)運算導(dǎo)致的等待,我們希望對應(yīng)數(shù)據(jù)第一次運算的結(jié)果直接緩存到容器中,后續(xù)線程可直接通過容器獲得結(jié)果:
于是我們就有了第一個版本,利用緩存避免非必要的重復(fù)計算,從而提升程序在單位時間內(nèi)的吞吐量
public class ComputeCache {
public final Map<Integer, Integer> cache = new HashMap<>();
public synchronized int compute(int arg) {
if (cache.containsKey(arg)) {//若存在直接返回結(jié)果
return cache.get(arg);
} else {//若不存在則計算后緩存并返回
int result = doCompute(arg);
cache.put(arg, result);
return result;
}
}
//模擬耗時的計算
private int doCompute(int key) {
ThreadUtil.sleep(500);
return key << 1;
}
public synchronized int size() {
return cache.size();
}
}
我們利用下面這段單元測試來驗證緩存的性能和正確性,這里筆者也簡單介紹一下幾個比較核心的點:
- 聲明本機CPU核心數(shù)+1的線程數(shù)執(zhí)行并發(fā)運算
- 利用倒計時門閂控制線程并發(fā)流程起止,保證準確感知所有運算任務(wù)結(jié)束后,執(zhí)行耗時統(tǒng)計
- 利用容器中最直觀且容易檢查出錯誤的屬性size進行比對判斷我們的緩存是否正確
最終在筆者的機器下5000并發(fā)的耗時大約是26765ms,整體還是不太符合我們的預(yù)期:
//初始化緩存工具
ComputeCache cache = new ComputeCache();
//聲明處理計算密集型任務(wù)的緩存
ExecutorService threadPool = ThreadUtil.newExecutor(Runtime.getRuntime().availableProcessors() + 1);
//執(zhí)行5000次緩存調(diào)用
int size = 5000;
//通過hashSet數(shù)據(jù)比對來判斷容器正確性
ConcurrentHashSet<Integer> set = new ConcurrentHashSet<>();
CountDownLatch countDownLatch = new CountDownLatch(size);
long begin = System.currentTimeMillis();
for (int i = 0; i < size; i++) {
//生成50的隨機數(shù)
int num = RandomUtil.randomInt(50);
//利用并發(fā)安全set完成計數(shù)
set.add(num);
//利用緩存工具進行計算
threadPool.submit(() -> {
try {
int res1 = cache.compute(num);
int res2 = cache.compute(num);
Assert.equals(res1, res2);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
long end = System.currentTimeMillis();
//利用size判斷容器正確性
Assert.equals(set.size(), cache.size());
Console.log("耗時:{}ms", end - begin);
threadPool.shutdownNow();
仔細查看代碼也很很直觀看出原因,上述緩存鎖的粒度是緩存工具這個示例的,這也就意味著單位時間內(nèi)只有一個線程可以操作該緩存工具,這使得在緩存運算初期大量運算任務(wù)必須串行的執(zhí)行:
鎖分段散列減小鎖粒度
所以我們就需要考慮鎖小鎖的粒度,即針對數(shù)值1計算用數(shù)值1的鎖,針對數(shù)值2計算則使用數(shù)值2的鎖,本質(zhì)上就是一種鎖分段的思想:
所以我們就考慮將synchronized關(guān)鍵字去掉,取而代之是使用ConcurrentHashMap這個線程安全的并發(fā)容器,利用其底層的entry級別的鎖來完成鎖的分段:
public final Map<Integer, Integer> cache = new ConcurrentHashMap<>();
public int compute(int arg) {
if (cache.containsKey(arg)) {//若存在直接返回結(jié)果
return cache.get(arg);
} else {//若不存在則計算后緩存并返回
int result = doCompute(arg);
cache.put(arg, result);
return result;
}
}
//模擬耗時的計算
private int doCompute(int key) {
ThreadUtil.sleep(500);
return key << 1;
}
public int size() {
return cache.size();
}
基于之前的單元測試壓測結(jié)果耗時大約是2022ms,相較于第一個版本有了很大的改進,但這還是不符合筆者的預(yù)期,仔細查看我們編寫的緩存可以試想這樣一個場景:
- 線程0希望獲得數(shù)值1的計算結(jié)果,通過contain發(fā)現(xiàn)沒有,調(diào)用doCompute執(zhí)行運算
- 線程1在線程0運算期間也看到數(shù)值1沒有運算結(jié)果,也執(zhí)行運算
因為數(shù)值計算的冪等性保證這種重復(fù)的運算不會帶來不好的結(jié)果,但針對這種耗時運算帶來的重復(fù)阻塞對于吞吐量要求較高的程序來說是非常不希望看到的:
這里補充說明一下,關(guān)于ConcurrentHashMap如果對其工作原理不太了解的讀者可以移步筆者這篇文章:《Java 并發(fā)容器總結(jié)》
異步化提升處理效率
所以我們必須想辦法避免這種非必要的重復(fù)運算,所以我們就必須借助一種手段將那些正在運算過程中的數(shù)值任務(wù)提前暴露,讓其他線程感知從而避免重復(fù)運算。就像Spring的依賴注入一樣,為避免循環(huán)依賴,先提前暴露一個未完全的對象讓被注入的bean提前感知,從而避免重復(fù)加載:
所以筆者這里使用FutureTask將緩存不存在的數(shù)值結(jié)果以任務(wù)維度存入緩存中,避免運算過程中其他線程看到緩存為空執(zhí)行重復(fù)運算:
public int compute(int key) throws ExecutionException, InterruptedException {
FutureTask<Integer> f = cache.get(key);
if (f == null) {//若為空
FutureTask<Integer> futureTask = new FutureTask<>(() -> doCompute(key));
//緩存保證下一個線程看到時直接取出使用
cache.put(key, futureTask);
futureTask.run();
f = futureTask;
}
return f.get();
}
以同樣的單元測試結(jié)果耗時為1321ms,相較于上一個版本也是有著顯著的提升,但這還是不符合我們的預(yù)期。
原子化避免重復(fù)運算
查看我們的緩存代碼可以看到,判空和set存入緩存操作是兩個動作,這種非原子操作依然是存在重復(fù)運算的情況,試想這樣一個場景:
- 線程0查看數(shù)值1沒有緩存結(jié)果
- 線程1在隨后也看到數(shù)值1沒有緩存結(jié)果
- 線程0生成運算任務(wù)提交緩存
- 線程1生成運算任務(wù)提交緩存將線程0結(jié)果覆蓋
雖然雙方可以基于各自的異步任務(wù)獲取正確結(jié)果,但還是存在重復(fù)提交任務(wù)的情況:
于是我們就有了這個最終的版本,即通過ConcurrentHashMap內(nèi)置的原子操作方法putIfAbsent將判空和保存操作以一個原子的維度進行操作,只有putIfAbsent返回null的情況下任務(wù)才能存入緩存并啟動運行,由此避免重復(fù)提交運行和開銷:
public int compute(int key) throws ExecutionException, InterruptedException {
FutureTask<Integer> f = cache.get(key);
if (f == null) {
FutureTask<Integer> futureTask = new FutureTask<>(() -> doCompute(key));
//利用putIfAbsent原子操作添加
f = cache.putIfAbsent(key, futureTask);
//若返回空說明第一次添加,則讓這個任務(wù)啟動,其他線程直接基于緩存中的任務(wù)獲取結(jié)果
if (f == null) {
f = futureTask;
f.run();
}
}
return f.get();
}
這一操作在單元測試的性能表現(xiàn)上差異不是很大,但是這種極致優(yōu)化的設(shè)計理念確是每一個java工程師所必須具備的素質(zhì)。