面試官:Zookeeper了解嗎?說(shuō)說(shuō)都有哪些使用場(chǎng)景?
前言
- Zookeeper特性與節(jié)點(diǎn)說(shuō)明
 - Zookeeper客戶端使用與集群原理
 
前兩篇講了Zookeeper的特性、客戶端使用和集群原理,因?yàn)?Zookeeper 是分布式系統(tǒng)中很常見的一個(gè)基礎(chǔ)系統(tǒng)。 而且問(wèn)的話常問(wèn)的就是說(shuō) zookeeper 的使用場(chǎng)景是什么? 看你知道不知道一些基本的使用場(chǎng)景。 但是其實(shí) Zookeeper 挖深了自然是可以問(wèn)的很深很深的。本文主要來(lái)聊聊 Zookeeper 主要的幾個(gè)使用場(chǎng)景。
- 分布式集群管理
 - 分布式注冊(cè)中心
 - 分布式JOB
 - 分布式鎖
 
分布式集群管理
分布式集群管理的需求
- 主動(dòng)查看線上服務(wù)節(jié)點(diǎn)
 - 查看服務(wù)節(jié)點(diǎn)資源使用情況
 - 服務(wù)離線通知
 - 服務(wù)資源(CPU、內(nèi)存、硬盤)超出閥值通知
 
架構(gòu)設(shè)計(jì)
節(jié)點(diǎn)結(jié)構(gòu)
- niuh-manger // 根節(jié)點(diǎn)
 - server00001 : //服務(wù)節(jié)點(diǎn) 1
 - server00002 ://服務(wù)節(jié)點(diǎn) 2
 - server........n ://服務(wù)節(jié)點(diǎn) n
 
服務(wù)狀態(tài)信息
- ip
 - cpu
 - memory
 - disk
 
功能實(shí)現(xiàn)
數(shù)據(jù)生成與上報(bào)
- 創(chuàng)建臨時(shí)節(jié)點(diǎn):
 - 定時(shí)變更節(jié)點(diǎn)狀態(tài)信息:
 
主動(dòng)查詢
- 實(shí)時(shí)查詢 zookeeper 獲取集群節(jié)點(diǎn)的狀態(tài)信息。
 
被動(dòng)通知
- 監(jiān)聽根節(jié)點(diǎn)下子節(jié)點(diǎn)的變化情況,如果CPU 等硬件資源低于警告位則發(fā)出警報(bào)。
 
關(guān)鍵示例代碼
- package com.niuh.os;
 - import com.fasterxml.jackson.core.JsonProcessingException;
 - import com.fasterxml.jackson.databind.ObjectMapper;
 - import org.I0Itec.zkclient.ZkClient;
 - import java.lang.instrument.Instrumentation;
 - import java.lang.management.ManagementFactory;
 - import java.lang.management.MemoryUsage;
 - import java.net.InetAddress;
 - import java.net.UnknownHostException;
 - public class Agent {
 - private static Agent ourInstance = new Agent();
 - private String server = "127.0.0.1:2181";
 - private ZkClient zkClient;
 - private static final String rootPath = "/niuh-manger";
 - private static final String servicePath = rootPath + "/service";
 - private String nodePath; ///niuh-manger/service0000001 當(dāng)前節(jié)點(diǎn)路徑
 - private Thread stateThread;
 - public static Agent getInstance() {
 - return ourInstance;
 - }
 - private Agent() {
 - }
 - // javaagent 數(shù)據(jù)監(jiān)控
 - public static void premain(String args, Instrumentation instrumentation) {
 - Agent.getInstance().init();
 - }
 - public void init() {
 - zkClient = new ZkClient(server, 5000, 10000);
 - System.out.println("zk連接成功" + server);
 - // 創(chuàng)建根節(jié)點(diǎn)
 - buildRoot();
 - // 創(chuàng)建臨時(shí)節(jié)點(diǎn)
 - createServerNode();
 - // 啟動(dòng)更新的線程
 - stateThread = new Thread(() -> {
 - while (true) {
 - updateServerNode();
 - try {
 - Thread.sleep(5000);
 - } catch (InterruptedException e) {
 - e.printStackTrace();
 - }
 - }
 - }, "zk_stateThread");
 - stateThread.setDaemon(true);
 - stateThread.start();
 - }
 - // 數(shù)據(jù)寫到 當(dāng)前的臨時(shí)節(jié)點(diǎn)中去
 - public void updateServerNode() {
 - zkClient.writeData(nodePath, getOsInfo());
 - }
 - // 生成服務(wù)節(jié)點(diǎn)
 - public void createServerNode() {
 - nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo());
 - System.out.println("創(chuàng)建節(jié)點(diǎn):" + nodePath);
 - }
 - // 更新服務(wù)節(jié)點(diǎn)狀態(tài)
 - public String getOsInfo() {
 - OsBean bean = new OsBean();
 - bean.lastUpdateTime = System.currentTimeMillis();
 - bean.ip = getLocalIp();
 - bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu();
 - MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
 - bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024;
 - bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024;
 - bean.pid = ManagementFactory.getRuntimeMXBean().getName();
 - ObjectMapper mapper = new ObjectMapper();
 - try {
 - return mapper.writeValueAsString(bean);
 - } catch (JsonProcessingException e) {
 - throw new RuntimeException(e);
 - }
 - }
 - public static String getLocalIp() {
 - InetAddress addr = null;
 - try {
 - addr = InetAddress.getLocalHost();
 - } catch (UnknownHostException e) {
 - throw new RuntimeException(e);
 - }
 - return addr.getHostAddress();
 - }
 - public void buildRoot() {
 - if (!zkClient.exists(rootPath)) {
 - zkClient.createPersistent(rootPath);
 - }
 - }
 - }
 
實(shí)現(xiàn)效果
啟動(dòng)參數(shù)設(shè)置
運(yùn)行測(cè)試用例:
- package com.niuh.test;
 - import com.niuh.os.Agent;
 - import org.junit.Ignore;
 - import org.junit.Test;
 - public class AgentTest {
 - @Test
 - @Ignore
 - public void initTest() {
 - Agent.premain(null, null);
 - runCPU(2); //20% 占用
 - try {
 - Thread.sleep(Long.MAX_VALUE);
 - } catch (InterruptedException e) {
 - e.printStackTrace();
 - }
 - }
 - //
 - private void runCPU(int count) {
 - for (int i = 0; i < count; i++) {
 - new Thread(() -> {
 - while (true) {
 - long bac = 1000000;
 - bac = bac >> 1;
 - }
 - }).start();
 - ;
 - }
 - }
 - }
 
控制臺(tái)輸出:
- CPU 報(bào)警...22.55120088850181
 - CPU 報(bào)警...46.06592086097357CPU 報(bào)警...47.87206766163349CPU 報(bào)警...49.49176420213768CPU 報(bào)警...48.967942479969004CPU 報(bào)警...49.193921607021565CPU 報(bào)警...48.806604284784676CPU 報(bào)警...48.63229912951865CPU 報(bào)警...49.34509647972038CPU 報(bào)警...47.07551108884401CPU 報(bào)警...49.18489236134496CPU 報(bào)警...49.903007346777066CPU 報(bào)警...49.28868795953268// 關(guān)閉測(cè)試用例服務(wù)已下線:OsBean{ip='192.168.43.11', cpu=49.28868795953268, usedMemorySize=56, usableMemorySize=3641, pid='47192@hejianhui', lastUpdateTime=1602056208842}
 
本Demo不適用在生產(chǎn)環(huán)境,示例Demo涉及組件zookeeper-agent、zookeeper-web。源代碼提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。
分布式注冊(cè)中心
在單體式服務(wù)中,通常是由多個(gè)客戶端去調(diào)用一個(gè)服務(wù),只要在客戶端中配置唯一服務(wù)節(jié)點(diǎn)地址即可,當(dāng)升級(jí)到分布式后,服務(wù)節(jié)點(diǎn)變多,像一線大廠服務(wù)節(jié)點(diǎn)更是上萬(wàn)之多,這么多節(jié)點(diǎn)不可能手動(dòng)配置在客戶端,這里就需要一個(gè)中間服務(wù),專門用于幫助客戶端發(fā)現(xiàn)服務(wù)節(jié)點(diǎn),即許多技術(shù)書籍經(jīng)常提到的服務(wù)發(fā)現(xiàn)。
一個(gè)完整的注冊(cè)中心涵蓋以下功能特性:
- 服務(wù)注冊(cè):提供者上線時(shí)將自提供的服務(wù)提交給注冊(cè)中心。
 - 服務(wù)注銷:通知注冊(cè)心提供者下線。
 - 服務(wù)訂閱:動(dòng)態(tài)實(shí)時(shí)接收服務(wù)變更消息。
 - 可靠:注冊(cè)服務(wù)本身是集群的,數(shù)據(jù)冗余存儲(chǔ)。避免單點(diǎn)故障,及數(shù)據(jù)丟失。
 - 容錯(cuò):當(dāng)服務(wù)提供者出現(xiàn)宕機(jī),斷電等極情況時(shí),注冊(cè)中心能夠動(dòng)態(tài)感知并通知客戶端服務(wù)提供者的狀態(tài)。
 
Dubbo 對(duì) Zookeeper的使用
阿里著名的開源項(xiàng)目Dubbo 是一個(gè)基于JAVA的RCP框架,其中必不可少的注冊(cè)中心可基于多種第三方組件實(shí)現(xiàn),但其官方推薦的還是Zookeeper作為注冊(cè)中心服務(wù)。
Dubbo Zookeeper注冊(cè)中心存儲(chǔ)結(jié)構(gòu)
節(jié)點(diǎn)說(shuō)明
流程說(shuō)明
- 服務(wù)提供者啟動(dòng)時(shí): 向 /dubbo/com.foo.BarService/providers 目錄下寫入自己的 URL 地址
 - 服務(wù)消費(fèi)者啟動(dòng)時(shí): 訂閱 /dubbo/com.foo.BarService/providers 目錄下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目錄下寫入自己的 URL 地址
 - 監(jiān)控中心啟動(dòng)時(shí): 訂閱 /dubbo/com.foo.BarService 目錄下的所有提供者和消費(fèi)者 URL 地址。
 
示例Demo
服務(wù)端代碼
- package com.niuh.zk.dubbo;
 - import com.alibaba.dubbo.config.ApplicationConfig;
 - import com.alibaba.dubbo.config.ProtocolConfig;
 - import com.alibaba.dubbo.config.RegistryConfig;
 - import com.alibaba.dubbo.config.ServiceConfig;
 - import java.io.IOException;
 - public class Server {
 - public void openServer(int port) {
 - // 構(gòu)建應(yīng)用
 - ApplicationConfig config = new ApplicationConfig();
 - config.setName("simple-app");
 - // 通信協(xié)議
 - ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", port);
 - protocolConfig.setThreads(200);
 - ServiceConfig<UserService> serviceConfig = new ServiceConfig();
 - serviceConfig.setApplication(config);
 - serviceConfig.setProtocol(protocolConfig);
 - serviceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
 - serviceConfig.setInterface(UserService.class);
 - UserServiceImpl ref = new UserServiceImpl();
 - serviceConfig.setRef(ref);
 - //開始提供服務(wù) 開張做生意
 - serviceConfig.export();
 - System.out.println("服務(wù)已開啟!端口:"+serviceConfig.getExportedUrls().get(0).getPort());
 - ref.setPort(serviceConfig.getExportedUrls().get(0).getPort());
 - }
 - public static void main(String[] args) throws IOException {
 - new Server().openServer(-1);
 - System.in.read();
 - }
 - }
 
客戶端代碼
- package com.niuh.zk.dubbo;
 - import com.alibaba.dubbo.config.ApplicationConfig;
 - import com.alibaba.dubbo.config.ReferenceConfig;
 - import com.alibaba.dubbo.config.RegistryConfig;
 - import java.io.IOException;
 - public class Client { UserService service; // URL 遠(yuǎn)程服務(wù)的調(diào)用地址 public UserService buildService(String url) { ApplicationConfig config = new ApplicationConfig("young-app");
 - // 構(gòu)建一個(gè)引用對(duì)象 ReferenceConfig<UserService> referenceConfig = new ReferenceConfig<UserService>(); referenceConfig.setApplication(config);
 - referenceConfig.setInterface(UserService.class); // referenceConfig.setUrl(url); referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
 - referenceConfig.setTimeout(5000);
 - // 透明化 this.service = referenceConfig.get(); return service;
 - } static int i = 0;
 - public static void main(String[] args) throws IOException { Client client1 = new Client(); client1.buildService("");
 - String cmd; while (!(cmd = read()).equals("exit")) {
 - UserVo u = client1.service.getUser(Integer.parseInt(cmd)); System.out.println(u); } } private static String read() throws IOException {
 - byte[] b = new byte[1024];
 - int size = System.in.read(b);
 - return new String(b, 0, size).trim();
 - }}
 
查詢 zk 實(shí)際存儲(chǔ)內(nèi)容:
- /dubbo
 - /dubbo/com.niuh.zk.dubbo.UserService/dubbo/com.niuh.zk.dubbo.UserService/configurators/dubbo/com.niuh.zk.dubbo.UserService/routers/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo://192.168.43.11:20880/com.niuh.zk.dubbo.UserService?anyhost=true&application=simple-app&dubbo=2.6.2&generic=false&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=48302&side=provider&threads=200×tamp=1602057895881/dubbo/com.niuh.zk.dubbo.UserService/consumers/dubbo/com.niuh.zk.dubbo.UserService/consumers/consumer://192.168.43.11com.niuh.zk.dubbo.UserService?application=young-app&category=consumers&check=false&dubbo=2.6.2&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=49036&side=consumer&timeout=5000×tamp=1602075359549
 
示例Demo涉及組件zookeeper-dubbo。源代碼提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。
分布式JOB
分布式JOB需求
多個(gè)服務(wù)節(jié)點(diǎn)只允許其中一個(gè)主節(jié)點(diǎn)運(yùn)行JOB任務(wù)。
當(dāng)主節(jié)點(diǎn)掛掉后能自動(dòng)切換主節(jié)點(diǎn),繼續(xù)執(zhí)行JOB任務(wù)。
架構(gòu)設(shè)計(jì)
node結(jié)構(gòu)
- niuh-master
 - server0001:master
 - server0002:slave
 - server000n:slave
 
選舉流程
服務(wù)啟動(dòng):
- 在niuh-maste下創(chuàng)建server子節(jié)點(diǎn),值為slave
 - 獲取所有niuh-master 下所有子節(jié)點(diǎn)
 - 判斷是否存在master 節(jié)點(diǎn)
 - 如果沒有設(shè)置自己為master節(jié)點(diǎn)
 
子節(jié)點(diǎn)刪除事件觸發(fā):
- 獲取所有niuh-master 下所有子節(jié)點(diǎn)
 - 判斷是否存在master 節(jié)點(diǎn)
 - 如果沒有設(shè)置最小值序號(hào)為master 節(jié)點(diǎn)
 
示例Demo
- package com.niuh.zookeeper.master;
 - import org.I0Itec.zkclient.ZkClient;
 - import java.util.Map;
 - import java.util.stream.Collectors;
 - public class MasterResolve {
 - private String server = "127.0.0.1:2181";
 - private ZkClient zkClient;
 - private static final String rootPath = "/niuh-master";
 - private static final String servicePath = rootPath + "/service";
 - private String nodePath;
 - private volatile boolean master = false;
 - private static MasterResolve resolve;
 - private MasterResolve() {
 - zkClient = new ZkClient(server, 2000, 5000);
 - buildRoot(); createServerNode(); } public static MasterResolve getInstance() {
 - if (resolve == null) {
 - resolve= new MasterResolve();
 - } return resolve;
 - } // 構(gòu)建根節(jié)點(diǎn)
 - public void buildRoot() {
 - if (!zkClient.exists(rootPath)) {
 - zkClient.createPersistent(rootPath);
 - }
 - }
 - // 創(chuàng)建server節(jié)點(diǎn)
 - public void createServerNode() {
 - nodePath = zkClient.createEphemeralSequential(servicePath, "slave");
 - System.out.println("創(chuàng)建service節(jié)點(diǎn):" + nodePath);
 - initMaster();
 - initListener();
 - }
 - private void initMaster() {
 - boolean existMaster = zkClient.getChildren(rootPath)
 - .stream()
 - .map(p -> rootPath + "/" + p)
 - .map(p -> zkClient.readData(p))
 - .anyMatch(d -> "master".equals(d));
 - if (!existMaster) {
 - doElection();
 - System.out.println("當(dāng)前當(dāng)選master");
 - }
 - }
 - private void initListener() {
 - zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> {
 - doElection();// 執(zhí)行選舉
 - });
 - }
 - // 執(zhí)行選舉
 - public void doElection() {
 - Map<String, Object> childData = zkClient.getChildren(rootPath)
 - .stream()
 - .map(p -> rootPath + "/" + p)
 - .collect(Collectors.toMap(p -> p, p -> zkClient.readData(p)));
 - if (childData.containsValue("master")) {
 - return;
 - }
 - childData.keySet().stream().sorted().findFirst().ifPresent(p -> {
 - if (p.equals(nodePath)) { // 設(shè)置最小值序號(hào)為master 節(jié)點(diǎn)
 - zkClient.writeData(nodePath, "master");
 - master = true;
 - System.out.println("當(dāng)前當(dāng)選master" + nodePath);
 - }
 - });
 - }
 - public static boolean isMaster() {
 - return getInstance().master;
 - }
 - }
 
示例Demo涉及組件zookeeper-master。源代碼提交在 github :https://github.com/Niuh-Frame/niuh-zookeeper。
分布式鎖
鎖的的基本概念
開發(fā)中鎖的概念并不陌生,通過(guò)鎖可以實(shí)現(xiàn)在多個(gè)線程或多個(gè)進(jìn)程間在爭(zhēng)搶資源時(shí),能夠合理的分配置資源的所有權(quán)。在單體應(yīng)用中我們可以通過(guò) synchronized 或 ReentrantLock 來(lái)實(shí)現(xiàn)鎖。但在分布式系統(tǒng)中,僅僅是加synchronized 是不夠的,需要借助第三組件來(lái)實(shí)現(xiàn)。比如一些簡(jiǎn)單的做法是使用關(guān)系型數(shù)據(jù)行級(jí)鎖來(lái)實(shí)現(xiàn)不同進(jìn)程之間的互斥,但大型分布式系統(tǒng)的性能瓶頸往往集中在數(shù)據(jù)庫(kù)操作上。為了提高性能得采用如Redis、Zookeeper之內(nèi)的組件實(shí)現(xiàn)分布式鎖。
共享鎖:也稱作只讀鎖,當(dāng)一方獲得共享鎖之后,其它方也可以獲得共享鎖。但其只允許讀取。在共享鎖全部釋放之前,其它方不能獲得寫鎖。
排它鎖:也稱作讀寫鎖,獲得排它鎖后,可以進(jìn)行數(shù)據(jù)的讀寫。在其釋放之前,其它方不能獲得任何鎖。
鎖的獲取
某銀行賬戶,可以同時(shí)進(jìn)行帳戶信息的讀取,但讀取期間不能修改帳戶數(shù)據(jù)。其賬戶ID為:888
獲得讀鎖流程
- 基于資源ID創(chuàng)建臨時(shí)序號(hào)讀鎖節(jié)點(diǎn) /lock/888.R0000000002 Read
 - 獲取 /lock 下所有子節(jié)點(diǎn),判斷其最小的節(jié)點(diǎn)是否為讀鎖,如果是則獲鎖成功
 - 最小節(jié)點(diǎn)不是讀鎖,則阻塞等待。添加lock/ 子節(jié)點(diǎn)變更監(jiān)聽。
 - 當(dāng)節(jié)點(diǎn)變更監(jiān)聽觸發(fā),執(zhí)行第2步
 
數(shù)據(jù)結(jié)構(gòu)
獲得寫鎖
- 基于資源ID創(chuàng)建臨時(shí)序號(hào)寫鎖節(jié)點(diǎn) /lock/888.R0000000002 Write
 - 獲取 /lock 下所有子節(jié)點(diǎn),判斷其最小的節(jié)點(diǎn)是否為自己,如果是則獲鎖成功
 - 最小節(jié)點(diǎn)不是自己,則阻塞等待。添加lock/ 子節(jié)點(diǎn)變更監(jiān)聽。
 - 當(dāng)節(jié)點(diǎn)變更監(jiān)聽觸發(fā),執(zhí)行第2步
 
釋放鎖
讀取完畢后,手動(dòng)刪除臨時(shí)節(jié)點(diǎn),如果獲鎖期間宕機(jī),則會(huì)在會(huì)話失效后自動(dòng)刪除。
關(guān)于羊群效應(yīng)
在等待鎖獲得期間,所有等待節(jié)點(diǎn)都在監(jiān)聽 Lock節(jié)點(diǎn),一但lock 節(jié)點(diǎn)變更所有等待節(jié)點(diǎn)都會(huì)被觸發(fā),然后在同時(shí)反查L(zhǎng)ock 子節(jié)點(diǎn)。如果等待對(duì)例過(guò)大會(huì)使用Zookeeper承受非常大的流量壓力。
為了改善這種情況,可以采用監(jiān)聽鏈表的方式,每個(gè)等待隊(duì)列只監(jiān)聽前一個(gè)節(jié)點(diǎn),如果前一個(gè)節(jié)點(diǎn)釋放鎖的時(shí)候,才會(huì)被觸發(fā)通知。這樣就形成了一個(gè)監(jiān)聽鏈表。
示例Demo
- package com.niuh.zookeeper.lock;
 - import org.I0Itec.zkclient.IZkDataListener;
 - import org.I0Itec.zkclient.ZkClient;
 - import java.util.List;
 - import java.util.stream.Collectors;
 - public class ZookeeperLock {
 - private String server = "127.0.0.1:2181";
 - private ZkClient zkClient;
 - private static final String rootPath = "/niuh-lock1";
 - public ZookeeperLock() {
 - zkClient = new ZkClient(server, 5000, 20000);
 - buildRoot(); } // 構(gòu)建根節(jié)點(diǎn)
 - public void buildRoot() {
 - if (!zkClient.exists(rootPath)) {
 - zkClient.createPersistent(rootPath);
 - }
 - }
 - // 獲取鎖
 - public Lock lock(String lockId, long timeout) {
 - // 創(chuàng)建臨時(shí)節(jié)點(diǎn)
 - Lock lockNode = createLockNode(lockId);
 - lockNode = tryActiveLock(lockNode);// 嘗試激活鎖
 - if (!lockNode.isActive()) {
 - try {
 - synchronized (lockNode) {
 - lockNode.wait(timeout); // 線程鎖住
 - }
 - } catch (InterruptedException e) {
 - throw new RuntimeException(e);
 - }
 - }
 - if (!lockNode.isActive()) {
 - throw new RuntimeException(" lock timeout");
 - }
 - return lockNode;
 - }
 - // 釋放鎖
 - public void unlock(Lock lock) {
 - if (lock.isActive()) {
 - zkClient.delete(lock.getPath());
 - }
 - }
 - // 嘗試激活鎖
 - private Lock tryActiveLock(Lock lockNode) {
 - // 獲取根節(jié)點(diǎn)下面所有的子節(jié)點(diǎn)
 - List<String> list = zkClient.getChildren(rootPath)
 - .stream()
 - .sorted()
 - .map(p -> rootPath + "/" + p)
 - .collect(Collectors.toList()); // 判斷當(dāng)前是否為最小節(jié)點(diǎn)
 - String firstNodePath = list.get(0);
 - // 最小節(jié)點(diǎn)是不是當(dāng)前節(jié)點(diǎn)
 - if (firstNodePath.equals(lockNode.getPath())) {
 - lockNode.setActive(true);
 - } else {
 - String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
 - zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
 - @Override
 - public void handleDataChange(String dataPath, Object data) throws Exception {
 - }
 - @Override
 - public void handleDataDeleted(String dataPath) throws Exception {
 - // 事件處理 與心跳 在同一個(gè)線程,如果Debug時(shí)占用太多時(shí)間,將導(dǎo)致本節(jié)點(diǎn)被刪除,從而影響鎖邏輯。
 - System.out.println("節(jié)點(diǎn)刪除:" + dataPath);
 - Lock lock = tryActiveLock(lockNode);
 - synchronized (lockNode) {
 - if (lock.isActive()) {
 - lockNode.notify(); // 釋放了
 - }
 - }
 - zkClient.unsubscribeDataChanges(upNodePath, this);
 - }
 - });
 - }
 - return lockNode;
 - }
 - public Lock createLockNode(String lockId) {
 - String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w");
 - return new Lock(lockId, nodePath);
 - }
 - }
 
示例Demo涉及組件zookeeper-lock。源代碼提交在 github :https://github.com/Niuh-Frame/niuh-zookeeper。





























 
 
 


 
 
 
 