ZK(ZooKeeper)分布式鎖實現(xiàn)
準(zhǔn)備
本文會使用到 三臺 獨立服務(wù)器,可以自行提前搭建好。
不知道如何搭建的,可以看我之前 ZooKeeper集群 搭建:Zookeeper 集群部署的那些事兒
關(guān)于ZooKeeper 一些基礎(chǔ)命令可以看這篇:Zookeeper入門看這篇就夠了
前言
在平時我們對鎖的使用,在針對單個服務(wù),我們可以用 Java 自帶的一些鎖來實現(xiàn),資源的順序訪問,但是隨著業(yè)務(wù)的發(fā)展,現(xiàn)在基本上公司的服務(wù)都是多個,單純的 Lock或者Synchronize 只能解決單個JVM線程的問題,那么針對于單個服務(wù)的 Java 的鎖是無法滿足我們業(yè)務(wù)的需要的,為了解決多個服務(wù)跨服務(wù)訪問共享資源,于是就有了分布鎖,分布式鎖產(chǎn)生的原因就是集群。
正文
實現(xiàn)分布式鎖的方式有哪些呢?
- 分布式鎖的實現(xiàn)方式主要以(ZooKeeper、Reids、Mysql)這三種為主
今天我們主要講解的是使用 ZooKeeper來實現(xiàn)分布式鎖,ZooKeeper的應(yīng)用場景主要包含這幾個方面:
- 服務(wù)注冊與訂閱(共用節(jié)點)
- 分布式通知(監(jiān)聽ZNode)
- 服務(wù)命令(ZNode特性)
- 數(shù)據(jù)訂閱、發(fā)布(Watcher)
- 分布式鎖(臨時節(jié)點)
ZooKeeper實現(xiàn)分布式鎖,主要是得益于ZooKeeper 保證了數(shù)據(jù)的強一致性,鎖的服務(wù)可以分為兩大類:
保持獨占
所有試圖來獲取當(dāng)前鎖的客戶端,最終有且只有一個能夠成功得到當(dāng)前鎖的鑰匙,通常我們會把 ZooKeeper 上的節(jié)點(ZNode)看做一把鎖,通過 create臨時節(jié)點的方式來實現(xiàn),當(dāng)多個客戶端都去創(chuàng)建一把鎖的時候,那么只有成功創(chuàng)建了那個客戶端才能擁有這把鎖
控制時序
所有試圖獲取鎖的客戶端,都是被順序執(zhí)行,只是會有一個序號(zxid),我們會有一個節(jié)點,例如:/testLock,所有臨時節(jié)點都在這個下面去創(chuàng)建,ZK的父節(jié)點(/testLock) 維持了一個序號,這個是ZK自帶的屬性,他保證了子節(jié)點創(chuàng)建的時序性,從而也形成了每個客戶端的一個 全局時序
ZK鎖機(jī)制
在實現(xiàn)ZooKeeper 分布式鎖之前我們有必要了解一下,關(guān)于ZooKeeper分布式鎖機(jī)制的實現(xiàn)流程和原理,不然各位寶貝,出去面試的時候怎么和面試官侃侃而談~
臨時順序節(jié)點
基于ZooKeeper的臨時順序節(jié)點 ,ZooKeeper比較適合來實現(xiàn)分布式鎖:
- 順序發(fā)號器: ZooKeeper的每一個節(jié)點,都是自帶順序生成器:在每個節(jié)點下面創(chuàng)建臨時節(jié)點,新的子節(jié)點后面,會添加一個次序編號,這個生成的編號,會在上一次的編號進(jìn)行 +1 操作
- 有序遞增: ZooKeeper節(jié)點有序遞增,可以保證鎖的公平性,我們只需要在一個持久父節(jié)點下,創(chuàng)建對應(yīng)的臨時順序節(jié)點,每個線程在嘗試占用鎖之前,會調(diào)用watch,判斷自己當(dāng)前的序號是不是在當(dāng)前父節(jié)點最小,如果是,那么獲取鎖
- Znode監(jiān)聽: 每個線程在搶占所之前,會創(chuàng)建屬于當(dāng)前線程的ZNode節(jié)點,在釋放鎖的時候,會刪除創(chuàng)建的ZNode,當(dāng)我們創(chuàng)建的序號不是最小的時候,會等待watch通知,也就是上一個ZNode的狀態(tài)通知,當(dāng)前一個ZNode刪除的時候,會觸發(fā)回調(diào)機(jī)制,告訴下一個ZNode,你可以獲取鎖開始工作了
- 臨時節(jié)點自動刪除: ZooKeeper還有一個好處,當(dāng)我們客戶端斷開連接之后,我們出創(chuàng)建的臨時節(jié)點會進(jìn)行自動刪除操作,所以我們在使用分布式鎖的時候,一般都是會去創(chuàng)建臨時節(jié)點,這樣可以避免因為網(wǎng)絡(luò)異常等原因,造成的死鎖。
- 羊群效應(yīng): ZooKeeper節(jié)點的順序訪問性,后面監(jiān)聽前面的方式,可以有效的避免 羊群效應(yīng),什么是羊群效應(yīng):當(dāng)某一個節(jié)點掛掉了,所有的節(jié)點都要去監(jiān)聽,然后做出回應(yīng),這樣會給服務(wù)器帶來比較大壓力,如果有了臨時順序節(jié)點,當(dāng)一個節(jié)點掛掉了,只有它后面的那一個節(jié)點才做出反應(yīng)。
我們現(xiàn)在看一下下面一張圖:
在上圖中, ZooKeeper里面有一把鎖節(jié)點 testLock,這個鎖就是 ZooKeeper的一個節(jié)點,當(dāng)兩個客戶端來獲取這把鎖的時候,會對 ZooKeeper進(jìn)行加鎖的請求,也就是我們所說的 臨時順序節(jié)點。
當(dāng)我們在 /testLock目錄下創(chuàng)建了一個順序臨時節(jié)點后,ZK會自動對這個臨時節(jié)點維護(hù) 一個節(jié)點序號,并且這個節(jié)點是遞增的,比如我們 clientA 創(chuàng)建了一個臨時順序節(jié)點,ZK內(nèi)部會生成一個序號:/lock0000000001,那么 clientB 也生成了一個臨時順序節(jié)點,ZK會生成一個序號為 /lock0000000002,在這里數(shù)字都是依次遞增的,從1開始遞增,ZK內(nèi)部會維護(hù)這個順序。
下圖所示:
這時候,ClientA會進(jìn)行監(jiān)聽判斷,在父節(jié)點下,我是不是最小的,如果是的話,那么俺就可以加鎖了,因為我是最小的,其他的都比我大。我自己可以進(jìn)行加鎖,你已經(jīng)是一個成熟的臨時節(jié)點了,要學(xué)會自己加鎖??龋敲碯K是怎么進(jìn)行判斷的呢?寶貝,您往下看:
這個是 cleintA已經(jīng)加鎖完成了,這個時候 clientB也要過來加鎖,那么他也要在 /testLock,創(chuàng)建一個屬于自己的臨時節(jié)點,那么這個時候他的序號就會變成 /lock0000000002,如下圖所示:
這個時候就會出現(xiàn)我們前面所講的,clientB 在加鎖的時候會判斷,自己是不是最小的,一看在當(dāng)前父節(jié)點下不是最小的,啊~我還挺大的,還有比我小的!!!
加鎖失敗呀,咳咳,這個時候呢,clientB 就會去偷窺clientA,氣氛逐漸曖昧起來,啊不是,是按照順序去監(jiān)聽前一個節(jié)點(clientA),是否完成工作了,如果完成了,clientB才可以進(jìn)行加鎖工作,寶貝,你往下看圖片:
clientA 加鎖成功后,會進(jìn)行自己的業(yè)務(wù)處理,當(dāng) clientA 處理完工作后,說我完事了,下一個,那么 clientA 是怎么完事的呢,他多長時間?不是,具體流程是怎樣的?小農(nóng)你不對勁,說什么呢!!!真羞澀
上面我們不是說了,當(dāng) clientB 加鎖失敗后,會給前一個節(jié)點(clientA)加上一個監(jiān)聽,當(dāng)clientA被刪除以后,就表示有人釋放了鎖,這個時候就會通知 clientB重新去獲取鎖。
這個時候clientB重新獲取鎖的時候,發(fā)現(xiàn)自己就是當(dāng)前父節(jié)點下面最小的那個,于是clientB就開始加鎖,開始工作等一系列操作,當(dāng)clientB 完事以后,釋放鎖,也說了一句,下一個。
如下圖所示:
當(dāng)然除了 clientA、clientB還有C\D\E等,這字母看著好奇怪又好熟悉,原理都是一樣的,都是最小節(jié)點進(jìn)行解鎖,如果不是,監(jiān)聽前一個節(jié)點是否釋放,如果釋放了,再次嘗試加鎖。如果前一節(jié)節(jié)點釋放了,自己就是最小了,就排到前面去了,有點類似于 銀行取號 的操作。
代碼實現(xiàn)
使用ZooKeeper 創(chuàng)建臨時順序節(jié)點來實現(xiàn)分布式鎖,大體的流程就是 先創(chuàng)建一個持久父節(jié)點,在當(dāng)前節(jié)點下,創(chuàng)建臨時順序節(jié)點,找出最小的序列號,獲取分布式鎖,程序業(yè)務(wù)完成之后釋放鎖,通知下一個節(jié)點進(jìn)行操作,使用的是watch來監(jiān)控節(jié)點的變化,然后依次下一個最小序列節(jié)點進(jìn)行操作。
首先我們需要創(chuàng)建一個持久父類節(jié)點:我這里是 /mxn
WatchCallBack
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
- /**
- * @program: mxnzookeeper
- * @ClassName WatchCallBack
- * @description:
- * @author: 微信搜索:牧小農(nóng)
- * @create: 2021-10-23 10:48
- * @Version 1.0
- **/
- public class WatchCallBack implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {
- ZooKeeper zk ;
- String threadName;
- CountDownLatch cc = new CountDownLatch(1);
- String pathName;
- public String getPathName() {
- return pathName;
- }
- public void setPathName(String pathName) {
- this.pathName = pathName;
- }
- public String getThreadName() {
- return threadName;
- }
- public void setThreadName(String threadName) {
- this.threadName = threadName;
- }
- public ZooKeeper getZk() {
- return zk;
- }
- public void setZk(ZooKeeper zk) {
- this.zk = zk;
- }
- /** @Author 牧小農(nóng)
- * @Description //TODO 嘗試加鎖方法
- * @Date 16:14 2021/10/24
- * @Param
- * @return
- **/
- public void tryLock(){
- try {
- System.out.println(threadName + " 開始創(chuàng)建。。。。");
- //創(chuàng)建一個順序臨時節(jié)點
- zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"abc");
- //阻塞當(dāng)前,監(jiān)聽前一個節(jié)點是否釋放鎖
- cc.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- /** @Author 牧小農(nóng)
- * @Description //TODO 解鎖方法
- * @Date 16:14 2021/10/24
- * @Param
- * @return
- **/
- public void unLock(){
- try {
- //釋放鎖,刪除臨時節(jié)點
- zk.delete(pathName,-1);
- //結(jié)束工作
- System.out.println(threadName + " 結(jié)束工作了....");
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void process(WatchedEvent event) {
- //如果第一個節(jié)點釋放了鎖,那么第二個就會收到回調(diào)
- //告訴它前一個節(jié)點釋放了,你可以開始嘗試獲取鎖
- switch (event.getType()) {
- case None:
- break;
- case NodeCreated:
- break;
- case NodeDeleted:
- //當(dāng)前節(jié)點重新獲取鎖
- zk.getChildren("/",false,this ,"sdf");
- break;
- case NodeDataChanged:
- break;
- case NodeChildrenChanged:
- break;
- }
- }
- @Override
- public void processResult(int rc, String path, Object ctx, String name) {
- if(name != null ){
- System.out.println(threadName +" 線程創(chuàng)建了一個節(jié)點為 : " + name );
- pathName = name ;
- //監(jiān)聽前一個節(jié)點
- zk.getChildren("/",false,this ,"sdf");
- }
- }
- //getChildren call back
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- //節(jié)點按照編號,升序排列
- Collections.sort(children);
- //對節(jié)點進(jìn)行截取例如 /lock0000000022 截取后就是 lock0000000022
- int i = children.indexOf(pathName.substring(1));
- //是不是第一個,也就是說是不是最小的
- if(i == 0){
- //是第一個
- System.out.println(threadName +" 現(xiàn)在我是最小的....");
- try {
- zk.setData("/",threadName.getBytes(),-1);
- cc.countDown();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }else{
- //不是第一個
- //監(jiān)聽前一個節(jié)點 看它是不是完成了工作進(jìn)行釋放鎖了
- zk.exists("/"+children.get(i-1),this,this,"sdf");
- }
- }
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- //判斷是否失敗exists
- }
- }
TestLock
- import com.mxn.zookeeper.config.ZKUtils;
- import org.apache.zookeeper.ZooKeeper;
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
- /**
- * @program: mxnzookeeper
- * @ClassName TestLock
- * @description:
- * @author: 微信搜索:牧小農(nóng)
- * @create: 2021-10-23 10:45
- * @Version 1.0
- **/
- public class TestLock {
- ZooKeeper zk ;
- @Before
- public void conn (){
- zk = ZKUtils.getZK();
- }
- @After
- public void close (){
- try {
- zk.close();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- @Test
- public void lock(){
- //創(chuàng)建十個線程
- for (int i = 0; i < 10; i++) {
- new Thread(){
- @Override
- public void run() {
- WatchCallBack watchCallBack = new WatchCallBack();
- watchCallBack.setZk(zk);
- String threadName = Thread.currentThread().getName();
- watchCallBack.setThreadName(threadName);
- //線程進(jìn)行搶鎖操作
- watchCallBack.tryLock();
- try {
- //進(jìn)行業(yè)務(wù)邏輯處理
- System.out.println(threadName+" 開始處理業(yè)務(wù)邏輯了...");
- Thread.sleep(200);
- }catch (Exception e){
- e.printStackTrace();
- }
- //釋放鎖
- watchCallBack.unLock();
- }
- }.start();
- }
- while(true){
- }
- }
- }
運行結(jié)果:
- Thread-1 線程創(chuàng)建了一個節(jié)點為 : /lock0000000112
- Thread-5 線程創(chuàng)建了一個節(jié)點為 : /lock0000000113
- Thread-2 線程創(chuàng)建了一個節(jié)點為 : /lock0000000114
- Thread-6 線程創(chuàng)建了一個節(jié)點為 : /lock0000000115
- Thread-9 線程創(chuàng)建了一個節(jié)點為 : /lock0000000116
- Thread-4 線程創(chuàng)建了一個節(jié)點為 : /lock0000000117
- Thread-7 線程創(chuàng)建了一個節(jié)點為 : /lock0000000118
- Thread-3 線程創(chuàng)建了一個節(jié)點為 : /lock0000000119
- Thread-8 線程創(chuàng)建了一個節(jié)點為 : /lock0000000120
- Thread-0 線程創(chuàng)建了一個節(jié)點為 : /lock0000000121
- Thread-1 現(xiàn)在我是最小的....
- Thread-1 開始處理業(yè)務(wù)邏輯了...
- Thread-1 結(jié)束工作了....
- Thread-5 現(xiàn)在我是最小的....
- Thread-5 開始處理業(yè)務(wù)邏輯了...
- Thread-5 結(jié)束工作了....
- Thread-2 現(xiàn)在我是最小的....
- Thread-2 開始處理業(yè)務(wù)邏輯了...
- Thread-2 結(jié)束工作了....
- Thread-6 現(xiàn)在我是最小的....
- Thread-6 開始處理業(yè)務(wù)邏輯了...
- Thread-6 結(jié)束工作了....
- Thread-9 現(xiàn)在我是最小的....
- Thread-9 開始處理業(yè)務(wù)邏輯了...
- Thread-9 結(jié)束工作了....
- Thread-4 現(xiàn)在我是最小的....
- Thread-4 開始處理業(yè)務(wù)邏輯了...
- Thread-4 結(jié)束工作了....
- Thread-7 現(xiàn)在我是最小的....
- Thread-7 開始處理業(yè)務(wù)邏輯了...
- Thread-7 結(jié)束工作了....
- Thread-3 現(xiàn)在我是最小的....
- Thread-3 開始處理業(yè)務(wù)邏輯了...
- Thread-3 結(jié)束工作了....
- Thread-8 現(xiàn)在我是最小的....
- Thread-8 開始處理業(yè)務(wù)邏輯了...
- Thread-8 結(jié)束工作了....
- Thread-0 現(xiàn)在我是最小的....
- Thread-0 開始處理業(yè)務(wù)邏輯了...
- Thread-0 結(jié)束工作了....
總結(jié)
ZK分布式鎖,能夠有效的解決分布式、不可重入的問題,在上面的案例中我, 沒有實現(xiàn)可重入鎖,但是實現(xiàn)起來也不麻煩,只需要帶上線程信息等唯一標(biāo)識,判斷一下就可以了
ZK實現(xiàn)分布式鎖具有天然的優(yōu)勢,臨時順序節(jié)點,可以有效的避免死鎖問題,讓客戶端斷開,那么就會刪除當(dāng)前臨時節(jié)點,讓下一個節(jié)點進(jìn)行工作。
如果文中有錯誤或者不了解的地方,歡迎留言,小農(nóng)看見了會第一時間回復(fù)大家,大家加油
我是牧小農(nóng),一個卑微的打工人,如果覺得文中的內(nèi)容對你有幫助,記得一鍵三連啊,你們的三連是小農(nóng)最大的動力。







































