如何在 .NET 中使用 Kafka
本文轉(zhuǎn)載自微信公眾號「碼農(nóng)讀書」,作者 碼農(nóng)讀書。轉(zhuǎn)載本文請聯(lián)系碼農(nóng)讀書公眾號。
Kafka 是一個開源的,分布式的,可擴展的,高性能的發(fā)布訂閱模式的消息中間件,如果你要構(gòu)建一個處理海量數(shù)據(jù)的系統(tǒng),那么 Kafka 將會是一個非常好的選擇,這篇文章我們將會討論如何基于 Kakfa 構(gòu)建一個發(fā)布訂閱模式的程序。
Kafka 架構(gòu)
這一節(jié)中,先來看看 Kafka 的基礎(chǔ)架構(gòu)以及相關(guān)術(shù)語,大體來說 Kafka 由下面幾個組件組成。
- Kafka Cluster 一個或者多個服務(wù)器組成的集群
- Producer 一個用于發(fā)布消息的組件。
- Consumer 一個用于獲取并處理消息的組件。
- ZooKeeper 一個中心化的協(xié)調(diào)組件,常用于保存分布式環(huán)境下各個節(jié)點的配置信息。
在 Kafka 中,數(shù)據(jù)的基本單元是 message,它是一個 key-value 鍵值對,kafka 會將所有的 message 轉(zhuǎn)換為 byte[],值得注意的是:生產(chǎn)者 和 消費者 以及 cluster 集群之間都是采用 tcp 協(xié)議通訊的,kafka 集群中的每一臺機器都被稱為代理(broker),你可以非常容易的向集群添加機器實現(xiàn)容量的橫向擴展。
下面的圖展示了 kafka 的基礎(chǔ)架構(gòu)。
kafka 中的 topic 表示 message 的邏輯集合,如果不明白的話,你可以認為 topic 就是 category (分類),category 下自然就是歸類的 message,這些 message 是由 生產(chǎn)者 產(chǎn)生。
kafka server 中會包含一個或者多個 topics,每一個 topics 又可以包含一個或者多個 partitions(分區(qū)),partition 被定義為一個有序的消息序列,值得注意的是 partitions 是 kafka 能夠動態(tài)擴展的關(guān)鍵,換句話說 partition 可以分布在多個 kafka server 上,具體操作流程為:kafka 中的 生產(chǎn)者 將 message 推送到指定的 topic,訂閱該 topic 的 消費者 就可以拿到該消息。
Kafka 和 RabbitMQ 比較
Kafka 和 RabbitMQ 都是非常流行的,開源的 消息中間件,那什么時候應(yīng)該選擇 Kakfa 而不是 RabbitMQ 呢?主要考慮如下幾點。
- RabbitMQ 是由高性能語言 Erlang 編寫的,它擁有豐富的 路由機制 和強大的 消息確認機制, 同時 RabbitMQ 還提供了一個可視化的 WebUI 界面,可以通過它監(jiān)視 RabbitMQ 的運行狀態(tài),但如果你有大規(guī)模部署的需求,RabbitMQ 就沒有 Kafka 好使了,因為后者的擴容只需要增加 partitions 就可以了。
- RabbitMQ Cluster 會存在經(jīng)典的 腦裂問題,需要使用單獨的插件支持(federations)。
- Kafka 在性能上遠超 RabbitMQ,單節(jié)點的 Kafka 能夠處理 10w/s 條記錄,而 RabbitMQ 大概只能處理 2w/s 條記錄。
構(gòu)建 生產(chǎn)者 和 消費者
這一節(jié)我們來討論如何為 Kafka 構(gòu)建生產(chǎn)者和消費者,這就需要構(gòu)建兩個 Console 程序分別充當各自角色,大家可以用 nuget 安裝一下 kafka-net,命令如下:
- Install-Package kafka-net
構(gòu)建 生產(chǎn)者 Console
- static void Main(string[] args)
- {
- string payload ="Welcome to Kafka!";
- string topic ="IDGTestTopic";
- Message msg = new Message(payload);
- Uri uri = new Uri("http://localhost:9092");
- var options = new KafkaOptions(uri);
- var router = new BrokerRouter(options);
- var client = new Producer(router);
- client.SendMessageAsync(topic, new List<Message> { msg }).Wait();
- Console.ReadLine();
- }
構(gòu)建 消費者 Console
- static void Main(string[] args)
- {
- string topic ="IDGTestTopic";
- Uri uri = new Uri("http://localhost:9092");
- var options = new KafkaOptions(uri);
- var router = new BrokerRouter(options);
- var consumer = new Consumer(new ConsumerOptions(topic, router));
- foreach (var message in consumer.Consume())
- {
- Console.WriteLine(Encoding.UTF8.GetString(message.Value));
- }
- Console.ReadLine();
- }
最后可以依次將 生產(chǎn)者 和 消費者 程序啟動起來,然后你就會看到 消費者 Console 上顯示:Welcome to Kafka! 。
其實在開源世界中有太多的消息中間件,比如:RabbitMQ, MSMQ, IBM MQ Series 等等,現(xiàn)在的 Kafka 不僅僅是 消息中間件 了,而是用于大數(shù)據(jù)的 流式處理平臺,Kafka 也常常用于 IOT 程序,日志聚合 和 其他低延遲,強消息保證 等場景,如果你的應(yīng)用程序需要一個快速并可擴展的消息中間件,kafka 將會是一個非常好的選擇,后續(xù)我會分享更多的關(guān)于 kafka 的文章。
譯文鏈接:https://www.infoworld.com/article/3215165/how-to-use-apache-kafka-messaging-in-net.html
























