偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

揭開 Raft 的神秘面紗,和ApacheRatis 了解Raft 組件的使用

開發(fā) 前端
相比 Paxos, Raft 一直以來就是以易于理解著稱。今天我們以一年 Raft 使用者的角度,來看一下,別人根據(jù) Raft 論文實現(xiàn)了之后,我們一般要怎么樣使用。

[[337764]]

相比 Paxos, Raft 一直以來就是以易于理解著稱。今天我們以一年 Raft 使用者的角度,來看一下,別人根據(jù) Raft 論文實現(xiàn)了之后,我們一般要怎么樣使用。

俗話說,要想知道梨子的味道,就要親口嘗一嘗,沒吃過豬肉,也要見一見豬跑。否則別人再怎么樣形容,你可能還以為是像貓狗一類毛茸茸。

在 Raft 官網(wǎng)里長長的列表就能發(fā)現(xiàn),實現(xiàn) Raft 的框架目前不少。Java 里我大概看了螞蟻的 SOFARaft 和 Apache 的 Ratis。這次我們以 Ratis 為例,揭開面紗,來看看到底要怎樣使用。

當然,下面具體提到的例子,也是這些組件中自帶的 example。

一、編譯

github下載 Ratis 直接 mvn clean package 即可,如果編譯過程中出錯,可以先clean install ratis-proto

二、示例

Ratis 自帶的示例有三個:

  • arithmetic
  • counter
  • filestore

在 ratis-examples 模塊中,對于 arithmetic 和 filestore比較方便,可以通過main/bin目錄下的 shell 腳本快速啟動 Server 和 Client 來進行測試。

對于Raft,咱們都知道是需要多實例組成集群才能測試,你啟動一個實例沒啥用,連選主都成問題。Bin 目錄下的 start-all 支持 example 的名稱以及對應(yīng)的命令。比如 filestore server 代表是啟動 filestore 這個應(yīng)用的server。對應(yīng)的命令參數(shù)會在相應(yīng)example里的 cli 中解析。同時會一次性啟動三個server,組成一個集群并在周期內(nèi)完成選舉。

而對于 counter 這個示例,并沒有相應(yīng)的腳本來快速啟動三個server,這個我們可以通過命令行或者在IDE里以參數(shù)的形式啟動。

三、分析

下面我們來示例里看下 Raft Server 是怎樣工作的。

對于 counter 示例來說,我們啟動的時候,需要傳入一個參數(shù),代表當前的server是第幾個,目的在于,要從 peers 列表中得知該用哪個IP + 端口去啟動它。這里我們能發(fā)現(xiàn),這個 peers 列表,是在代碼內(nèi)提前設(shè)置好的。當然你說動態(tài)配置啥的,也沒啥問題,另外兩個示例是通過shell 腳本里common 中的配置傳入的。

所以,第一步我們看到, Raft Server 在啟動的時候,會通過「配置」的形式,來知道 peer 之間的存在,這樣才能彼此通信,讓別人給自己投票或者給別人投票,完成 Term 內(nèi)的選舉。另外,才能接收到 Leader 傳過來的 Log ,并且應(yīng)用到本地。

第二步,我們來看下 Client 和 集群之間是如何通信的。整個 Raft 集群可能有多個實例,我們知道必須通過 Leader 來完成寫操作。那怎樣知道誰是Leader?有什么辦法?

一般常見的思路有:

  • 在寫之前,先去集群內(nèi)查一下,誰是 Leader,然后再寫
  • 隨機拿一個寫,不行再換一個,不停的試,總會有一個成功。

當然方式二這樣試下去效率不太高。所以會在這個隨機試一次之后,集群會將當前的 Leader 信息返回給 Client,然后 Client 直接通過這個建立連接進行通信即可。

在 Ratis 里, Client 調(diào)用非 Leader 節(jié)點會收到 Server 拋出的一個異常,異常中會包含一個稱為 suggestLeader 的信息,表示當前正確的 Leader,按這個連上去就行。當然,如果如果在此過程中發(fā)生的 Leader 的變更,那就會有一個新的suggestLeader 返回來,再次重試。

我們來看 Counter 這個示例中的實現(xiàn)。

Server 和 Client 的共用的Common 代碼中,包含 peers 的聲明

  1. public final class CounterCommon { 
  2.   public static final List<RaftPeer> PEERS = new ArrayList<>(3); 
  3.  
  4.   static { 
  5.     PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n1"), "127.0.0.1:6000")); 
  6.     PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n2"), "127.0.0.1:6001")); 
  7.     PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n3"), "127.0.0.1:6002")); 
  8.   } 

這里聲明了三個節(jié)點。

通過命令行啟動時,會直接把index 傳進來, index 取值1-3。

  1. java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex} 

然后在Server 啟動的時候,拿到對應(yīng)的配置信息。

  1. //find current peer object based on application parameter 
  2.     RaftPeer currentPeer = 
  3.         CounterCommon.PEERS.get(Integer.parseInt(args[0]) - 1); 

再設(shè)置存儲目錄

  1. //set the storage directory (different for each peer) in RaftProperty object 
  2.     File raftStorageDir = new File("./" + currentPeer.getId().toString()); 
  3.     RaftServerConfigKeys.setStorageDir(properties, 
  4.         Collections.singletonList(raftStorageDir)) 

重點看這里,每個 Server 都會有一個狀態(tài)機「CounterStateMachine」,平時我們的「業(yè)務(wù)邏輯」都放到這里

  1. //create the counter state machine which hold the counter value 
  2.     CounterStateMachine counterStateMachine = new CounterStateMachine(); 

客戶端發(fā)送的命令,會在這個狀態(tài)機中被執(zhí)行,同時這些命令又以Log 的形式復(fù)制給其它節(jié)點,各個節(jié)點的Log 又會在它自己的狀態(tài)機里執(zhí)行,從而保證各個節(jié)點狀態(tài)的一致。

 

最后根據(jù)這些配置,生成 Raft Server 實例并啟動。

  1. //create and start the Raft server 
  2.     RaftServer server = RaftServer.newBuilder() 
  3.         .setGroup(CounterCommon.RAFT_GROUP) 
  4.         .setProperties(properties) 
  5.         .setServerId(currentPeer.getId()) 
  6.         .setStateMachine(counterStateMachine) 
  7.         .build(); 
  8.     server.start(); 

CounterStateMachine 里,應(yīng)用計數(shù)的這一小段代碼,我們看先檢查了命令是否合法,然后執(zhí)行命令

  1. //check if the command is valid 
  2.     String logData = entry.getStateMachineLogEntry().getLogData() 
  3.         .toString(Charset.defaultCharset()); 
  4.     if (!logData.equals("INCREMENT")) { 
  5.       return CompletableFuture.completedFuture( 
  6.           Message.valueOf("Invalid Command")); 
  7.     } 
  8.     //update the last applied term and index 
  9.     final long index = entry.getIndex(); 
  10.     updateLastAppliedTermIndex(entry.getTerm(), index); 
  11.  
  12.     //actual execution of the command: increment the counter 
  13.     counter.incrementAndGet(); 
  14.  
  15.     //return the new value of the counter to the client 
  16.     final CompletableFuture<Message> f = 
  17.         CompletableFuture.completedFuture(Message.valueOf(counter.toString())); 
  18.  
  19.     //if leader, log the incremented value and it's log index 
  20.     if (trx.getServerRole() == RaftProtos.RaftPeerRole.LEADER) { 
  21.       LOG.info("{}: Increment to {}"index, counter.toString()); 
  22.     } 

我們再來看 Client 的實現(xiàn)。

和 Server 類似,通過配置屬性,創(chuàng)建一個實例

  1. private static RaftClient buildClient() { 
  2.     RaftProperties raftProperties = new RaftProperties(); 
  3.     RaftClient.Builder builder = RaftClient.newBuilder() 
  4.         .setProperties(raftProperties) 
  5.         .setRaftGroup(CounterCommon.RAFT_GROUP) 
  6.         .setClientRpc( 
  7.             new GrpcFactory(new Parameters()) 
  8.                 .newRaftClientRpc(ClientId.randomId(), raftProperties)); 
  9.     return builder.build(); 
  10.   } 

然后就可以向Server發(fā)送命令開工了。

  1. raftClient.send(Message.valueOf("INCREMENT")); 

Counter 的狀態(tài)機支持INCREMENT 和 GET 兩個命令。所以example 最后執(zhí)行了一個 GET 的命令來獲取最終的計數(shù)結(jié)果

  1. RaftClientReply count = raftClient.sendReadOnly(Message.valueOf("GET")); 

四、內(nèi)部部分實現(xiàn)

RaftClientImpl 里,初期會從peers列表中選一個,當成leader 去請求。

  1. RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, 
  2.       RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) { 
  3.     this.clientId = clientId; 
  4.     this.clientRpc = clientRpc; 
  5.     this.peers = new ConcurrentLinkedQueue<>(group.getPeers()); 
  6.     this.groupId = group.getGroupId(); 
  7.     this.leaderId = leaderId != null? leaderId 
  8.         : !peers.isEmpty()? peers.iterator().next().getId(): null
  9.     ... 
  10.   } 

之后,會根據(jù)server 返回的不同異常分別處理。

  1. private RaftClientReply sendRequest(RaftClientRequest request) throws IOException { 
  2.     RaftClientReply reply; 
  3.     try { 
  4.       reply = clientRpc.sendRequest(request); 
  5.     } catch (GroupMismatchException gme) { 
  6.       throw gme; 
  7.     } catch (IOException ioe) { 
  8.       handleIOException(request, ioe); 
  9.     } 
  10.     reply = handleLeaderException(request, reply, null); 
  11.     reply = handleRaftException(reply, Function.identity()); 
  12.     return reply; 
  13.   } 

比如在 handleLeaderException 中,又分幾種情況,因為通過Client 來和 Server 進行通訊的時候,會隨機從peers里選擇一個,做為leader去請求,如果 Server 返回異常,說它不是leader,就用下面的代碼,隨機從另外的peer里選擇一個再去請求。

  1. final RaftPeerId oldLeader = request.getServerId(); 
  2.     final RaftPeerId curLeader = leaderId; 
  3.     final boolean stillLeader = oldLeader.equals(curLeader); 
  4.     if (newLeader == null && stillLeader) { 
  5.       newLeader = CollectionUtils.random(oldLeader, 
  6.           CollectionUtils.as(peers, RaftPeer::getId)); 
  7.     } 
  8.  
  9.  static <T> T random(final T given, Iterable<T> iteration) { 
  10.     Objects.requireNonNull(given, "given == null"); 
  11.     Objects.requireNonNull(iteration, "iteration == null"); 
  12.  
  13.     final List<T> list = StreamSupport.stream(iteration.spliterator(), false
  14.         .filter(e -> !given.equals(e)) 
  15.         .collect(Collectors.toList()); 
  16.     final int size = list.size(); 
  17.     return size == 0? null: list.get(ThreadLocalRandom.current().nextInt(size)); 
  18.   } 

是不是感覺很低效。如果這個時候,server 返回的信息里,告訴client 誰是 leader,那client 直接連上去就可以了是吧。

  1. /** 
  2.    * @return null if the reply is null or it has 
  3.    * {@link NotLeaderException} or {@link LeaderNotReadyException} 
  4.    * otherwise return the same reply. 
  5.    */ 
  6.   RaftClientReply handleLeaderException(RaftClientRequest request, RaftClientReply reply, 
  7.                                         Consumer<RaftClientRequest> handler) { 
  8.     if (reply == null || reply.getException() instanceof LeaderNotReadyException) { 
  9.       return null
  10.     } 
  11.     final NotLeaderException nle = reply.getNotLeaderException(); 
  12.     if (nle == null) { 
  13.       return reply; 
  14.     } 
  15.     return handleNotLeaderException(request, nle, handler); 
  16.   }
  1. RaftClientReply handleNotLeaderException(RaftClientRequest request, NotLeaderException nle, 
  2.       Consumer<RaftClientRequest> handler) { 
  3.     refreshPeers(nle.getPeers()); 
  4.     final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null 
  5.         : nle.getSuggestedLeader().getId(); 
  6.     handleIOException(request, nle, newLeader, handler); 
  7.     return null
  8.   } 

我們會看到,在異常的信息中,如果能夠提取出一個 suggestedLeader,這時候就會做為新的leaderId來使用,下次直接連接了。

本文轉(zhuǎn)載自微信公眾號「Tomcat那些事兒」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系Tomcat那些事兒公眾號。

 

責(zé)任編輯:武曉燕 來源: Tomcat那些事兒
相關(guān)推薦

2015-08-20 13:43:17

NFV網(wǎng)絡(luò)功能虛擬化

2010-05-17 09:13:35

2021-06-07 08:18:12

云計算云端阿里云

2014-03-12 11:11:39

Storage vMo虛擬機

2009-09-15 15:34:33

Google Fast

2016-04-06 09:27:10

runtime解密學(xué)習(xí)

2023-11-02 09:55:40

2010-05-26 19:12:41

SVN沖突

2009-06-01 09:04:44

Google WaveWeb

2018-03-01 09:33:05

軟件定義存儲

2024-08-19 08:07:52

2020-11-03 14:31:55

Ai人工智能深度學(xué)習(xí)

2017-10-16 05:56:00

2021-08-11 09:01:48

智能指針Box

2011-08-02 08:59:53

2021-07-28 21:49:01

JVM對象內(nèi)存

2021-09-17 15:54:41

深度學(xué)習(xí)機器學(xué)習(xí)人工智能

2010-06-17 10:53:25

桌面虛擬化

2020-04-14 10:44:01

區(qū)塊鏈滲透測試比特幣

2021-05-25 09:01:21

Linux命令Bash histor
點贊
收藏

51CTO技術(shù)棧公眾號