深入解析 SpringCloud 的負載衡器 Loadbalancer
這期我們結(jié)合之前幾篇關(guān)于nacos客戶端緩存來說明一下集成loadbalancer后服務(wù)調(diào)用時可能存在的問題和解決方案。
一、詳解Spring Cloud loadbalancer服務(wù)調(diào)用機制
1. loadbalancer使用說明
首先我們引出本文所使用的loadbalancer依賴,可以看到版本為3.1.5,作為服務(wù)消費者我們通過loadbalancer作為負載均衡器替換到默認的ribbon,同時我們還引入caffeine觸發(fā)Loadbalancer完成基于caffeine的服務(wù)緩存裝配:
<!--使用loadbalancer負載均衡器替換ribbon-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
對應(yīng)我們還是給出本文項目的架構(gòu)圖,可以看到當前項目給出兩個服務(wù)提供者nacos-provider和一個服務(wù)消費者nacos-consumer,當服務(wù)消費者從nacos拉取到可用服務(wù)之后,會通過權(quán)重算法調(diào)用可用的服務(wù)示例:
對應(yīng)的我們也給出本文服務(wù)消費者所用到的負載均衡算法的配置:
/**
* 將負載均衡算法設(shè)置為權(quán)重算法
*
* @param environment
* @param loadBalancerClientFactory
* @return
*/
@Bean
ReactorLoadBalancer<ServiceInstance> weightedLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new WeightedLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
同時我們也給出WeightedLoadBalancer 這個權(quán)重算法的實現(xiàn),讀者可基于代碼自行了解一下:
/**
* 基于權(quán)重的負載均衡算法
*/
public class WeightedLoadBalancer implements ReactorServiceInstanceLoadBalancer {
/**
* loadbalancer 提供的訪問當前服務(wù)的名稱
*/
final String serviceId;
/**
* 基于nacos緩存獲取服務(wù)列表
*/
ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
public WeightedLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
}
@Autowired
private NacosDiscoveryProperties nacosDiscoveryProperties;
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
//如果serviceInstanceListSupplierProvider不存在則采用NoopServiceInstanceListSupplier返回空實例列表
ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
//基于服務(wù)獲取策略supplier略獲取可用實例,采用權(quán)重算法返回本地調(diào)用的服務(wù)實例
return supplier.get(request)
.next()
.map(serviceInstances -> new DefaultResponse(NacosBalancer.getHostByRandomWeight3(serviceInstances)));
}
}
2. loadbalancer自動裝配初始化
基于上述的配置,loadbalancer進行自動裝配的時候就會識別到Caffeine和CaffeineCacheManager的存在,于是觸發(fā)caffeineLoadBalancerCacheManager的裝配,該管理類內(nèi)部有一個cacheMap,本次裝配將會以cachingServiceInstanceListSupplierCache為key,caffeine cache為value作為鍵值對存入,后續(xù)所有的服務(wù)緩存實例信息都會存儲在這個鍵值對中的caffeine cache中:
對應(yīng)的我們也給出自動裝配的源碼,如下所示,可以看到因為Caffeine和CaffeineCacheManager的存在,我們觸發(fā)了CaffeineBasedLoadBalancerCacheManager的裝配:
@Configuration(proxyBeanMethods = false)
//Caffeine和CaffeineCacheManager都存在觸發(fā)自動裝配
@ConditionalOnClass({ Caffeine.class, CaffeineCacheManager.class })
protected static class CaffeineLoadBalancerCacheManagerConfiguration {
@Bean(autowireCandidate = false)
@ConditionalOnMissingBean
LoadBalancerCacheManager caffeineLoadBalancerCacheManager(LoadBalancerCacheProperties cacheProperties) {
//生成CaffeineBasedLoadBalancerCacheManager管理類
return new CaffeineBasedLoadBalancerCacheManager(cacheProperties);
}
}
通過查看CaffeineBasedLoadBalancerCacheManager內(nèi)部初始化邏輯,最終可以看到setCacheNames這個方法,該方法就會執(zhí)行就是我們上文中所說的以cachingServiceInstanceListSupplierCache為key,以caffeineCache這個緩存為value的鍵值對初始化工作:
public CaffeineCacheManager(String... cacheNames) {
//基于
setCacheNames(Arrays.asList(cacheNames));
}
public void setCacheNames(@Nullable Collection<String> cacheNames) {
if (cacheNames != null) {
for (String name : cacheNames) {
this.cacheMap.put(name, createCaffeineCache(name));
}
this.dynamic = false;
}
else {
this.dynamic = true;
}
}
初始化后的調(diào)試結(jié)果如下,我們可以很直觀的看到這個記錄服務(wù)實例的緩存鍵值對:
3. loadbalancer如何基于緩存完成服務(wù)調(diào)用
默認情況下loadbalancer緩存是沒有任何信息的,假設(shè)我們nacos-consumer即服務(wù)消費者發(fā)起對nacos-provider的調(diào)用,loadbalancer是如何拿到服務(wù)實例的信息呢?
實際上,在loadbalancer初始化的時候,服務(wù)實例查詢組件ServiceInstanceListSupplier內(nèi)部聚合了nacos的服務(wù)查詢組件DiscoveryClientServiceInstanceListSupplier,所以當我們通過feign發(fā)起調(diào)用時,loadbalancer的執(zhí)行步驟為:
- loadbalancer代理會先通過ServiceInstanceListSupplier到緩存中查看是否存在服務(wù)提供者nacos-provider的信息,如果不為空直接返回調(diào)用即可,如果不存在則執(zhí)行步驟2。
- 嘗試到基于nacos服務(wù)查詢組件DiscoveryClientServiceInstanceListSupplier查看是否存在nacos-provider如果有直接返回。
- ServiceInstanceListSupplier基于nacos服務(wù)組件的結(jié)果拿到實例信息,將其緩存起來,并基于負載均衡策略返回服務(wù)實例給服務(wù)消費者進行調(diào)用。
基于上圖我們給出loadbalancer自動裝配的服務(wù)查詢組件源碼,可以看到ServiceInstanceListSupplier通過withBlockingDiscoveryClient方法聚合了nacos服務(wù)查詢組件:
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
@Conditional(DefaultConfigurationCondition.class)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
//ServiceInstanceListSupplier通過withBlockingDiscoveryClient方法聚合了nacos服務(wù)查詢組件
return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context);
}
當我們發(fā)起服務(wù)調(diào)用時,feign代理會走到FeignBlockingLoadBalancerClient上,其內(nèi)部會執(zhí)行如下步驟:
- 通過loadBalancerClient的choose嘗試上文的多級緩存查詢服務(wù)示例的步驟,并完成負載均衡選取服務(wù)實例返回。
- 基于上述實例生成請求地址和參數(shù)。
- 發(fā)起請求并響應(yīng)結(jié)果給服務(wù)消費者。
@Override
public Response execute(Request request, Request.Options options) throws IOException {
//......
//嘗試從緩存中拿服務(wù),然后執(zhí)行負載均衡調(diào)用
ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
//......
//基于上述實例生成請求地址和參數(shù)
String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
Request newRequest = buildRequest(request, reconstructedUrl);
LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);
//發(fā)起請求
return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
supportedLifecycleProcessors, loadBalancerProperties.isUseRawStatusCodeInResponseData());
}
步入loadBalancerClient.choose的調(diào)用路徑,我們就可以看到上文所說的多級緩存查詢步驟:
- 到lb緩存即cacheManager通過cachingServiceInstanceListSupplierCache查詢是否存在服務(wù)提供者nacos-provider的實例。
- 如果沒有則到nacos中的查詢。
- 基于查詢結(jié)果寫入lb的cache中:
public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
this.serviceInstances = CacheFlux.lookup(key -> {
// 到lb緩存管理拿緩存
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
//......
//查看lb緩存有沒有
List<ServiceInstance> list = cache.get(key, List.class);
//如果沒有返回空
if (list == null || list.isEmpty()) {
return Mono.empty();
}
return Flux.just(list).materialize().collectList();
},
//若lb緩存沒有則觸發(fā)onCacheMissResume回調(diào),就會通過delegate.get()觸發(fā)nacos服務(wù)組件查詢
delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize()
.doOnNext(instances -> {//doOnNext得到nacos緩存后寫入lb緩存中
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
//.......
}
else {
//寫入lb緩存
cache.put(key, instances);
}
}).then());
}
4. loadbalancer緩存更新策略
默認情況下,lb緩存每35s完成一次更新,這也就意味著緩存在lb緩存中的服務(wù)實例信息只有在存入后的35s內(nèi)是有效的,為了避免定時輪詢更新服務(wù)實例的開銷,lb的緩存采用了一種惰性更新的思想。
假設(shè)我們此時此刻緩存將nacos-provider的實例信息緩存到lb裝配的CaffeineCache中,服務(wù)消費者在35s之后發(fā)起調(diào)用,此時CaffeineCache就會基于緩存服務(wù)實例的起始時間判斷緩存是否過期,如果發(fā)現(xiàn)過期則直接返回null,讓loadbalancer到nacos緩存中獲取nacos-provider實例信息并覆蓋掉當前過期的緩存:
對應(yīng)我們給出cache緩存默認過期時間的默認值,即位于LoadBalancerCacheProperties 中對應(yīng)ttl 的賦值:
@ConfigurationProperties("spring.cloud.loadbalancer.cache")
public class LoadBalancerCacheProperties {
//.......
private Duration ttl = Duration.ofSeconds(35);
//......
}
為方便說明,我們再次貼出lb緩存查詢的源碼,如下所示, cache.get(key, List.class)這一段就是從Loadbalancer的緩存中獲取服務(wù)實例的信息,如果過期也會返回null,然后到nacos緩存中獲取信息并更新過期緩存:
public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
this.serviceInstances = CacheFlux.lookup(key -> {
// 到lb緩存管理拿緩存
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
//......
//查看lb緩存有沒有查詢服務(wù)實例信息,如果不存在或者過期則返回null
List<ServiceInstance> list = cache.get(key, List.class);
//如果沒有返回空
if (list == null || list.isEmpty()) {
return Mono.empty();
}
return Flux.just(list).materialize().collectList();
},
//若lb緩存沒有則觸發(fā)onCacheMissResume回調(diào),就會通過delegate.get()觸發(fā)nacos服務(wù)組件查詢
delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize()
.doOnNext(instances -> {//doOnNext得到nacos緩存后寫入lb緩存中
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
//.......
}
else {
//寫入lb緩存,并更新寫入時間
cache.put(key, instances);
}
}).then());
}
步入Loadbalancer緩存CaffeineCache的get方法,最終就會來到BoundedLocalCache的getIfPresent,這段就是查詢緩存并判斷過期的核心實現(xiàn),對應(yīng)步驟為:
- 基于要調(diào)用的服務(wù)實例的字符串(以本文示例來說就是nacos-provider)作為key進行查詢,并得到一個node。
- 通過node的writeTime比對當前時間now判斷是否過期。
- 如果過期返回null,讓loadbalancer到nacos緩存中獲取最新的值并覆蓋掉當前緩存。
- 如果沒過期則直接返回。
對應(yīng)的我們也給出這段說明的源碼,讀者可結(jié)合表述和源碼注釋理解上述步驟:
public @Nullable V getIfPresent(Object key, boolean recordStats) {
Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
//......
//獲取服務(wù)實例信息
V value = node.getValue();
//......
//判斷是否過期,如果過期則返回null
if (hasExpired(node, now) || (collectValues() && (value == null))) {
//......
return null;
}
//......
//沒過期直接返回緩存中的服務(wù)實例信息
return value;
}
二、關(guān)于loadbalancer的一些注意事項
經(jīng)過上述的說明相信筆者對于loadbalancer的底層工作機制有所了解,所以需要做灰度發(fā)布或者服務(wù)平滑下線的場景,我們建議將loadbalancer緩存直接禁用,一律采用nacos緩存,這一點筆者在之前的文章中也分析過nacos客戶端的中的服務(wù)實例緩存是實時刷新的,只要服務(wù)端感知到服務(wù)下線就會以RPC的方式通知nacos客戶端更新緩存。
對應(yīng)的我們也給出禁用緩存的配置:
spring.cloud.loadbalancer.cache.enabled=false
三、小結(jié)
本文結(jié)合源碼的方式深入分析了SpringCloud Loadbalancer負載均衡調(diào)用時所涉及的:
- Loadbalancer緩存同步
- Loadbalancer如何進行過期緩存刪除
- Loadbalancer如何基于裝飾者模式和nacos緩存結(jié)合
由此得出Loadbalancer緩存實時性上的存在的風(fēng)險,希望對你有幫助。