偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Kafka 遷移工具 MirrorMaker2 原理起底

開發(fā) 架構(gòu)
負(fù)載均衡方式會(huì)引起比較明顯的驚群效應(yīng),比如在 Kafka Connect 集群擴(kuò)縮容的時(shí)候,不是新擴(kuò)縮容的節(jié)點(diǎn)也會(huì)出現(xiàn)較長的 stop-the-world 問題,在 K8s 環(huán)境中如果有節(jié)點(diǎn)需要進(jìn)行滾動(dòng)升級(jí),也會(huì)出現(xiàn)類似的問題。這種負(fù)載均衡方式在 Kafka 中稱之為 Eager Rebalance。

注意:本文內(nèi)容截止到 2024 年 2 月 26 日發(fā)布的 Kafka 3.7.0 版本。

MirrorMaker2(后文簡(jiǎn)稱 MM2)在 2019 年 12 月隨 Kafka 2.4.0 一起推出。顧名思義,是為了解決 Kafka 集群之間數(shù)據(jù)復(fù)制和數(shù)據(jù)同步的問題而誕生的 Kafka 官方的數(shù)據(jù)復(fù)制工具。在實(shí)際生產(chǎn)中,經(jīng)常被用來實(shí)現(xiàn) Kafka 數(shù)據(jù)的備份,遷移和災(zāi)備等目的。

在此也預(yù)告一下,AutoMQ 基于 MM2 的遷移產(chǎn)品化功能也即將和大家見面,可以幫助用戶更好更快從自建 Kafka 遷移到 AutoMQ,歡迎大家屆時(shí)使用。

1、 安裝部署 

MM2 一共有三種部署模式,dedicated mode,standalone mode 和 Kafka connect mode。

部署模式

Dedicated mode

直接部署 Kafka MM2,啟動(dòng)命令如下:

./bin/connect-mirror-maker.sh connect-mirror-maker.properties

此時(shí) MM2 依然是基于 Kafka Connect,對(duì)外封裝掉了 Kafka Connect 的復(fù)雜度,與此同時(shí)也支持分布式部署。One-line 直接拉起 MM2 以及背后的 Kafka Connect,不過相比較來說也喪失掉了一些 Kafka Connect 的靈活性(閹割了 Kafka Connect 對(duì)外的 RESTful API)。

Standalone mode

Standalone mode 更像是為測(cè)試環(huán)境設(shè)計(jì)的,并不支持分布式部署。這一點(diǎn)在 KIP-382[1] 中也有說明。因?yàn)椴皇且粋€(gè)生產(chǎn)可用的版本,在此不作多贅述。

Kafka Connect mode

此時(shí)整個(gè) MM2 的部署是需要一個(gè)現(xiàn)成的 Kafka Connect 集群的,MM2 會(huì)在 Kafka Connect 上部署自己的 Connector 來完成整個(gè)遷移過程。因?yàn)?Kafka Connect mode 是 MM2 最復(fù)雜的部署模式,而且無論是 Dedicated mode 還是 Kafka Connect mode,背后的原理都是一樣,只是前者進(jìn)行了封裝,因此了解 MM2 在 Kafka Connect 上的工作流程最有利于我們對(duì) MM2 有全局了解。

Kafka Connect 在 Kafka 0.9.0 版本中進(jìn)行推出,旨在簡(jiǎn)化數(shù)據(jù)集成和數(shù)據(jù)流管道的構(gòu)建,同時(shí)提供了一種可拓展,可靠的方式來連接 Kafka 與外部系統(tǒng)?;谶@樣的設(shè)計(jì),MM2 基于 Kafka Connect 進(jìn)行實(shí)現(xiàn)是非常自然的事情。

我們可以把基于 Kafka Connect mode 進(jìn)行部署的 MM2 里的調(diào)度資源分為以下幾種:

? Worker:一個(gè) MM2 或者 Kafka Connect 進(jìn)程,是進(jìn)行分布式部署時(shí)的基本單位。

? Connector:?jiǎn)蝹€(gè) Worker 內(nèi)部執(zhí)行遷移任務(wù)的連接器,一個(gè) Worker 內(nèi)可以有多個(gè) Connector,每個(gè) Connector 負(fù)責(zé)相對(duì)獨(dú)立的功能。

? Task:Connector 將需要遷移的任務(wù)進(jìn)行切分,Task 是并發(fā)執(zhí)行的最小單位。

Kafka Connect 集群

在 Kafka Connect Mode 下,我們需要先準(zhǔn)備一個(gè) Kafka Connect 集群,在每個(gè)節(jié)點(diǎn)上執(zhí)行以下命令即可啟動(dòng) Kafka Connect 集群。

./bin/connect-distributed.sh config/connect-distributed.properties

在 Kafka Connect 集群部署完成之后,我們可以利用 Kafka Connect 提供的 RESTful API 來啟動(dòng) MM2 所需要的所有 Connectors。默認(rèn)情況下,Kafka Connect 提供的端口為 8083。即使 Kafka Connect 集群中有多個(gè)節(jié)點(diǎn),但是執(zhí)行下列的命令只需要向集群中的任一節(jié)點(diǎn)發(fā)起請(qǐng)求即可。

Connector

假設(shè)節(jié)點(diǎn) IP 為本機(jī),啟動(dòng)三個(gè) Connector 的命令如下(實(shí)際上向當(dāng)前 Kafka Connect 集群中的任一節(jié)點(diǎn)發(fā)起請(qǐng)求即可):

# MirrorSourceConnector
curl -X POST -H "Content-Type: application/json" --data @mirror-source-connector.properties http://127.0.0.1:8083/connectors
# MirrorCheckpointConnector
curl -X POST -H "Content-Type: application/json" --data @mirror-checkpoint-connector.properties http://127.0.0.1:8083/connectors
# MirrorHeartbeatConnector
curl -X POST -H "Content-Type: application/json" --data @mirror-heartbeat-connector.properties http://127.0.0.1:8083/connectors

其中 mirror-source-connector.properties,mirror-checkpoint-connector.properties 和 mirror-heartbeat-connector.properties 為對(duì)應(yīng) Connector 的配置文件。

在啟動(dòng)完 Connector 之后,我們還可以使用以下命令查看當(dāng)前 Kafka Connect 集群中已經(jīng)存在的 Connectors。

$ curl http://127.0.0.1:8083/connectors
["mm2-heartbeat-connector","mm2-source-connector","mm2-checkpoint-connector"]%

更多關(guān)于 Kafka Connect RESTful API 的細(xì)節(jié),可以參考 Kafka Connect 101: Kafka Connect's REST API[2]。

2、工作流   

從上文可以看到,在 MM2 中,有三個(gè) Connector,它們負(fù)責(zé)完成整個(gè)副本復(fù)制過程,這三個(gè) Connector 包括:

? MirrorSourceConnector:同步源集群中 topic 的消息數(shù)據(jù)到目標(biāo)集群。

? MirrorCheckpointConnector:將源集群的消費(fèi)位點(diǎn)翻譯并同步到目標(biāo)集群。

? MirrorHeartbeatConnector:定時(shí)往源集群中發(fā)送心跳,驗(yàn)證和監(jiān)控兩個(gè)集群之間連接和遷移任務(wù)的運(yùn)行情況。

對(duì)于 MirrorSourceConnector 和 MirrorCheckpointConnector 提供有 JMX 監(jiān)控信息,可以幫助對(duì)遷移進(jìn)度和遷移健康狀況有全局了解。

MM2 會(huì)創(chuàng)建以下幾種 Topic(除 heartbeats 之外,所有的 Topic 都會(huì)被創(chuàng)建在 target 集群上):

? connect-configs:存儲(chǔ) MM2 中 connector 的配置信息。

? connect-offsets:存儲(chǔ) MM2 中 MirrorSourceConnector 和 MirrorCheckpointConnector 的消費(fèi)位點(diǎn)。

? connect-status:存儲(chǔ) MM2 中 connector 的狀態(tài)信息。

? mm2-offset-syncs.A.internal:存儲(chǔ)消息在源集群和目標(biāo)集群之間同步的 offset 映射信息(即 OffsetSync 消息)用于消費(fèi)位點(diǎn)翻譯。此 Topic 中的消息由 MirrorSourceConnector 發(fā)出(Topic 名中 A 表示源集群的 alias)。

? A.checkpoints.internal:存儲(chǔ) GroupId 同步的消費(fèi)進(jìn)度。具體存儲(chǔ)的信息包括 GroupId,Partition 以及在源集群和目標(biāo)集群的消費(fèi)位點(diǎn),此 Topic 中的信息由 MirrorCheckpointConnector 發(fā)出(Topic 名中 A 表示源集群的 alias)。

? heartbeats:定期往源集群發(fā)送心跳消息,這部分消息會(huì)被同步到目標(biāo)集群。此 Topic 中的消息體主要存儲(chǔ)簡(jiǎn)單的時(shí)間戳信息,其中的消息由 MirrorHeartbeatConnector 發(fā)出。

想要了解具體的 MM2 工作流,弄清楚 mm2-offset-syncs.A.internal 和 A.checkpoints.internal 兩個(gè) Topic 的作用尤為關(guān)鍵。

圖片圖片

消息同步與位點(diǎn)映射

MirrorSourceConnector 會(huì)從最早位點(diǎn)開始同步消息。在同步消息時(shí)會(huì)生成 OffsetSync 消息。OffsetSync 消息中記錄了被同步的消息的分區(qū)信息,在源集群和目標(biāo)集群上的位點(diǎn)映射信息。

記錄在 OffsetSync 消息中的位點(diǎn)映射信息是非常必要的,首先一條消息從源集群被同步到目標(biāo)集群上,前后的 offset 大概率是不同的,而且還有可能會(huì)出現(xiàn)消息重復(fù)和多個(gè)源集群的 topic 被同步到一個(gè)目標(biāo) topic 上的情況,而位點(diǎn)映射能最大程度上幫助我們將源集群的消息和目標(biāo)集群的消息對(duì)應(yīng)上。

這個(gè) OffsetSync 消息就被存儲(chǔ)在 mm2-offset-syncs.A.internal 中。但是并不是每同步一條消息就會(huì)生成一個(gè) OffsetSync 消息。默認(rèn)情況下每隔 100 條消息就會(huì)生成一個(gè) OffsetSync 消息,這里的參數(shù)可以使用 offset.lag.max 來進(jìn)行調(diào)節(jié)。關(guān)于 OffsetSync 消息的同步判斷,可以參照 org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState#update 的具體實(shí)現(xiàn)細(xì)節(jié)。

位點(diǎn)翻譯

MirrorCheckpointConnector 則會(huì)執(zhí)行具體的位點(diǎn)翻譯工作,它會(huì)消費(fèi) mm2-offset-syncs.A.internal 中的 OffsetSync 消息,然后將源集群上的消費(fèi)位點(diǎn)翻譯成目標(biāo)集群上的消費(fèi)位點(diǎn)并執(zhí)行 alterConsumerGroupOffsets 方法來重置消費(fèi)者位點(diǎn)。

因?yàn)?OffsetSync 沒有按照時(shí)間間隔同步的邏輯,導(dǎo)致的結(jié)果就是當(dāng)前分區(qū)最新的消息位點(diǎn)距離上一次同步的位點(diǎn)如果沒有超過 100,則不會(huì)生成新的 OffsetSync。而 MirrorCheckpointConnector 是根據(jù) OffsetSync 中的消息位點(diǎn)來同步消費(fèi)進(jìn)度的,這樣的結(jié)果就是目標(biāo)集群的消費(fèi)位點(diǎn)基本上不可能被完全同步,最多相比較于源集群會(huì)回退 100 個(gè)位點(diǎn)。但是在 3.7.0 以及之后的版本中,對(duì) OffsetSync 增加了按照時(shí)間同步的兜底邏輯,使得這個(gè)問題得到了解決[3]。

詳細(xì)來說,如果當(dāng)前消息距離之前的 OffsetSync 中的最新消息沒有超過 100 個(gè) offset,但是已經(jīng)有一段時(shí)間沒有進(jìn)行過 OffsetSync 消息的同步了,也會(huì)強(qiáng)行進(jìn)行一次 OffsetSync 消息的同步(由 offset.flush.internal.ms 參數(shù)控制,默認(rèn)為 10S)。

圖片圖片

可以通過以下命令方便地查看 OffsetSync 消息的內(nèi)容。

$ ./bin/kafka-console-consumer.sh --formatter "org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter" --bootstrap-server 127.0.0.1:9592 --from-beginning --topic mm2-offset-syncs.A.internal
OffsetSync{topicPartitinotallow=heartbeats-0, upstreamOffset=0, downstreamOffset=0}
OffsetSync{topicPartitinotallow=test-0-0, upstreamOffset=0, downstreamOffset=0}
OffsetSync{topicPartitinotallow=test-0-0, upstreamOffset=101, downstreamOffset=101}
OffsetSync{topicPartitinotallow=heartbeats-0, upstreamOffset=2, downstreamOffset=2}

針對(duì) MM2 中的 HeartbeatConnector,更多的時(shí)候則是起到一個(gè)觀測(cè)當(dāng)前 MM2 集群同步狀況的作用。使用以下命令可以查看 HeartbeatTopic 的內(nèi)容。

$ ./bin/kafka-console-consumer.sh --formatter "org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter"  --bootstrap-server 127.0.0.1:9092 --from-beginning --topic heartbeats --property print.key=true
Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564822022}
Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564842185}
Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564862192}
Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564882197}
Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564902202}

這里每 20 秒會(huì)生成一條心跳消息,心跳消息包含一條當(dāng)時(shí)的時(shí)間戳。這樣通過在目標(biāo)集群查看被同步過來的 heartbeat Topic 中的消息,即可查看當(dāng)前消息同步狀況。

3、負(fù)載均衡 

在 Kafka Connect 中,一個(gè)獨(dú)立的 Kafka Connect 進(jìn)程我們稱之為一個(gè) worker。在分布式環(huán)境下,相同 group.id 的一組 worker 就形成了一個(gè) Kafka Connect 集群。

盡管在負(fù)載均衡的過程中,Connector 和 Task 都會(huì)參與,但是 Connector 和 Task 并不是正交的。Task 從屬于 Connector。Connector 參與負(fù)載均衡只是表示具體的 Connector 類中的邏輯會(huì)在哪個(gè) worker 中執(zhí)行。具體的實(shí)現(xiàn)邏輯可以參照 EagerAssigner#performTaskAssignment 中的內(nèi)容:

private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset,
                                                      Map<String, ExtendedWorkerState> memberConfigs,
                                                      WorkerCoordinator coordinator) {
    // 用于記錄 Connector 分配結(jié)果
    Map<String /* member */, Collection<String /* connector */>> connectorAssignments = new HashMap<>();
    // 用于記錄 Task 分配結(jié)果
    Map<String /* member */, Collection<ConnectorTaskId>> taskAssignments = new HashMap<>();


    List<String> connectorsSorted = sorted(coordinator.configSnapshot().connectors());
    // 使用一個(gè)環(huán)形迭代器,將 connector 和 task 分別分配給不同的 worker
    CircularIterator<String> memberIt = new CircularIterator<>(sorted(memberConfigs.keySet()));
    // 先分配 Connector
    for (String connectorId : connectorsSorted) {
        String connectorAssignedTo = memberIt.next();
        log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);
        Collection<String> memberConnectors = connectorAssignments.computeIfAbsent(connectorAssignedTo, k -> new ArrayList<>());
        memberConnectors.add(connectorId);
    }
    // 在分配具體的 Task,延續(xù) member 迭代器中的順序
    for (String connectorId : connectorsSorted) {
        for (ConnectorTaskId taskId : sorted(coordinator.configSnapshot().tasks(connectorId))) {
            String taskAssignedTo = memberIt.next();
            log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
            Collection<ConnectorTaskId> memberTasks = taskAssignments.computeIfAbsent(taskAssignedTo, k -> new ArrayList<>());
            memberTasks.add(taskId);
        }
    }
    // 序列化分配結(jié)果并返回
    ......
}

下圖展示了有 3 個(gè) Worker,1 個(gè) Connector 以及 5 個(gè) Task 時(shí)以及 Worker2 宕機(jī)前后的負(fù)載均衡情況。

圖片圖片

不過這種負(fù)載均衡方式會(huì)引起比較明顯的驚群效應(yīng),比如在 Kafka Connect 集群擴(kuò)縮容的時(shí)候,不是新擴(kuò)縮容的節(jié)點(diǎn)也會(huì)出現(xiàn)較長的 stop-the-world 問題,在 K8s 環(huán)境中如果有節(jié)點(diǎn)需要進(jìn)行滾動(dòng)升級(jí),也會(huì)出現(xiàn)類似的問題。這種負(fù)載均衡方式在 Kafka 中稱之為 Eager Rebalance。

后面 Kafka 提出了 Incremental Cooperative Rebalance[4],引入了一個(gè)延遲時(shí)間延后 rebalance 的過程。進(jìn)行了這樣的改進(jìn)之后,當(dāng)出現(xiàn)節(jié)點(diǎn)滾動(dòng)升級(jí)時(shí),負(fù)載均衡就不會(huì)馬上發(fā)生,因?yàn)楸簧?jí)的節(jié)點(diǎn)可能很快就回歸了,之前負(fù)載均衡的結(jié)果也能最大限度得到保留,對(duì)整體消息同步流程的影響也盡可能降到了最低。相比較來說,Eager Rebalance 可以很快就達(dá)到負(fù)載均衡的終態(tài),而 Incremental Cooperative Rebalance 則可以最大程度上降低滾動(dòng)升級(jí)等場(chǎng)景下對(duì)負(fù)載均衡帶來的全局影響。

參考資料

[1] KIP-382: MirrorMaker 2.0

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0

[2] COURSE: KAFKA CONNECT 101 Kafka Connect’s REST API

https://developer.confluent.io/courses/kafka-connect/rest-api/

[3] KAFKA-15906

https://issues.apache.org/jira/browse/KAFKA-15906

[4] Incremental Cooperative Rebalancing in Kafka Connect

https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect

[5] KIP-415: Incremental Cooperative Rebalancing in Kafka Connect

https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect

[6] KIP-545: support automated consumer offset sync across clusters in MM 2.0

https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0

[7] KIP-656: MirrorMaker2 Exactly-once Semantics

https://cwiki.apache.org/confluence/display/KAFKA/KIP-656%3A+MirrorMaker2+Exactly-once+Semantics

責(zé)任編輯:武曉燕 來源: AutoMQ
相關(guān)推薦

2020-09-13 13:26:10

Kafka消費(fèi)者控制器

2021-07-06 07:02:41

Vue 2 Vite 開發(fā)工具

2022-04-20 11:41:45

Kafka數(shù)據(jù)解決方案

2011-07-15 10:01:02

Active DireADMT

2010-03-29 15:42:33

遷移工具

2014-09-05 10:16:39

(ISC)2CISSP安全考試CISSP考試

2021-04-09 08:54:14

Kafka源碼架構(gòu)開發(fā)技術(shù)

2021-06-09 10:29:23

Kafka架構(gòu)組件

2015-07-02 14:15:28

云遷移應(yīng)用重構(gòu)頭號(hào)難題

2023-06-07 15:25:19

Kafka版本日志

2024-10-30 10:06:51

2009-05-11 19:03:10

BMCBSM自動(dòng)化

2021-12-07 07:32:09

kafka架構(gòu)原理

2023-09-27 12:22:50

Kafka架構(gòu)

2021-06-16 15:18:03

鴻蒙HarmonyOS應(yīng)用

2012-09-12 10:35:51

Hyper-V

2024-07-03 08:19:56

2019-09-16 12:55:27

HBaseKafka數(shù)據(jù)

2021-08-30 15:41:13

Kafka運(yùn)維數(shù)據(jù)

2012-06-26 09:49:23

Kindle Fire亞馬遜
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)