偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

反問面試官:如何實現(xiàn)集群內選主

開發(fā) 前端
本文主要演示了一個簡易的多Server的選主過程,以下代碼是一個簡單的基于Netty實現(xiàn)的集群選舉過程的示例。

面試官經(jīng)常喜歡問什么zookeeper選主原理、什么CAP理論、什么數(shù)據(jù)一致性。經(jīng)常都被問煩了,我就想問問面試官,你自己還會實現(xiàn)一個簡單的集群內選主呢?估計大部分面試官自己也寫不出來。

本篇使用 Java 和 Netty 實現(xiàn)簡單的集群選主過程的示例。

這個示例展示了多個節(jié)點通過投票選舉一個新的主節(jié)點的過程。Netty 用于節(jié)點間的通信,而每個節(jié)點則負責發(fā)起和響應選舉消息。

集群選主流程

1.選主流程

咱們且不說zookeeper如何選主,單說人類選主,也是采用少數(shù)服從多數(shù)的原則。人類選主時,中間會經(jīng)歷如下過程:

  • 如果我沒有熟悉的或者沒找到能力比我強的,首先投給自己一票。
  • 隨著時間推移,可能后面的人介紹了各自的特點和實力,那我可能會改投給別人。
  • 所有人將投票信息放入到統(tǒng)計箱中。
  • 最終票數(shù)最多的人是領導者。

同樣的,zookeeper在選主時,也是這樣的流程。假設有五大服務器:

  • 服務器1先給自身投票
  • 后續(xù)起來的服務器2也會投自身一票,然后服務器1觀察到服務器2的id比較大,則會改投服務器2
  • 后續(xù)起來的服務器3也會投自身一票,然后服務1和服務器2發(fā)現(xiàn)服務器3的id比較大,則都會改投服務器3。服務器3被確定為領導者。
  • 服務器4起來后也會投自身一票,然后發(fā)現(xiàn)服務器3已經(jīng)有3票了,立馬改投服務器3。
  • 服務器5與服務器4的操作一樣。

2.選主協(xié)議

在選主過程中采用的是超過半數(shù)的協(xié)議。在選主過程中,會需要如下幾類消息:

  • 投票請求:節(jié)點發(fā)出自己的投票請求。
  • 接受投票:其余節(jié)點作出判斷,如果覺得id較大,則接受投票。
  • 選舉勝出:當選主節(jié)點后,廣播勝出消息。

代碼實現(xiàn)

下面模擬3個節(jié)點的選主過程,核心步驟如下:

(1) 定義消息類型、消息對象、節(jié)點信息

public enum MessageType {
        VOTE_REQUEST, // 投票請求
        VOTE,         // 投票
        ELECTED       // 選舉完成后的勝出消息
}
    
public class ElectionMessage implements Serializable {
    private MessageType type;
    private int nodeId;   // 節(jié)點ID
    private long zxId;    // ZXID:類似于ZooKeeper中的邏輯時鐘,用于比較
    private int voteFor;  // 投票給的節(jié)點ID
}

public class ElectionNode {
    private int nodeId; // 當前節(jié)點ID
    private long zxId;  // 當前節(jié)點的ZXID
    private volatile int leaderId; // 當前選舉的Leader ID
    private String host;
    private int port;
    private ConcurrentHashMap<Integer, Integer> voteMap = new ConcurrentHashMap<>(); // 此節(jié)點對每個節(jié)點的投票情況
    private int totalNodes; // 集群總節(jié)點數(shù)
}

(2) 每個節(jié)點利用Netty啟動Server

public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(
                                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                    new ObjectEncoder(),
                                    new ElectionHandler(ElectionNode.this));
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("Node " + nodeId + " started on port " + port);

            // 啟動后開始選舉過程
            startElection();
//            future.channel().closeFuture().sync();


        } catch (Exception e) {

        } finally {
//            bossGroup.shutdownGracefully();
//            workerGroup.shutdownGracefully();
        }
    }

(3) 啟動后利用Netty發(fā)送投票請求

public void sendVoteRequest(String targetHost, int targetPort) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(
                                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                    new ObjectEncoder(),
                                    new ElectionHandler(ElectionNode.this));
                        }
                    });

            ChannelFuture future = bootstrap.connect(targetHost, targetPort).sync();
            ElectionMessage voteRequest = new ElectionMessage(ElectionMessage.MessageType.VOTE_REQUEST, nodeId, zxId, nodeId);
            future.channel().writeAndFlush(voteRequest);
//            future.channel().closeFuture().sync();
        } catch (Exception e) {

        } finally {
//            group.shutdownGracefully();
        }
    }

(4) 節(jié)點接受到投票請求后,做相關處理

節(jié)點在收到消息后,做相關邏輯處理:處理投票請求、處理確認投票、處理選主結果。

處理投票請求:判斷是否是否接受投票信息。只有在主節(jié)點沒確定并且zxId較大時,才發(fā)送投票消息。如果接受了投票請求的話,則更新本地的投票邏輯,然后給投票節(jié)點發(fā)送接受投票的消息

處理確認投票:如果投票消息被接受了,則更新本地的投票邏輯。

處理選主結果:如果收到了選主結果的消息,則更新本地的主節(jié)點。

public class ElectionHandler extends ChannelInboundHandlerAdapter {
    private final ElectionNode node;

    public ElectionHandler(ElectionNode node) {
        this.node = node;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ElectionMessage electionMessage = (ElectionMessage) msg;
        System.out.println("Node " + node.getNodeId() + " received: " + electionMessage);

        if (electionMessage.getType() == ElectionMessage.MessageType.VOTE_REQUEST) {
            // 判斷是否是否接受投票信息。只有在主節(jié)點沒確定并且zxId較大時,才發(fā)送投票消息
            // 如果接受了投票請求的話,則更新本地的投票邏輯,然后給投票節(jié)點發(fā)送接受投票的消息
            if (electionMessage.getZxId() >= node.getZxId() && node.getLeaderId() == 0) {
                node.receiveVote(electionMessage.getNodeId());
                ElectionMessage voteMessage = new ElectionMessage(ElectionMessage.MessageType.VOTE, electionMessage.getNodeId(), electionMessage.getZxId(), electionMessage.getNodeId());
                ctx.writeAndFlush(voteMessage);
            } else {
                // 如果已經(jīng)確定主節(jié)點了,直接發(fā)送ELECTED消息
                sendLeaderInfo(ctx);
            }
        } else if (electionMessage.getType() == ElectionMessage.MessageType.VOTE) {
            // 如果投票消息被接受了,則更新本地的投票邏輯。
            if (electionMessage.getZxId() >= node.getZxId() && node.getLeaderId() == 0) {
                node.receiveVote(electionMessage.getNodeId());
            } else {
                // 如果已經(jīng)確定主節(jié)點了,直接發(fā)送ELECTED消息
                sendLeaderInfo(ctx);
            }
        } else if (electionMessage.getType() == ElectionMessage.MessageType.ELECTED) {
            if (node.getLeaderId() == 0) {
                node.setLeaderId(electionMessage.getVoteFor());
            }
        }
    }

(5) 接受別的節(jié)點的投票

這里是比較關鍵的一步,當確定接受某個節(jié)點時,則更新本地的投票數(shù),然后判斷投票數(shù)是否超過半數(shù),超過半數(shù)則確定主節(jié)點。同時,再將主節(jié)點廣播出去。

此時,其余節(jié)點接收到選主確認的消息后,都會更新自己的本地的主節(jié)點信息。

public void receiveVote(int nodeId) {
    voteMap.merge(nodeId, 1, Integer::sum);
    // 比較出votes里值,取出最大的那個對應的key
    int currentVotes = voteMap.values().stream().max(Integer::compareTo).get();

    if (currentVotes > totalNodes / 2 && leaderId == 0) {
        setLeaderId(nodeId);
        broadcastElected();
    }
}

(6) 廣播選主結果

/**
 * 廣播選舉結果
 */
private void broadcastElected() {
    for (int i = 1; i <= totalNodes; i++) {
        if (i != nodeId) {
            sendElectedMessage(host, 9000 + i);
        }
    }
}

/**
 * 發(fā)送選舉結果
 *
 * @param targetHost
 * @param targetPort
 */
public void sendElectedMessage(String targetHost, int targetPort) {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(
                                new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                new ObjectEncoder(),
                                new ElectionHandler(ElectionNode.this));
                    }
                });

        ChannelFuture future = bootstrap.connect(targetHost, targetPort).sync();
        ElectionMessage electedMessage = new ElectionMessage(ElectionMessage.MessageType.ELECTED, leaderId, zxId, leaderId);
        future.channel().writeAndFlush(electedMessage);
//            future.channel().closeFuture().sync();
    } catch (Exception e) {

    } finally {
//            group.shutdownGracefully();
    }
}

(7) 完整代碼

完整代碼:https://gitee.com/yclxiao/specialty/blob/master/javacore/src/main/java/com/ycl/election/ElectionHandler.java

總結

本文主要演示了一個簡易的多Server的選主過程,以上代碼是一個簡單的基于Netty實現(xiàn)的集群選舉過程的示例。在實際場景中,選舉邏輯遠比這個復雜,需要處理更多的網(wǎng)絡異常、重復消息、并發(fā)問題等。

責任編輯:趙寧寧 來源: 程序員半支煙
相關推薦

2024-09-24 10:28:22

2024-04-03 00:00:00

Redis集群代碼

2024-03-20 15:12:59

KafkaES中間件

2024-02-20 14:10:55

系統(tǒng)緩存冗余

2024-09-11 22:51:19

線程通訊Object

2023-11-20 10:09:59

2024-04-09 10:40:04

2024-01-19 14:03:59

Redis緩存系統(tǒng)Spring

2024-01-26 13:16:00

RabbitMQ延遲隊列docker

2024-10-22 16:39:07

2015-08-13 10:29:12

面試面試官

2021-05-20 08:54:16

Go面向對象

2021-12-15 06:58:13

List 集合LinkedHashS

2024-02-04 10:08:34

2023-02-16 08:10:40

死鎖線程

2021-05-20 08:34:03

CDN原理網(wǎng)絡

2024-12-25 15:44:15

2024-09-09 15:09:30

2021-10-26 10:29:45

掃碼登錄功能

2021-05-19 06:07:21

CSS 斜線效果技巧
點贊
收藏

51CTO技術棧公眾號