使用Kafka和MongoDB進(jìn)行Go異步處理
在我前面的博客文章 “我的***個 Go 微服務(wù):使用 MongoDB 和 Docker 多階段構(gòu)建” 中,我創(chuàng)建了一個 Go 微服務(wù)示例,它發(fā)布一個 REST 式的 http 端點,并將從 HTTP POST 中接收到的數(shù)據(jù)保存到 MongoDB 數(shù)據(jù)庫。
在這個示例中,我將數(shù)據(jù)的保存和 MongoDB 分離,并創(chuàng)建另一個微服務(wù)去處理它。我還添加了 Kafka 為消息層服務(wù),這樣微服務(wù)就可以異步處理它自己關(guān)心的東西了。
如果你有時間去看,我將這個博客文章的整個過程錄制到 這個視頻中了 :)
下面是這個使用了兩個微服務(wù)的簡單的異步處理示例的上層架構(gòu)圖。

rest-kafka-mongo-microservice-draw-io
微服務(wù) 1 —— 是一個 REST 式微服務(wù),它從一個 /POST http 調(diào)用中接收數(shù)據(jù)。接收到請求之后,它從 http 請求中檢索數(shù)據(jù),并將它保存到 Kafka。保存之后,它通過 /POST 發(fā)送相同的數(shù)據(jù)去響應(yīng)調(diào)用者。
微服務(wù) 2 —— 是一個訂閱了 Kafka 中的一個主題的微服務(wù),微服務(wù) 1 的數(shù)據(jù)保存在該主題。一旦消息被微服務(wù)消費之后,它接著保存數(shù)據(jù)到 MongoDB 中。
在你繼續(xù)之前,我們需要能夠去運行這些微服務(wù)的幾件東西:
- 下載 Kafka —— 我使用的版本是 kafka_2.11-1.1.0
- 安裝 librdkafka —— 不幸的是,這個庫應(yīng)該在目標(biāo)系統(tǒng)中
- 安裝 Kafka Go 客戶端
- 運行 MongoDB。你可以去看我的 以前的文章 中關(guān)于這一塊的內(nèi)容,那篇文章中我使用了一個 MongoDB docker 鏡像。
我們開始吧!
首先,啟動 Kafka,在你運行 Kafka 服務(wù)器之前,你需要運行 Zookeeper。下面是示例:
$ cd /<download path>/kafka_2.11-1.1.0$ bin/zookeeper-server-start.sh config/zookeeper.properties
接著運行 Kafka —— 我使用 9092 端口連接到 Kafka。如果你需要改變端口,只需要在 config/server.properties 中配置即可。如果你像我一樣是個新手,我建議你現(xiàn)在還是使用默認(rèn)端口。
$ bin/kafka-server-start.sh config/server.properties
Kafka 跑起來之后,我們需要 MongoDB。它很簡單,只需要使用這個 docker-compose.yml 即可。
version: '3'services:mongodb:image: mongoports:- "27017:27017"volumes:- "mongodata:/data/db"networks:- network1volumes:mongodata:networks:network1:
使用 Docker Compose 去運行 MongoDB docker 容器。
docker-compose up
這里是微服務(wù) 1 的相關(guān)代碼。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:
rest-to-kafka/rest-kafka-sample.go
func jobsPostHandler(w http.ResponseWriter, r *http.Request) {//Retrieve body from http requestb, err := ioutil.ReadAll(r.Body)defer r.Body.Close()if err != nil {panic(err)}//Save data into Job structvar _job Joberr = json.Unmarshal(b, &_job)if err != nil {http.Error(w, err.Error(), 500)return}saveJobToKafka(_job)//Convert job struct into jsonjsonString, err := json.Marshal(_job)if err != nil {http.Error(w, err.Error(), 500)return}//Set content-type http headerw.Header().Set("content-type", "application/json")//Send back data as responsew.Write(jsonString)}func saveJobToKafka(job Job) {fmt.Println("save to kafka")jsonString, err := json.Marshal(job)jobString := string(jsonString)fmt.Print(jobString)p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})if err != nil {panic(err)}// Produce messages to topic (asynchronously)topic := "jobs-topic1"for _, word := range []string{string(jobString)} {p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value: []byte(word),}, nil)}}
這里是微服務(wù) 2 的代碼。在這個代碼中最重要的東西是從 Kafka 中消費數(shù)據(jù),保存部分我已經(jīng)在前面的博客文章中討論過了。這里代碼的重點部分是從 Kafka 中消費數(shù)據(jù):
kafka-to-mongo/kafka-mongo-sample.go
func main() {//Create MongoDB sessionsession := initialiseMongo()mongoStore.session = sessionreceiveFromKafka()}func receiveFromKafka() {fmt.Println("Start receiving from Kafka")c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092","group.id": "group-id-1","auto.offset.reset": "earliest",})if err != nil {panic(err)}c.SubscribeTopics([]string{"jobs-topic1"}, nil)for {msg, err := c.ReadMessage(-1)if err == nil {fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))job := string(msg.Value)saveJobToMongo(job)} else {fmt.Printf("Consumer error: %v (%v)\n", err, msg)break}}c.Close()}func saveJobToMongo(jobString string) {fmt.Println("Save to MongoDB")col := mongoStore.session.DB(database).C(collection)//Save data into Job structvar _job Jobb := []byte(jobString)err := json.Unmarshal(b, &_job)if err != nil {panic(err)}//Insert job into MongoDBerrMongo := col.Insert(_job)if errMongo != nil {panic(errMongo)}fmt.Printf("Saved to MongoDB : %s", jobString)}
我們來演示一下,運行微服務(wù) 1。確保 Kafka 已經(jīng)運行了。
$ go run rest-kafka-sample.go
我使用 Postman 向微服務(wù) 1 發(fā)送數(shù)據(jù)。

Screenshot-2018-04-29-22.20.33
這里是日志,你可以在微服務(wù) 1 中看到。當(dāng)你看到這些的時候,說明已經(jīng)接收到了來自 Postman 發(fā)送的數(shù)據(jù),并且已經(jīng)保存到了 Kafka。

Screenshot-2018-04-29-22.22.00
因為我們尚未運行微服務(wù) 2,數(shù)據(jù)被微服務(wù) 1 只保存在了 Kafka。我們來消費它并通過運行的微服務(wù) 2 來將它保存到 MongoDB。
$ go run kafka-mongo-sample.go
現(xiàn)在,你將在微服務(wù) 2 上看到消費的數(shù)據(jù),并將它保存到了 MongoDB。

Screenshot-2018-04-29-22.24.15
檢查一下數(shù)據(jù)是否保存到了 MongoDB。如果有數(shù)據(jù),我們成功了!

Screenshot-2018-04-29-22.26.39
完整的源代碼可以在這里找到:
https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice































