深入解析 Guava Cache- 從基本用法、回收策略、刷新策略到實(shí)現(xiàn)原理
Guava Cache 是非常強(qiáng)大的本地緩存工具,提供了非常簡單 API 供開發(fā)者使用。
這篇文章,我們將詳細(xì)介紹 Guava Cache 的基本用法、回收策略,刷新策略,實(shí)現(xiàn)原理。
圖片
1.基本用法
1.1 依賴配置
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>1.2 創(chuàng)建緩存
Guava Cache 提供了基于 Builder 構(gòu)建者模式的構(gòu)造器,用戶只需要根據(jù)需求設(shè)置好各種參數(shù)即可使用。
(1)手工創(chuàng)建緩存對象
@Test
public void testHandCache() {
// 測試手工測試
Cache<String, String> cache = CacheBuilder.newBuilder().
// 最大容量為20(基于容量進(jìn)行回收)
maximumSize(20)
// 配置寫入后多久未更新,緩存會過期
.expireAfterWrite(10, TimeUnit.SECONDS).build();
cache.put("hello", "value_HELLO");
assertEquals("value_HELLO", cache.getIfPresent("hello"));
Thread.sleep(10000);
// 過期后重新獲取
assertNull(cache.getIfPresent("hello"));
}我們可以創(chuàng)建一個(gè)緩存對象 Cache ,通過 CacheBuilder 構(gòu)造器,配置相關(guān)參數(shù)(最大容量 20 個(gè)條目、緩存過期時(shí)間 10 秒),最后調(diào)用構(gòu)建方法。
(2)創(chuàng)建緩存加載器
CacheLoader 可以理解為一個(gè)固定的加載器,在創(chuàng)建 Cache 對象時(shí)指定,然后簡單地重寫 V load(K key) throws Exception 方法,就可以達(dá)到當(dāng)檢索不存在的時(shí)候,會自動的加載數(shù)據(jù)。
@Test
public void testLoadingCache() throws InterruptedException, ExecutionException {
CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
//自動寫緩存數(shù)據(jù)的方法
@Override
public String load(String key) {
System.out.println("加載 key:" + key);
return"value_" + key.toUpperCase();
}
@Override
//重新刷新緩存
public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
returnsuper.reload(key, oldValue);
}
};
LoadingCache<String, String> cache =
CacheBuilder.newBuilder()
// 最大容量為100(基于容量進(jìn)行回收)
.maximumSize(20)
// 配置寫入后多久未更新,緩存會過期
.expireAfterWrite(10, TimeUnit.SECONDS)
//配置寫入后多久刷新緩存
.refreshAfterWrite(1, TimeUnit.SECONDS).build(cacheLoader);
assertEquals(0, cache.size());
assertEquals("value_HELLO", cache.getUnchecked("hello"));
assertEquals(1, cache.size());
// 通過 Callable 獲取數(shù)據(jù)
String key = "mykey";
String value = cache.get(key, new Callable<String>() {
@Override
public String call() throws Exception {
return"call_" + key;
}
});
System.out.println("call value:" + value);
}和手工創(chuàng)建緩存對象不同,我們首先創(chuàng)建緩存加載器對象,并重寫 load 方法,然后通過緩存構(gòu)造器創(chuàng)建 LoadingCache 對象 ,該對象支持寫入后刷新方法。
同時(shí) LoadingCache 對象支持 Callable 模式,也就是調(diào)用 get 方法時(shí),可以傳入 Callable 對象。這樣可以在使用緩存時(shí),更加靈活。
2.回收策略
Guava Cache 提供了三種基本的緩存回收方式:
- 基于容量回收策略
- 基于時(shí)間的回收策略
- 基于引用回收策略
2.1 基于容量回收策略
基于容量的回收策略可以分為兩種:基于大小和基于權(quán)重。
基于大小:我們可以使用 maximumSize 方法設(shè)置最大緩存項(xiàng)數(shù)量,當(dāng)緩存項(xiàng)數(shù)量達(dá)到設(shè)定的最大值時(shí),舊的緩存項(xiàng)將會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
.maximumSize(100)
.build();基于權(quán)重:如果不同的緩存值,需要占據(jù)不同的內(nèi)存空間,也就是不同的緩存項(xiàng)有不同的“權(quán)重”(weights)。
我們可以使用 CacheBuilder.weigher(Weigher) 指定一個(gè)權(quán)重函數(shù),并且用 maximumWeight(long) 指定最大總重。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
.maximumWeight(1000)
.weigher(new Weigher<Object, Object>() {
public int weigh(Object key, Object value) {
// 定義權(quán)重計(jì)算方法
return value.size();
}
}).build();2.2 基于時(shí)間的回收策略
我們可以使用 expireAfterAccess 和 expireAfterWrite 方法設(shè)置緩存項(xiàng)的最大存活時(shí)間。
expireAfterAccess表示緩存項(xiàng)在給定時(shí)間內(nèi)沒有被讀/寫訪問會過期。expireAfterWrite表示緩存項(xiàng)在被創(chuàng)建或最后一次更新后的指定時(shí)間內(nèi)會過期。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
// 10分鐘沒有訪問后會被回收,或者重新加載
.expireAfterAccess(10, TimeUnit.MINUTES)
// 5分鐘沒有更新,緩存會被回收,或者重新加載
// .expireAfterWrite(5,TimeUnit.MINUTES)
.build();2.3 基于引用回收策略
Guava Cache 提供了以下三個(gè)方法來配置基于引用的回收策略:
- weakKeys() 方法:
通過調(diào)用weakKeys()方法,可以使緩存中的鍵使用弱引用。這意味著如果某個(gè)鍵沒有其他強(qiáng)引用指向它,那么該鍵可能會被垃圾回收,并且相應(yīng)的緩存項(xiàng)也會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
.weakKeys()
.build();- weakValues() 方法:
通過調(diào)用weakValues()方法,可以使緩存中的值使用弱引用。這樣,如果某個(gè)值沒有其他強(qiáng)引用指向它,那么該值可能會被垃圾回收,相應(yīng)的緩存項(xiàng)也會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
.weakValues()
.build();- softValues() 方法:
通過調(diào)用softValues()方法,可以使緩存中的值使用軟引用。軟引用相對于弱引用,更傾向于在內(nèi)存不足時(shí)被垃圾回收。如果某個(gè)值沒有其他強(qiáng)引用指向它,且內(nèi)存不足時(shí),該值可能會被垃圾回收,相應(yīng)的緩存項(xiàng)也會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
.softValues()
.build();一般來講,我們在生產(chǎn)環(huán)境使用的是(基于容量回收策略 + 基于時(shí)間的回收策略)兩者配合來使用。
當(dāng)然 ,我們同樣可以使用手工回收的方式。
Cache<String,String> cache = CacheBuilder.newBuilder().build();
Object value = new Object();
cache.put("key1","value1");
cache.put("key2","value2");
cache.put("key3","value3");
//1.清除指定的key
cache.invalidate("key1");
//2.批量清除list中全部key對應(yīng)的記錄
List<String> list = new ArrayList<String>();
list.add("key1");
list.add("key2");
cache.invalidateAll(list);3.刷新策略
3.1 手工刷新
我們可以強(qiáng)制緩存加載器重新加載鍵的新值,調(diào)用 LoadingCache 對象的刷新方法。
String value = loadingCache.get("key");
loadingCache.refresh("key");3.2 自動刷新
Guava Cache 提供了刷新(refresh)機(jī)制,可以通過 refreshAfterWrite 方法來設(shè)置刷新時(shí)間,當(dāng)緩存項(xiàng)過期的同時(shí)可以重新加載新值。
Cache<String, String> cache = CacheBuilder.newBuilder()
.refreshAfterWrite(5, TimeUnit.MINUTES)
// 設(shè)置并發(fā)級別為3,并發(fā)級別是指可以同時(shí)寫緩存的線程數(shù)
.concurrencyLevel(3)
.build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
// 異步加載新值的邏輯
return fetchDataFromDataSource(key);
}
});
// 在獲取緩存值時(shí),如果緩存項(xiàng)過期,將返回舊值
String value = cache.get("exampleKey");配置刷新方法 refreshAfterWrite,當(dāng)大量線程同時(shí)訪問緩存項(xiàng),緩存已過期時(shí),更新線程調(diào)用 load 方法更新該緩存,其他請求線程并不需要等待,框架直接返回該緩存項(xiàng)的舊值。
因?yàn)楦戮€程同時(shí)也是請求線程,所以在上面的示例代碼里面,刷新緩存是個(gè)同步操作,可不可以異步的加載緩存呢 ?
我們有兩種方式:異步加載緩存的原理是重寫 reload 方法。
@Test
public void testAnsynRefreshMethod1() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
//自動寫緩存數(shù)據(jù)的方法
@Override
public String load(String key) {
System.out.println(Thread.currentThread().getName() + " 加載 key:" + key);
// 從數(shù)據(jù)庫加載數(shù)據(jù)
return"value_" + key.toUpperCase();
}
@Override
//異步刷新緩存
public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
ListenableFutureTask<String> futureTask = ListenableFutureTask.create(() -> {
System.out.println(Thread.currentThread().getName() + " 異步加載 key:" + key + " oldValue:" + oldValue);
Thread.sleep(1000);
return load(key);
});
executorService.submit(futureTask);
return futureTask;
}
};
LoadingCache<String, String> cache = CacheBuilder.newBuilder()
// 最大容量為20(基于容量進(jìn)行回收)
.maximumSize(20)
//配置寫入后多久刷新緩存
.refreshAfterWrite(2, TimeUnit.SECONDS).build(cacheLoader);
String key = "hello";
// 第一次加載
String value = cache.get(key);
System.out.println(value);
Thread.sleep(3000);
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
String value2 = cache.get(key);
System.out.println(Thread.currentThread().getName() + value2);
// 第二次加載
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(20000);
}或者使用更優(yōu)雅的使用方式:
ExecutorService executorService = Executors.newFixedThreadPool(5);
CacheLoader<String, String> cacheLoader = CacheLoader.asyncReloading(
new CacheLoader<String, String>() {
//自動寫緩存數(shù)據(jù)的方法
@Override
public String load(String key) {
System.out.println(Thread.currentThread().getName() + " 加載 key:" + key);
// 從數(shù)據(jù)庫加載數(shù)據(jù)
return "value_" + key.toUpperCase();
}
} , executorService);自動刷新的缺點(diǎn)是:當(dāng)緩存項(xiàng)到了指定過期時(shí)間,不管是同步刷新還是異步刷新,絕大部分請求線程都會返回舊的數(shù)據(jù)值,緩存值會有一定的延遲效果。
所以一般場景下,使用efreshAfterWrite和 expireAfterWrite配合使用 。
比如說控制緩存每1秒進(jìn)行刷新,如果超過 2s 沒有訪問,那么則讓緩存失效,訪問時(shí)不會得到舊值,而是必須得待新值加載。
4.實(shí)現(xiàn)原理
Guava Cache 的數(shù)據(jù)結(jié)構(gòu)跟 JDK1.7 的 ConcurrentHashMap 類似,如下圖所示:
圖片
4.1 構(gòu)造函數(shù)
public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
CacheLoader<? super K1, V1> loader) {
checkWeightWithWeigher();
return new LocalCache.LocalLoadingCache<>(this, loader);
}通過構(gòu)造器 CacheBuilder 的構(gòu)建方法創(chuàng)建本地緩存類 LocalCache 的靜態(tài)包裝類 LocalLoadingCache對象。
class LocalCache<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> {
// ..... 省略代碼
staticclass LocalLoadingCache<K, V> extends LocalManualCache<K, V>
implements LoadingCache<K, V> {
LocalLoadingCache(
CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) {
super(new LocalCache<K, V>(builder, checkNotNull(loader)));
}
// LoadingCache methods
@Override
public V get(K key) throws ExecutionException {
return localCache.getOrLoad(key);
}
@Override
public V getUnchecked(K key) {
try {
return get(key);
} catch (ExecutionException e) {
thrownew UncheckedExecutionException(e.getCause());
}
}
@Override
public ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException {
return localCache.getAll(keys);
}
@Override
public void refresh(K key) {
localCache.refresh(key);
}
// ..... 省略代碼
}
}LocalLoadingCache 類對外暴露了若干方法,它的底層依然是 LocalCache 對象來執(zhí)行相關(guān)緩存操作,LocalCache 本質(zhì)上就是一個(gè) Map 。
4.2 初始化緩存
LocalCache(
CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {
concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);
// key的強(qiáng)度,即引用類型的強(qiáng)弱
keyStrength = builder.getKeyStrength();
// value的強(qiáng)度,即引用類型的強(qiáng)弱
valueStrength = builder.getValueStrength();
// key的比較策略,跟key的引用類型有關(guān)
keyEquivalence = builder.getKeyEquivalence();
// value的比較策略,跟value的引用類型有關(guān)
valueEquivalence = builder.getValueEquivalence();
maxWeight = builder.getMaximumWeight();
weigher = builder.getWeigher();
//訪問后的過期時(shí)間,設(shè)置了expireAfterAccess參數(shù)
expireAfterAccessNanos = builder.getExpireAfterAccessNanos();
//寫入后的過期時(shí)間,設(shè)置了expireAfterWrite參數(shù)
expireAfterWriteNanos = builder.getExpireAfterWriteNanos();
refreshNanos = builder.getRefreshNanos();
int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);
if (evictsBySize() && !customWeigher()) {
initialCapacity = (int) Math.min(initialCapacity, maxWeight);
}
// Find the lowest power-of-two segmentCount that exceeds concurrencyLevel, unless
// maximumSize/Weight is specified in which case ensure that each segment gets at least 10
// entries. The special casing for size-based eviction is only necessary because that eviction
// happens per segment instead of globally, so too many segments compared to the maximum size
// will result in random eviction behavior.
int segmentShift = 0;
int segmentCount = 1;
while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
++segmentShift;
segmentCount <<= 1;
}
this.segmentShift = 32 - segmentShift;
segmentMask = segmentCount - 1;
this.segments = newSegmentArray(segmentCount);
int segmentCapacity = initialCapacity / segmentCount;
if (segmentCapacity * segmentCount < initialCapacity) {
++segmentCapacity;
}
int segmentSize = 1;
while (segmentSize < segmentCapacity) {
segmentSize <<= 1;
}
if (evictsBySize()) {
// Ensure sum of segment max weights = overall max weights
long maxSegmentWeight = maxWeight / segmentCount + 1;
long remainder = maxWeight % segmentCount;
for (int i = 0; i < this.segments.length; ++i) {
if (i == remainder) {
maxSegmentWeight--;
}
this.segments[i] =
createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
}
} else {
for (int i = 0; i < this.segments.length; ++i) {
this.segments[i] =
createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
}
}
}LocalCache 維護(hù)一個(gè) Segment 數(shù)組,數(shù)組大小滿足如下條件:
- 數(shù)組大小是 2 的冪次 ,并且小于并發(fā)度 concurrencyLevel ;
- 若指定了容量大小,數(shù)組大小乘以 20 要大于緩存權(quán)重 maxWeight (假如設(shè)置容量大小最大值為40,那么 maxWeight 為 40 )。
接下來,我們看看 Segment 類的核心屬性 :
static class Segment<K, V> extends ReentrantLock {
// 存活的元素大小
volatileint count;
// 存活的元素權(quán)重
long totalWeight;
//修改、更新的數(shù)量,用來做弱一致性
int modCount;
//擴(kuò)容用
int threshold;
//存放Entry的數(shù)組,用來存放Entry,使用AtomicReferenceArray是因?yàn)橐肅AS來保證原子性
volatile@Nullable AtomicReferenceArray<ReferenceEntry<K, V>> table;
//如果key是弱引用的話,那么被 GC 回收后,就會放到ReferenceQueue,要根據(jù)這個(gè)queue做一些清理工作
final@Nullable ReferenceQueue<K> keyReferenceQueue;
//如果value是弱引用的話,那么被 GC 回收后,就會放到ReferenceQueue,要根據(jù)這個(gè)queue做一些清理工作
final@Nullable ReferenceQueue<V> valueReferenceQueue;
//記錄哪些entry被訪問,用于accessQueue的更新。
final Queue<ReferenceEntry<K, V>> recencyQueue;
// 讀取次數(shù)計(jì)數(shù)器
final AtomicInteger readCount = new AtomicInteger();
// 如果一個(gè)元素新寫入,則會記到這個(gè)隊(duì)列的尾部,用來做expire
@GuardedBy("this")
final Queue<ReferenceEntry<K, V>> writeQueue;
//讀、寫都會放到這個(gè)隊(duì)列,用來進(jìn)行LRU替換算法
@GuardedBy("this")
final Queue<ReferenceEntry<K, V>> accessQueue;
}ReferenceEntry 有幾種引用類型 :
圖片
下圖展示了 StringEntry 核心屬性 :
圖片
每種 Entry 對象都有 Next 屬性 ,指向下一個(gè) Entry 。對象值 valueReference 默認(rèn)是一個(gè)占位符 unSet ,表示沒有被設(shè)置過值。
4.3 查詢流程
進(jìn)入 LoadingCache 的 get(key) 方法 , 如下代碼所示:
// 1.調(diào)用LoadingCache的getOrLoad
V getOrLoad(K key) throws ExecutionException {
return get(key, defaultLoader);
}
// 2.計(jì)算 key 的哈希值,并判斷位于哪一個(gè)段 Segment,最后通過查詢
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
int hash = hash(checkNotNull(key));
return segmentFor(hash).get(key, hash, loader);
}(1)計(jì)算 key 對應(yīng)的哈希值
int hash(@Nullable Object key) {
int h = keyEquivalence.hash(key);
return rehash(h);
}(2)定位分段 Segment
Segment<K, V> segmentFor(int hash) {
// segmentMask = segmentCount - 1
return segments[(hash >>> segmentShift) & segmentMask];
}第二步驟,和 ConcurrentHashMap 類似,通過哈希值計(jì)算數(shù)據(jù)存儲在哪一個(gè)分段 Segment 。
(3)從定位的分段查詢出對象
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
// 判斷 key、loader 是否為空
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile
// don't call getLiveEntry, which would ignore loading values
// 根據(jù)hash定位到 table 的第一個(gè) Entry
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
// 獲取當(dāng)前時(shí)間
long now = map.ticker.read();
// 獲取當(dāng)前存活的 Value
V value = getLiveValue(e, now);
if (value != null) {
//記錄被訪問過
recordRead(e, now);
//記錄命中率
statsCounter.recordHits(1);
//判斷是否需要刷新,如果需要刷新,那么會去異步刷新,且返回舊值。
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
//如果 Entry 過期了且數(shù)據(jù)還在加載中,則等待直到加載完成。
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
// at this point e is either null or expired;
// 走到這一步表示: 之前沒有寫入過數(shù)據(jù) || 數(shù)據(jù)已經(jīng)過期 || 數(shù)據(jù)不是在加載中。
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
thrownew ExecutionError((Error) cause);
} elseif (cause instanceof RuntimeException) {
thrownew UncheckedExecutionException(cause);
}
throw ee;
} finally {
postReadCleanup();
}
}A 定位第一個(gè)Entry
ReferenceEntry<K, V> getEntry(Object key, int hash) {
for (ReferenceEntry<K, V> e = getFirst(hash); e != null; e = e.getNext()) {
// 判斷哈希值
if (e.getHash() != hash) {
continue;
}
// 判斷key
K entryKey = e.getKey();
if (entryKey == null) {
tryDrainReferenceQueues();
continue;
}
if (map.keyEquivalence.equivalent(key, entryKey)) {
return e;
}
}
returnnull;
}B 從第一個(gè) Entry 獲取存活的值
V getLiveValue(ReferenceEntry<K, V> entry, long now) {
if (entry.getKey() == null) {
tryDrainReferenceQueues();
return null;
}
V value = entry.getValueReference().get();
if (value == null) {
tryDrainReferenceQueues();
return null;
}
if (map.isExpired(entry, now)) {
tryExpireEntries(now);
return null;
}
return value;
}
boolean isExpired(ReferenceEntry<K, V> entry, long now) {
checkNotNull(entry);
// 如果配置了 expireAfterAccess ,比較當(dāng)前時(shí)間和 Entry 的 accessTime 比較
if (expiresAfterAccess() && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
returntrue;
}
// 如果配置了 expireAfterWrite ,比較當(dāng)前時(shí)間和 Entry 的 writeTime 比較
if (expiresAfterWrite() && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {
returntrue;
}
returnfalse;
}假如 Entry 的 key 為空,或者 vlaue 為空,或者過期了,則返回空 。
C 調(diào)度刷新 scheduleRefresh
V scheduleRefresh(
ReferenceEntry<K, V> entry,
K key,
int hash,
V oldValue,
long now,
CacheLoader<? super K, V> loader) {
//1、是否配置了 refreshAfterWrite
//2、用 writeTime 判斷是否達(dá)到刷新的時(shí)間
//3、是否在加載中,如果是則沒必要再進(jìn)行刷新
if (map.refreshes()
&& (now - entry.getWriteTime() > map.refreshNanos)
&& !entry.getValueReference().isLoading()) {
V newValue = refresh(key, hash, loader, true);
if (newValue != null) {
return newValue;
}
}
return oldValue;
}調(diào)度刷新方法會判斷三個(gè)條件 :
- 配置了刷新時(shí)間 refreshAfterWrite
- 當(dāng)前時(shí)間減去 Entry 的寫入時(shí)間大于刷新時(shí)間
- 當(dāng)前 Entry 未處于加載中
當(dāng)滿足了三個(gè)條件之后,調(diào)用 refresh 方法,當(dāng)異步加載成功后,返回新值。
V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
//插入一個(gè) LoadingValueReference ,實(shí)質(zhì)是把對應(yīng)Entry的ValueReference替換為新建的LoadingValueReference
final LoadingValueReference<K, V> loadingValueReference =
insertLoadingValueReference(key, hash, checkTime);
if (loadingValueReference == null) {
returnnull;
}
// 調(diào)用異步加載方法loadAsync
ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
if (result.isDone()) {
try {
return Uninterruptibles.getUninterruptibly(result);
} catch (Throwable t) {
// don't let refresh exceptions propagate; error was already logged
}
}
returnnull;
}首先將 Entry 對象的 ValueReference 包裝為新建的 LoadingValueReference , 表明當(dāng)前對象正在加載中。
LoadingValueReference<K, V> insertLoadingValueReference(
final K key, final int hash, boolean checkTime) {
ReferenceEntry<K, V> e = null;
lock();
try {
long now = map.ticker.read();
preWriteCleanup(now);
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// Look for an existing entry.
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// We found an existing entry.
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isLoading()
|| (checkTime && (now - e.getWriteTime() < map.refreshNanos))) {
// refresh is a no-op if loading is pending
// if checkTime, we want to check *after* acquiring the lock if refresh still needs
// to be scheduled
returnnull;
}
// continue returning old value while loading
++modCount;
LoadingValueReference<K, V> loadingValueReference =
new LoadingValueReference<>(valueReference);
e.setValueReference(loadingValueReference);
return loadingValueReference;
}
}
++modCount;
LoadingValueReference<K, V> loadingValueReference = new LoadingValueReference<>();
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
return loadingValueReference;
} finally {
unlock();
postWriteCleanup();
}
}接下來,分析異步加載loadAsync方法:
ListenableFuture<V> loadAsync(
final K key,
final int hash,
final LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader) {
final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
loadingFuture.addListener(
new Runnable() {
@Override
public void run() {
try {
getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during refresh", t);
loadingValueReference.setException(t);
}
}
},
directExecutor());
return loadingFuture;
}
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
try {
// 記錄耗時(shí)時(shí)間
stopwatch.start();
V previousValue = oldValue.get();
if (previousValue == null) {
V newValue = loader.load(key);
return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
}
ListenableFuture<V> newValue = loader.reload(key, previousValue);
if (newValue == null) {
return Futures.immediateFuture(null);
}
// To avoid a race, make sure the refreshed value is set into loadingValueReference
// *before* returning newValue from the cache query.
return transform(
newValue,
new com.google.common.base.Function<V, V>() {
@Override
public V apply(V newValue) {
LoadingValueReference.this.set(newValue);
return newValue;
}
},
directExecutor());
} catch (Throwable t) {
ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
return result;
}
}loadAsync 方法流程:
- 調(diào)用 loadingValueReference 對象的 loadFuture 方法,假如舊數(shù)據(jù)為空值,則同步調(diào)用加載器 loader 的 load 方法 ,并返回包裝了新值的 Future 。
- 假如舊數(shù)據(jù)不為空值,則調(diào)用加載器 loader 的 reload 方法(此處可以重新實(shí)現(xiàn)為異步的方式),經(jīng)過轉(zhuǎn)換操作返回包裝了新值的 Future 。
- 將新的值存儲在 Entry 對象里。
D 查詢/加載 lockedGetOrLoad
如果之前沒有寫入過數(shù)據(jù) 、 數(shù)據(jù)已經(jīng)過期、 數(shù)據(jù)不是在加載中,則會調(diào)用lockedGetOrLoad方法。
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
//用來判斷是否需要?jiǎng)?chuàng)建一個(gè)新的Entry
boolean createNewEntry = true;
//segment上鎖
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
//做一些清理工作
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
//通過key定位entry
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
//找到entry
valueReference = e.getValueReference();
//如果value在加載中則不需要重復(fù)創(chuàng)建entry
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
//value為null說明已經(jīng)過期且被清理掉了
if (value == null) {
//寫通知queue
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
//過期但還沒被清理
} elseif (map.isExpired(e, now)) {
//寫通知queue
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accomodate an incorrect expiration queue.
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
} else {
recordLockedRead(e, now);
statsCounter.recordHits(1);
//其他情況則直接返回value
//來到這步,是不是覺得有點(diǎn)奇怪,我們分析一下:
//進(jìn)入lockedGetOrLoad方法的條件是數(shù)據(jù)已經(jīng)過期 || 數(shù)據(jù)不是在加載中,但是在lock之前都有可能發(fā)生并發(fā),進(jìn)而改變entry的狀態(tài),所以在上面中再次判斷了isLoading和isExpired。所以來到這步說明,原來數(shù)據(jù)是過期的且在加載中,lock的前一刻加載完成了,到了這步就有值了。
return value;
}
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
//創(chuàng)建一個(gè)Entry,且set一個(gè)新的 LoadingValueReference。
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}
//同步加載數(shù)據(jù)
if (createNewEntry) {
try {
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}5.總結(jié)
通過解析 Guava Cache 的實(shí)現(xiàn)原理,我們發(fā)現(xiàn) Guava LocalCache 與 ConcurrentHashMap 有以下不同:
- ConcurrentHashMap ”分段控制并發(fā)“是隱式的(實(shí)現(xiàn)中沒有Segment對象),而 LocalCache 是顯式的。
在 JDK 1.8 之后,ConcurrentHashMap 采用synchronized + CAS實(shí)現(xiàn):當(dāng) put 的元素在哈希桶數(shù)組中不存在時(shí),直接 CAS 進(jìn)行寫操作;在發(fā)生哈希沖突的情況下使用 synchronized 鎖定頭節(jié)點(diǎn)。其實(shí)是比分段鎖更細(xì)粒度的鎖實(shí)現(xiàn),只在特定場景下鎖定其中一個(gè)哈希桶,降低鎖的影響范圍。 - Guava Cache 使用 ReferenceEntry 來封裝鍵值對,并且對于值來說,還額外實(shí)現(xiàn)了 ValueReference 引用對象來封裝對應(yīng) Value 對象。
- Guava Cache 支持過期 + 自動 loader 機(jī)制,這也使得其加鎖方式與 ConcurrentHashMap 不同。
- Guava Cache 支持 segment 粒度上支持了 LRU 機(jī)制, 體現(xiàn)在 Segment 上就是 writeQueue 和 accessQueue。
隊(duì)列中的元素按照訪問或者寫時(shí)間排序,新的元素會被添加到隊(duì)列尾部。如果,在隊(duì)列中已經(jīng)存在了該元素,則會先delete掉,然后再尾部添加該節(jié)點(diǎn)。


























