偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

面試官:Zookeeper了解嗎?說(shuō)說(shuō)都有哪些使用場(chǎng)景?

開(kāi)發(fā)
本文主要來(lái)聊聊 Zookeeper 主要的幾個(gè)使用場(chǎng)景。

 前言

  • Zookeeper特性與節(jié)點(diǎn)說(shuō)明
  • Zookeeper客戶(hù)端使用與集群原理

前兩篇講了Zookeeper的特性、客戶(hù)端使用和集群原理,因?yàn)?Zookeeper 是分布式系統(tǒng)中很常見(jiàn)的一個(gè)基礎(chǔ)系統(tǒng)。 而且問(wèn)的話(huà)常問(wèn)的就是說(shuō) zookeeper 的使用場(chǎng)景是什么? 看你知道不知道一些基本的使用場(chǎng)景。 但是其實(shí) Zookeeper 挖深了自然是可以問(wèn)的很深很深的。本文主要來(lái)聊聊 Zookeeper 主要的幾個(gè)使用場(chǎng)景。

  1. 分布式集群管理
  2. 分布式注冊(cè)中心
  3. 分布式JOB
  4. 分布式鎖

分布式集群管理
分布式集群管理的需求

  1. 主動(dòng)查看線(xiàn)上服務(wù)節(jié)點(diǎn)
  2. 查看服務(wù)節(jié)點(diǎn)資源使用情況
  3. 服務(wù)離線(xiàn)通知
  4. 服務(wù)資源(CPU、內(nèi)存、硬盤(pán))超出閥值通知

架構(gòu)設(shè)計(jì)

節(jié)點(diǎn)結(jié)構(gòu)

  1. niuh-manger // 根節(jié)點(diǎn)
  2. server00001 : //服務(wù)節(jié)點(diǎn) 1
  3. server00002 ://服務(wù)節(jié)點(diǎn) 2
  4. server........n ://服務(wù)節(jié)點(diǎn) n

服務(wù)狀態(tài)信息

  1. ip
  2. cpu
  3. memory
  4. disk

功能實(shí)現(xiàn)
數(shù)據(jù)生成與上報(bào)

  1. 創(chuàng)建臨時(shí)節(jié)點(diǎn):
  2. 定時(shí)變更節(jié)點(diǎn)狀態(tài)信息:

主動(dòng)查詢(xún)

  • 實(shí)時(shí)查詢(xún) zookeeper 獲取集群節(jié)點(diǎn)的狀態(tài)信息。

被動(dòng)通知

  • 監(jiān)聽(tīng)根節(jié)點(diǎn)下子節(jié)點(diǎn)的變化情況,如果CPU 等硬件資源低于警告位則發(fā)出警報(bào)。

關(guān)鍵示例代碼

  1. package com.niuh.os; 
  2. import com.fasterxml.jackson.core.JsonProcessingException; 
  3. import com.fasterxml.jackson.databind.ObjectMapper; 
  4. import org.I0Itec.zkclient.ZkClient; 
  5. import java.lang.instrument.Instrumentation; 
  6. import java.lang.management.ManagementFactory; 
  7. import java.lang.management.MemoryUsage; 
  8. import java.net.InetAddress; 
  9. import java.net.UnknownHostException; 
  10. public class Agent { 
  11.     private static Agent ourInstance = new Agent(); 
  12.     private String server = "127.0.0.1:2181"
  13.     private ZkClient zkClient; 
  14.     private static final String rootPath = "/niuh-manger"
  15.     private static final String servicePath = rootPath + "/service"
  16.     private String nodePath; ///niuh-manger/service0000001 當(dāng)前節(jié)點(diǎn)路徑 
  17.     private Thread stateThread; 
  18.     public static Agent getInstance() { 
  19.         return ourInstance; 
  20.     } 
  21.     private Agent() { 
  22.     } 
  23.     // javaagent 數(shù)據(jù)監(jiān)控 
  24.     public static void premain(String args, Instrumentation instrumentation) { 
  25.         Agent.getInstance().init(); 
  26.     } 
  27.     public void init() { 
  28.         zkClient = new ZkClient(server, 5000, 10000); 
  29.         System.out.println("zk連接成功" + server); 
  30.         // 創(chuàng)建根節(jié)點(diǎn) 
  31.         buildRoot(); 
  32.         // 創(chuàng)建臨時(shí)節(jié)點(diǎn) 
  33.         createServerNode(); 
  34.         // 啟動(dòng)更新的線(xiàn)程 
  35.         stateThread = new Thread(() -> { 
  36.             while (true) { 
  37.                 updateServerNode(); 
  38.                 try { 
  39.                     Thread.sleep(5000); 
  40.                 } catch (InterruptedException e) { 
  41.                     e.printStackTrace(); 
  42.                 } 
  43.             } 
  44.         }, "zk_stateThread"); 
  45.         stateThread.setDaemon(true); 
  46.         stateThread.start(); 
  47.     } 
  48.     // 數(shù)據(jù)寫(xiě)到 當(dāng)前的臨時(shí)節(jié)點(diǎn)中去 
  49.     public void updateServerNode() { 
  50.         zkClient.writeData(nodePath, getOsInfo()); 
  51.     } 
  52.     // 生成服務(wù)節(jié)點(diǎn) 
  53.     public void createServerNode() { 
  54.         nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo()); 
  55.         System.out.println("創(chuàng)建節(jié)點(diǎn):" + nodePath); 
  56.     } 
  57.     // 更新服務(wù)節(jié)點(diǎn)狀態(tài) 
  58.     public String getOsInfo() { 
  59.         OsBean bean = new OsBean(); 
  60.         bean.lastUpdateTime = System.currentTimeMillis(); 
  61.         bean.ip = getLocalIp(); 
  62.         bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu(); 
  63.         MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); 
  64.         bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024; 
  65.         bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024; 
  66.         bean.pid = ManagementFactory.getRuntimeMXBean().getName(); 
  67.         ObjectMapper mapper = new ObjectMapper(); 
  68.         try { 
  69.             return mapper.writeValueAsString(bean); 
  70.         } catch (JsonProcessingException e) { 
  71.             throw new RuntimeException(e); 
  72.         } 
  73.     } 
  74.     public static String getLocalIp() { 
  75.         InetAddress addr = null
  76.         try { 
  77.             addr = InetAddress.getLocalHost(); 
  78.         } catch (UnknownHostException e) { 
  79.             throw new RuntimeException(e); 
  80.         } 
  81.         return addr.getHostAddress(); 
  82.     } 
  83.     public void buildRoot() { 
  84.         if (!zkClient.exists(rootPath)) { 
  85.             zkClient.createPersistent(rootPath); 
  86.         } 
  87.     } 

實(shí)現(xiàn)效果
啟動(dòng)參數(shù)設(shè)置

 

運(yùn)行測(cè)試用例:

  1. package com.niuh.test; 
  2. import com.niuh.os.Agent; 
  3. import org.junit.Ignore
  4. import org.junit.Test; 
  5. public class AgentTest { 
  6.     @Test 
  7.     @Ignore 
  8.     public void initTest() { 
  9.         Agent.premain(nullnull); 
  10.         runCPU(2); //20% 占用 
  11.         try { 
  12.             Thread.sleep(Long.MAX_VALUE); 
  13.         } catch (InterruptedException e) { 
  14.             e.printStackTrace(); 
  15.         } 
  16.     } 
  17.     // 
  18.     private void runCPU(int count) { 
  19.         for (int i = 0; i < count; i++) { 
  20.             new Thread(() -> { 
  21.                 while (true) { 
  22.                     long bac = 1000000; 
  23.                     bac = bac >> 1; 
  24.                 } 
  25.             }).start(); 
  26.             ; 
  27.         } 
  28.     } 

控制臺(tái)輸出:

  1. CPU 報(bào)警...22.55120088850181 
  2. CPU 報(bào)警...46.06592086097357CPU 報(bào)警...47.87206766163349CPU 報(bào)警...49.49176420213768CPU 報(bào)警...48.967942479969004CPU 報(bào)警...49.193921607021565CPU 報(bào)警...48.806604284784676CPU 報(bào)警...48.63229912951865CPU 報(bào)警...49.34509647972038CPU 報(bào)警...47.07551108884401CPU 報(bào)警...49.18489236134496CPU 報(bào)警...49.903007346777066CPU 報(bào)警...49.28868795953268// 關(guān)閉測(cè)試用例服務(wù)已下線(xiàn):OsBean{ip='192.168.43.11', cpu=49.28868795953268, usedMemorySize=56, usableMemorySize=3641, pid='47192@hejianhui', lastUpdateTime=1602056208842} 

本Demo不適用在生產(chǎn)環(huán)境,示例Demo涉及組件zookeeper-agent、zookeeper-web。源代碼提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。

分布式注冊(cè)中心
在單體式服務(wù)中,通常是由多個(gè)客戶(hù)端去調(diào)用一個(gè)服務(wù),只要在客戶(hù)端中配置唯一服務(wù)節(jié)點(diǎn)地址即可,當(dāng)升級(jí)到分布式后,服務(wù)節(jié)點(diǎn)變多,像一線(xiàn)大廠(chǎng)服務(wù)節(jié)點(diǎn)更是上萬(wàn)之多,這么多節(jié)點(diǎn)不可能手動(dòng)配置在客戶(hù)端,這里就需要一個(gè)中間服務(wù),專(zhuān)門(mén)用于幫助客戶(hù)端發(fā)現(xiàn)服務(wù)節(jié)點(diǎn),即許多技術(shù)書(shū)籍經(jīng)常提到的服務(wù)發(fā)現(xiàn)。

一個(gè)完整的注冊(cè)中心涵蓋以下功能特性:

  • 服務(wù)注冊(cè):提供者上線(xiàn)時(shí)將自提供的服務(wù)提交給注冊(cè)中心。
  • 服務(wù)注銷(xiāo):通知注冊(cè)心提供者下線(xiàn)。
  • 服務(wù)訂閱:動(dòng)態(tài)實(shí)時(shí)接收服務(wù)變更消息。
  • 可靠:注冊(cè)服務(wù)本身是集群的,數(shù)據(jù)冗余存儲(chǔ)。避免單點(diǎn)故障,及數(shù)據(jù)丟失。
  • 容錯(cuò):當(dāng)服務(wù)提供者出現(xiàn)宕機(jī),斷電等極情況時(shí),注冊(cè)中心能夠動(dòng)態(tài)感知并通知客戶(hù)端服務(wù)提供者的狀態(tài)。

Dubbo 對(duì) Zookeeper的使用
阿里著名的開(kāi)源項(xiàng)目Dubbo 是一個(gè)基于JAVA的RCP框架,其中必不可少的注冊(cè)中心可基于多種第三方組件實(shí)現(xiàn),但其官方推薦的還是Zookeeper作為注冊(cè)中心服務(wù)。

Dubbo Zookeeper注冊(cè)中心存儲(chǔ)結(jié)構(gòu)

節(jié)點(diǎn)說(shuō)明

流程說(shuō)明

  1. 服務(wù)提供者啟動(dòng)時(shí): 向 /dubbo/com.foo.BarService/providers 目錄下寫(xiě)入自己的 URL 地址
  2. 服務(wù)消費(fèi)者啟動(dòng)時(shí): 訂閱 /dubbo/com.foo.BarService/providers 目錄下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目錄下寫(xiě)入自己的 URL 地址
  3. 監(jiān)控中心啟動(dòng)時(shí): 訂閱 /dubbo/com.foo.BarService 目錄下的所有提供者和消費(fèi)者 URL 地址。

示例Demo
服務(wù)端代碼

  1. package com.niuh.zk.dubbo; 
  2. import com.alibaba.dubbo.config.ApplicationConfig; 
  3. import com.alibaba.dubbo.config.ProtocolConfig; 
  4. import com.alibaba.dubbo.config.RegistryConfig; 
  5. import com.alibaba.dubbo.config.ServiceConfig; 
  6. import java.io.IOException; 
  7. public class Server { 
  8.     public void openServer(int port) { 
  9.         // 構(gòu)建應(yīng)用 
  10.         ApplicationConfig config = new ApplicationConfig(); 
  11.         config.setName("simple-app"); 
  12.         // 通信協(xié)議 
  13.         ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", port); 
  14.         protocolConfig.setThreads(200); 
  15.         ServiceConfig<UserService> serviceConfig = new ServiceConfig(); 
  16.         serviceConfig.setApplication(config); 
  17.         serviceConfig.setProtocol(protocolConfig); 
  18.         serviceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); 
  19.         serviceConfig.setInterface(UserService.class); 
  20.         UserServiceImpl ref = new UserServiceImpl(); 
  21.         serviceConfig.setRef(ref); 
  22.         //開(kāi)始提供服務(wù)  開(kāi)張做生意 
  23.         serviceConfig.export(); 
  24.         System.out.println("服務(wù)已開(kāi)啟!端口:"+serviceConfig.getExportedUrls().get(0).getPort()); 
  25.         ref.setPort(serviceConfig.getExportedUrls().get(0).getPort()); 
  26.     } 
  27.     public static void main(String[] args) throws IOException { 
  28.         new Server().openServer(-1); 
  29.         System.in.read(); 
  30.     } 

客戶(hù)端代碼

  1. package com.niuh.zk.dubbo; 
  2. import com.alibaba.dubbo.config.ApplicationConfig; 
  3. import com.alibaba.dubbo.config.ReferenceConfig; 
  4. import com.alibaba.dubbo.config.RegistryConfig; 
  5. import java.io.IOException; 
  6. public class Client {    UserService service;    // URL 遠(yuǎn)程服務(wù)的調(diào)用地址    public UserService buildService(String url) {        ApplicationConfig config = new ApplicationConfig("young-app"); 
  7.         // 構(gòu)建一個(gè)引用對(duì)象        ReferenceConfig<UserService> referenceConfig = new ReferenceConfig<UserService>();        referenceConfig.setApplication(config); 
  8.         referenceConfig.setInterface(UserService.class);        // referenceConfig.setUrl(url);        referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); 
  9.         referenceConfig.setTimeout(5000); 
  10.         // 透明化        this.service = referenceConfig.get();        return service; 
  11.     }    static int i = 0; 
  12.     public static void main(String[] args) throws IOException {        Client client1 = new Client();        client1.buildService(""); 
  13.         String cmd;        while (!(cmd = read()).equals("exit")) { 
  14.             UserVo u = client1.service.getUser(Integer.parseInt(cmd));            System.out.println(u);        }    }    private static String read() throws IOException { 
  15.         byte[] b = new byte[1024]; 
  16.         int size = System.in.read(b); 
  17.         return new String(b, 0, size).trim(); 
  18.     }} 

查詢(xún) zk 實(shí)際存儲(chǔ)內(nèi)容:

  1. /dubbo 
  2. /dubbo/com.niuh.zk.dubbo.UserService/dubbo/com.niuh.zk.dubbo.UserService/configurators/dubbo/com.niuh.zk.dubbo.UserService/routers/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo://192.168.43.11:20880/com.niuh.zk.dubbo.UserService?anyhost=true&application=simple-app&dubbo=2.6.2&generic=false&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=48302&side=provider&threads=200×tamp=1602057895881/dubbo/com.niuh.zk.dubbo.UserService/consumers/dubbo/com.niuh.zk.dubbo.UserService/consumers/consumer://192.168.43.11com.niuh.zk.dubbo.UserService?application=young-app&category=consumers&check=false&dubbo=2.6.2&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=49036&side=consumer&timeout=5000×tamp=1602075359549 

示例Demo涉及組件zookeeper-dubbo。源代碼提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。

分布式JOB
分布式JOB需求
多個(gè)服務(wù)節(jié)點(diǎn)只允許其中一個(gè)主節(jié)點(diǎn)運(yùn)行JOB任務(wù)。
當(dāng)主節(jié)點(diǎn)掛掉后能自動(dòng)切換主節(jié)點(diǎn),繼續(xù)執(zhí)行JOB任務(wù)。
架構(gòu)設(shè)計(jì)

node結(jié)構(gòu)

  1. niuh-master
  2. server0001:master
  3. server0002:slave
  4. server000n:slave

選舉流程
服務(wù)啟動(dòng):

  1. 在niuh-maste下創(chuàng)建server子節(jié)點(diǎn),值為slave
  2. 獲取所有niuh-master 下所有子節(jié)點(diǎn)
  3. 判斷是否存在master 節(jié)點(diǎn)
  4. 如果沒(méi)有設(shè)置自己為master節(jié)點(diǎn)

子節(jié)點(diǎn)刪除事件觸發(fā):

  1. 獲取所有niuh-master 下所有子節(jié)點(diǎn)
  2. 判斷是否存在master 節(jié)點(diǎn)
  3. 如果沒(méi)有設(shè)置最小值序號(hào)為master 節(jié)點(diǎn)

示例Demo

  1. package com.niuh.zookeeper.master; 
  2. import org.I0Itec.zkclient.ZkClient; 
  3. import java.util.Map; 
  4. import java.util.stream.Collectors; 
  5. public class MasterResolve { 
  6.     private String server = "127.0.0.1:2181"
  7.     private ZkClient zkClient; 
  8.     private static final String rootPath = "/niuh-master"
  9.     private static final String servicePath = rootPath + "/service"
  10.     private String nodePath; 
  11.     private volatile boolean master = false
  12.     private static MasterResolve resolve; 
  13.     private MasterResolve() { 
  14.         zkClient = new ZkClient(server, 2000, 5000); 
  15.         buildRoot();        createServerNode();    }    public static MasterResolve getInstance() { 
  16.         if (resolve == null) { 
  17.             resolve= new MasterResolve(); 
  18.         }        return resolve; 
  19.     }    // 構(gòu)建根節(jié)點(diǎn) 
  20.     public void buildRoot() { 
  21.         if (!zkClient.exists(rootPath)) { 
  22.             zkClient.createPersistent(rootPath); 
  23.         } 
  24.     } 
  25.     // 創(chuàng)建server節(jié)點(diǎn) 
  26.     public void createServerNode() { 
  27.         nodePath = zkClient.createEphemeralSequential(servicePath, "slave"); 
  28.         System.out.println("創(chuàng)建service節(jié)點(diǎn):" + nodePath); 
  29.         initMaster(); 
  30.         initListener(); 
  31.     } 
  32.     private void initMaster() { 
  33.         boolean existMaster = zkClient.getChildren(rootPath) 
  34.                 .stream() 
  35.                 .map(p -> rootPath + "/" + p) 
  36.                 .map(p -> zkClient.readData(p)) 
  37.                 .anyMatch(d -> "master".equals(d)); 
  38.         if (!existMaster) { 
  39.             doElection(); 
  40.             System.out.println("當(dāng)前當(dāng)選master"); 
  41.         } 
  42.     } 
  43.     private void initListener() { 
  44.         zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> { 
  45.             doElection();//  執(zhí)行選舉 
  46.         }); 
  47.     } 
  48.     // 執(zhí)行選舉 
  49.     public void doElection() { 
  50.         Map<String, Object> childData = zkClient.getChildren(rootPath) 
  51.                 .stream() 
  52.                 .map(p -> rootPath + "/" + p) 
  53.                 .collect(Collectors.toMap(p -> p, p -> zkClient.readData(p))); 
  54.         if (childData.containsValue("master")) { 
  55.             return
  56.         } 
  57.         childData.keySet().stream().sorted().findFirst().ifPresent(p -> { 
  58.             if (p.equals(nodePath)) { // 設(shè)置最小值序號(hào)為master 節(jié)點(diǎn) 
  59.                 zkClient.writeData(nodePath, "master"); 
  60.                 master = true
  61.                 System.out.println("當(dāng)前當(dāng)選master" + nodePath); 
  62.             } 
  63.         }); 
  64.     } 
  65.     public static boolean isMaster() { 
  66.         return getInstance().master; 
  67.     } 

示例Demo涉及組件zookeeper-master。源代碼提交在 github :https://github.com/Niuh-Frame/niuh-zookeeper。

分布式鎖
鎖的的基本概念
開(kāi)發(fā)中鎖的概念并不陌生,通過(guò)鎖可以實(shí)現(xiàn)在多個(gè)線(xiàn)程或多個(gè)進(jìn)程間在爭(zhēng)搶資源時(shí),能夠合理的分配置資源的所有權(quán)。在單體應(yīng)用中我們可以通過(guò) synchronized 或 ReentrantLock 來(lái)實(shí)現(xiàn)鎖。但在分布式系統(tǒng)中,僅僅是加synchronized 是不夠的,需要借助第三組件來(lái)實(shí)現(xiàn)。比如一些簡(jiǎn)單的做法是使用關(guān)系型數(shù)據(jù)行級(jí)鎖來(lái)實(shí)現(xiàn)不同進(jìn)程之間的互斥,但大型分布式系統(tǒng)的性能瓶頸往往集中在數(shù)據(jù)庫(kù)操作上。為了提高性能得采用如Redis、Zookeeper之內(nèi)的組件實(shí)現(xiàn)分布式鎖。

共享鎖:也稱(chēng)作只讀鎖,當(dāng)一方獲得共享鎖之后,其它方也可以獲得共享鎖。但其只允許讀取。在共享鎖全部釋放之前,其它方不能獲得寫(xiě)鎖。

排它鎖:也稱(chēng)作讀寫(xiě)鎖,獲得排它鎖后,可以進(jìn)行數(shù)據(jù)的讀寫(xiě)。在其釋放之前,其它方不能獲得任何鎖。

鎖的獲取
某銀行賬戶(hù),可以同時(shí)進(jìn)行帳戶(hù)信息的讀取,但讀取期間不能修改帳戶(hù)數(shù)據(jù)。其賬戶(hù)ID為:888

獲得讀鎖流程

  1. 基于資源ID創(chuàng)建臨時(shí)序號(hào)讀鎖節(jié)點(diǎn) /lock/888.R0000000002 Read
  2. 獲取 /lock 下所有子節(jié)點(diǎn),判斷其最小的節(jié)點(diǎn)是否為讀鎖,如果是則獲鎖成功
  3. 最小節(jié)點(diǎn)不是讀鎖,則阻塞等待。添加lock/ 子節(jié)點(diǎn)變更監(jiān)聽(tīng)。
  4. 當(dāng)節(jié)點(diǎn)變更監(jiān)聽(tīng)觸發(fā),執(zhí)行第2步

數(shù)據(jù)結(jié)構(gòu)

獲得寫(xiě)鎖

  1. 基于資源ID創(chuàng)建臨時(shí)序號(hào)寫(xiě)鎖節(jié)點(diǎn) /lock/888.R0000000002 Write
  2. 獲取 /lock 下所有子節(jié)點(diǎn),判斷其最小的節(jié)點(diǎn)是否為自己,如果是則獲鎖成功
  3. 最小節(jié)點(diǎn)不是自己,則阻塞等待。添加lock/ 子節(jié)點(diǎn)變更監(jiān)聽(tīng)。
  4. 當(dāng)節(jié)點(diǎn)變更監(jiān)聽(tīng)觸發(fā),執(zhí)行第2步

釋放鎖
讀取完畢后,手動(dòng)刪除臨時(shí)節(jié)點(diǎn),如果獲鎖期間宕機(jī),則會(huì)在會(huì)話(huà)失效后自動(dòng)刪除。

關(guān)于羊群效應(yīng)
在等待鎖獲得期間,所有等待節(jié)點(diǎn)都在監(jiān)聽(tīng) Lock節(jié)點(diǎn),一但lock 節(jié)點(diǎn)變更所有等待節(jié)點(diǎn)都會(huì)被觸發(fā),然后在同時(shí)反查L(zhǎng)ock 子節(jié)點(diǎn)。如果等待對(duì)例過(guò)大會(huì)使用Zookeeper承受非常大的流量壓力。

為了改善這種情況,可以采用監(jiān)聽(tīng)鏈表的方式,每個(gè)等待隊(duì)列只監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn),如果前一個(gè)節(jié)點(diǎn)釋放鎖的時(shí)候,才會(huì)被觸發(fā)通知。這樣就形成了一個(gè)監(jiān)聽(tīng)鏈表。

示例Demo

  1. package com.niuh.zookeeper.lock; 
  2. import org.I0Itec.zkclient.IZkDataListener; 
  3. import org.I0Itec.zkclient.ZkClient; 
  4. import java.util.List; 
  5. import java.util.stream.Collectors; 
  6. public class ZookeeperLock { 
  7.     private String server = "127.0.0.1:2181"
  8.     private ZkClient zkClient; 
  9.     private static final String rootPath = "/niuh-lock1"
  10.     public ZookeeperLock() { 
  11.         zkClient = new ZkClient(server, 5000, 20000); 
  12.         buildRoot();    }    // 構(gòu)建根節(jié)點(diǎn) 
  13.     public void buildRoot() { 
  14.         if (!zkClient.exists(rootPath)) { 
  15.             zkClient.createPersistent(rootPath); 
  16.         } 
  17.     } 
  18.     // 獲取鎖 
  19.     public Lock lock(String lockId, long timeout) { 
  20.         // 創(chuàng)建臨時(shí)節(jié)點(diǎn) 
  21.         Lock lockNode = createLockNode(lockId); 
  22.         lockNode = tryActiveLock(lockNode);// 嘗試激活鎖 
  23.         if (!lockNode.isActive()) { 
  24.             try { 
  25.                 synchronized (lockNode) { 
  26.                     lockNode.wait(timeout); // 線(xiàn)程鎖住 
  27.                 } 
  28.             } catch (InterruptedException e) { 
  29.                 throw new RuntimeException(e); 
  30.             } 
  31.         } 
  32.         if (!lockNode.isActive()) { 
  33.             throw new RuntimeException(" lock  timeout"); 
  34.         } 
  35.         return lockNode; 
  36.     } 
  37.     // 釋放鎖 
  38.     public void unlock(Lock lock) { 
  39.         if (lock.isActive()) { 
  40.             zkClient.delete(lock.getPath()); 
  41.         } 
  42.     } 
  43.     // 嘗試激活鎖 
  44.     private Lock tryActiveLock(Lock lockNode) { 
  45.         // 獲取根節(jié)點(diǎn)下面所有的子節(jié)點(diǎn) 
  46.         List<String> list = zkClient.getChildren(rootPath) 
  47.                 .stream() 
  48.                 .sorted() 
  49.                 .map(p -> rootPath + "/" + p) 
  50.                 .collect(Collectors.toList());      // 判斷當(dāng)前是否為最小節(jié)點(diǎn) 
  51.         String firstNodePath = list.get(0); 
  52.         // 最小節(jié)點(diǎn)是不是當(dāng)前節(jié)點(diǎn) 
  53.         if (firstNodePath.equals(lockNode.getPath())) { 
  54.             lockNode.setActive(true); 
  55.         } else { 
  56.             String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1); 
  57.             zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() { 
  58.                 @Override 
  59.                 public void handleDataChange(String dataPath, Object data) throws Exception { 
  60.                 } 
  61.                 @Override 
  62.                 public void handleDataDeleted(String dataPath) throws Exception { 
  63.                     // 事件處理 與心跳 在同一個(gè)線(xiàn)程,如果Debug時(shí)占用太多時(shí)間,將導(dǎo)致本節(jié)點(diǎn)被刪除,從而影響鎖邏輯。 
  64.                     System.out.println("節(jié)點(diǎn)刪除:" + dataPath); 
  65.                      Lock lock = tryActiveLock(lockNode); 
  66.                     synchronized (lockNode) { 
  67.                         if (lock.isActive()) { 
  68.                             lockNode.notify(); // 釋放了 
  69.                         } 
  70.                     } 
  71.                     zkClient.unsubscribeDataChanges(upNodePath, this); 
  72.                 } 
  73.             }); 
  74.         } 
  75.         return lockNode; 
  76.     } 
  77.     public Lock createLockNode(String lockId) { 
  78.         String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w"); 
  79.         return new Lock(lockId, nodePath); 
  80.     } 

示例Demo涉及組件zookeeper-lock。源代碼提交在 github :https://github.com/Niuh-Frame/niuh-zookeeper。

 

責(zé)任編輯:姜華 來(lái)源: 今日頭條
相關(guān)推薦

2021-05-31 10:35:34

TCPWebSocket協(xié)議

2021-07-12 08:35:24

組件應(yīng)用場(chǎng)景

2021-09-16 07:52:18

算法應(yīng)用場(chǎng)景

2021-07-07 08:36:45

React應(yīng)用場(chǎng)景

2022-06-10 13:56:42

Java

2021-11-05 07:47:56

代理模式對(duì)象

2021-11-09 08:51:13

模式命令面試

2021-11-10 07:47:49

組合模式場(chǎng)景

2021-08-16 08:33:26

git

2021-11-04 06:58:32

策略模式面試

2021-11-03 14:10:28

工廠(chǎng)模式場(chǎng)景

2024-05-29 14:34:07

2021-09-06 10:51:27

TypeScript類(lèi)JavaScript

2021-09-28 07:12:09

測(cè)試路徑

2021-11-11 16:37:05

模板模式方法

2021-11-22 23:50:59

責(zé)任鏈模式場(chǎng)景

2021-06-08 08:33:23

NodeStream數(shù)據(jù)

2021-09-29 07:24:20

場(chǎng)景數(shù)據(jù)

2021-06-07 09:41:48

NodeBuffer 網(wǎng)絡(luò)協(xié)議

2021-09-10 06:50:03

TypeScript裝飾器應(yīng)用
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)