Cache2k:Guava Cache及Caffeine之外的新選擇

序
本文主要研究一下cache2k這款新型緩存。
示例
Cache<String,String> cache = new Cache2kBuilder<String, String>() {}
                .eternal(true)
                .expireAfterWrite(5, TimeUnit.MINUTES)    // expire/refresh after 5 minutes
                .setupWith(UniversalResiliencePolicy::enable, b -> b // enable resilience policy
                                .resilienceDuration(30, TimeUnit.SECONDS)          // cope with at most 30 seconds
                        // outage before propagating
                        // exceptions
                )
                .refreshAhead(true)                       // keep fresh when expiring
                .loader(k -> expensiveOperation(k))         // auto populating function
                .build();常見問題的解決方案
空值問題
JCache規(guī)范不支持null,所以cache2k默認也不支持,不過可以通過permitNullValues(true)來開啟,這樣子緩存就可以存儲null值。
cache stampede問題
又稱作cache miss storm,指的是高并發(fā)場景緩存同時失效導致大面積回源,cache2k采用的是block的請求方式,避免對同一個key并發(fā)回源。
org/cache2k/core/HeapCache.java。
protected Entry<K, V> getEntryInternal(K key, int hc, int val) {
    if (loader == null) {
      return peekEntryInternal(key, hc, val);
    }
    Entry<K, V> e;
    for (;;) {
      e = lookupOrNewEntry(key, hc, val);
      if (e.hasFreshData(clock)) {
        return e;
      }
      synchronized (e) {
        e.waitForProcessing();
        if (e.hasFreshData(clock)) {
          return e;
        }
        if (e.isGone()) {
          metrics.heapHitButNoRead();
          metrics.goneSpin();
          continue;
        }
        e.startProcessing(Entry.ProcessingState.LOAD, null);
        break;
      }
    }
    boolean finished = false;
    try {
      load(e);
      finished = true;
    } finally {
      e.ensureAbort(finished);
    }
    if (e.getValueOrException() == null && isRejectNullValues()) {
      return null;
    }
    return e;
  }同步回源造成的接口穩(wěn)定性問題
cache2k提供了refreshAhead參數(shù),在新數(shù)據(jù)沒有拉取成功之前,過期數(shù)據(jù)仍然可以訪問,避免請求到來時發(fā)現(xiàn)數(shù)據(jù)過期觸發(fā)同步回源造成接口延時增大問題。不過具體底層還依賴prefetchExecutor,如果refresh的時候沒有足夠的線程可以使用則會立馬過期,等待下次get出發(fā)同步回源。
org/cache2k/core/HeapCache.java。
public void timerEventRefresh(Entry<K, V> e, Object task) {
    metrics.timerEvent();
    synchronized (e) {
      if (e.getTask() != task) { return; }
      try {
        refreshExecutor.execute(createFireAndForgetAction(e, Operations.SINGLETON.refresh));
      } catch (RejectedExecutionException ex) {
        metrics.refreshRejected();
        expireOrScheduleFinalExpireEvent(e);
      }
    }
  }默認的executor如下,采用的是SynchronousQueue隊列,可以通過builder自己去設(shè)置refreshExecutor
Executor provideDefaultLoaderExecutor(int threadCount) {
    int corePoolThreadSize = 0;
    return new ThreadPoolExecutor(corePoolThreadSize, threadCount,
      21, TimeUnit.SECONDS,
      new SynchronousQueue<>(),
      threadFactoryProvider.newThreadFactory(getThreadNamePrefix()),
      new ThreadPoolExecutor.AbortPolicy());
  }回源故障問題
針對回源的下游出現(xiàn)故障的問題,cache2k提供了ResiliencePolicy策略,其實現(xiàn)類為UniversalResiliencePolicy
當load方法拋出異常且cache里頭還有數(shù)據(jù)的時候,異常不會拋給client,用當前的數(shù)據(jù)返回,這里有個resilienceDuration時間,如果超過這個時間load方法還繼續(xù)拋出異常則異常會拋給client。如果沒有單獨設(shè)置resilienceDuration,則默認取的是expiryAfterWrite時間。
org/cache2k/core/HeapCache.java。
private Object loadGotException(Entry<K, V> e, long t0, long t, Throwable wrappedException) {
    ExceptionWrapper<K, V> exceptionWrapper =
      new ExceptionWrapper(keyObjFromEntry(e), wrappedException, t0, e, exceptionPropagator);
    long expiry = 0;
    long refreshTime = 0;
    boolean suppressException = false;
    RefreshAheadPolicy.Context<Object> refreshCtx;
    try {
      if (e.isValidOrExpiredAndNoException()) {
        expiry = timing.suppressExceptionUntil(e, exceptionWrapper);
      }
      if (expiry > t0) {
        suppressException = true;
      } else {
        expiry = timing.cacheExceptionUntil(e, exceptionWrapper);
      }
      refreshCtx = getContext(e, t0, t, true, true, false, expiry);
      refreshTime = timing.calculateRefreshTime(refreshCtx);
    } catch (Exception ex) {
      return resiliencePolicyException(e, t0, t, new ResiliencePolicyException(ex), null);
    }
    exceptionWrapper = new ExceptionWrapper<>(exceptionWrapper, Math.abs(expiry));
    Object wrappedValue = exceptionWrapper;
    if (expiry != 0) {
      wrappedValue = timing.wrapLoadValueForRefresh(refreshCtx, e, exceptionWrapper);
    }
    Object loadResult;
    synchronized (e) {
      insertUpdateStats(e, (V) wrappedValue, t0, t, true, expiry, suppressException);
      if (suppressException) {
        e.setSuppressedLoadExceptionInformation(exceptionWrapper);
        loadResult = e.getValueOrException();
      } else {
        if (isRecordModificationTime()) {
          e.setModificationTime(t0);
        }
        e.setValueOrWrapper(exceptionWrapper);
        loadResult = exceptionWrapper;
      }
      finishLoadOrEviction(e, expiry, refreshTime);
    }
    return loadResult;
  }這里timing.suppressExceptionUntil是委托給了ResiliencePolicy#suppressExceptionUntil。
cache2k-addon/src/main/java/org/cache2k/addon/UniversalResiliencePolicy.java。
public long suppressExceptionUntil(K key,
                                     LoadExceptionInfo<K, V> loadExceptionInfo,
                                     CacheEntry<K, V> cachedEntry) {
    if (resilienceDuration == 0 || resilienceDuration == Long.MAX_VALUE) {
      return resilienceDuration;
    }
    long maxSuppressUntil = loadExceptionInfo.getSinceTime() + resilienceDuration;
    long deltaMs = calculateRetryDelta(loadExceptionInfo);
    return Math.min(loadExceptionInfo.getLoadTime() + deltaMs, maxSuppressUntil);
  }UniversalResiliencePolicy還提供了異常重試的功能,重試間隔為retryInterval,如果沒有配置則為resilienceDuration的5%,采取的是指數(shù)退避的模式,factor為1.5。
小結(jié)
cache2k提供了Guava Cache及Caffeine沒有的ResiliencePolicy,針對C端高并發(fā)場景提供了容錯的功能,值得借鑒一下。















 
 
 







 
 
 
 