使用 Go 構(gòu)建分布式系統(tǒng):基于 gRPC 的主從節(jié)點架構(gòu)
在現(xiàn)代軟件開發(fā)領(lǐng)域,分布式系統(tǒng)已經(jīng)變得至關(guān)重要。它們使服務(wù)能夠擴展、處理大量數(shù)據(jù)并提供高可用性。本文將指導(dǎo)您使用 Golang 構(gòu)建一個簡單的分布式系統(tǒng),該系統(tǒng)利用主節(jié)點和單個工作節(jié)點,并使用 gRPC 協(xié)議進行通信。
這種架構(gòu)非常適合數(shù)據(jù)處理、并行計算和大規(guī)模處理工作負載等分布式任務(wù)。我們將介紹如何設(shè)置主從結(jié)構(gòu)、建立基于 gRPC 的通信,以及實現(xiàn)簡單的任務(wù)分配和執(zhí)行流程。
系統(tǒng)概述
我們的分布式系統(tǒng)包含以下組件:
- 主節(jié)點: 控制器節(jié)點,負責將任務(wù)分配給工作節(jié)點。它跟蹤可用的工作節(jié)點、監(jiān)控任務(wù)狀態(tài)并管理任務(wù)分配。
- 工作節(jié)點: 執(zhí)行器節(jié)點,接收來自主節(jié)點的任務(wù),執(zhí)行計算并返回結(jié)果。
- gRPC 協(xié)議: gRPC(Google Remote Procedure Call)用于主節(jié)點和工作節(jié)點之間的通信,實現(xiàn)高效、高性能的通信。
前提條件
- 系統(tǒng)上已安裝 Go 1.13+。
- 用于生成 gRPC 代碼的 Protobuf 編譯器 (protoc)。
- gRPC-Go 和 Protobuf 庫。
go install google.golang.org/grpc
go install google.golang.org/protobuf/cmd/protoc-gen-go
go install google.golang.org/protobuf/cmd/protoc-gen-go-grpc設(shè)置 gRPC 服務(wù)定義
創(chuàng)建基于 gRPC 的分布式系統(tǒng)的第一步是在 .proto 文件中定義 gRPC 服務(wù)和消息。此文件概述了用于通信的服務(wù)、RPC 方法和消息結(jié)構(gòu)。
1. 在 Proto 文件中定義 gRPC 服務(wù)
創(chuàng)建一個名為 node.proto 的文件,內(nèi)容如下:
syntax = "proto3";
package core;
option go_package = ".;core";
message Request {
string action = 1;
}
message Response {
string data = 1;
}
service NodeService {
rpc ReportStatus(Request) returns (Response){};
rpc AssignTask(Request) returns (stream Response){};
}2. 從 Proto 文件生成 gRPC 代碼
使用 protoc 為我們的 gRPC 服務(wù)生成 Go 代碼:
mkdir core
protoc --go_out=./core --go-grpc_out=./core node.proto實現(xiàn) gRPC 服務(wù)端代碼
我們設(shè)置了一個 gRPC 服務(wù)器來報告狀態(tài),并通過命令通道持續(xù)發(fā)送客戶端任務(wù)。它使用 Go 的并發(fā)特性來處理實時命令通知。
package core
import "context"
type NodeServiceGrpcServer struct {
UnimplementedNodeServiceServer
CmdChannel chan string
}
func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) {
return &Response{Data: "ok"}, nil
}
func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error {
for {
select {
case cmd := <-n.CmdChannel:
if err := server.Send(&Response{Data: cmd}); err != nil {
return err
}
}
}
}
var server *NodeServiceGrpcServer
func GetNodeServiceGrpcServer() *NodeServiceGrpcServer {
if server == nil {
server = &NodeServiceGrpcServer{
CmdChannel: make(chan string),
}
}
return server
}實現(xiàn)主節(jié)點
主節(jié)點負責將任務(wù)分配給工作節(jié)點。它通過 gRPC 連接到工作節(jié)點,并使用 AssignTask 方法分配任務(wù)。
現(xiàn)在,讓我們在名為 node.go 的文件中實現(xiàn)主節(jié)點:我們使用 API 框架 gin 創(chuàng)建一個簡單的 API 服務(wù),該服務(wù)允許對 /tasks 的 POST 請求將命令發(fā)送到通道 CmdChannel 并傳遞給 NodeServiceGrpcServer。
package core
import (
"net"
"net/http"
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
)
type MasterNode struct {
api *gin.Engine
ln net.Listener
svr *grpc.Server
nodeSvr *NodeServiceGrpcServer
}
func (n *MasterNode) Init() (err error) {
n.ln, err = net.Listen("tcp", ":50051")
if err != nil {
return err
}
n.svr = grpc.NewServer()
n.nodeSvr = GetNodeServiceGrpcServer()
RegisterNodeServiceServer(node.svr, n.nodeSvr)
n.api = gin.Default()
n.api.POST("/tasks", func(c *gin.Context) {
var payload struct {
Cmd string `json:"cmd"`
}
if err := c.ShouldBindBodyWithJSON(&payload); err != nil {
c.AbortWithStatus(http.StatusBadRequest)
return
}
n.nodeSvr.CmdChannel <- payload.Cmd
c.AbortWithStatusJSON(200, http.StatusOK)
})
return nil
}
func (n *MasterNode) Start() {
go n.svr.Serve(n.ln)
_ = n.api.Run(":9092")
n.svr.Stop()
}
var node *MasterNode
func GetMasterNode() *MasterNode {
if node == nil {
node = &MasterNode{}
if err := node.Init(); err != nil {
panic(err)
}
}
return node
}實現(xiàn)工作節(jié)點
工作節(jié)點的職責是從主節(jié)點接收任務(wù)、處理任務(wù)并返回結(jié)果。
現(xiàn)在,讓我們在名為 worker_node.go 的文件中實現(xiàn)工作服務(wù)器:工作節(jié)點通過獲取的流從服務(wù)器(主節(jié)點)連續(xù)接收數(shù)據(jù)并執(zhí)行命令。
package core
import (
"context"
"fmt"
"os/exec"
"strings"
"google.golang.org/grpc"
)
type WokerNode struct {
conn *grpc.ClientConn
c NodeServiceClient
}
func (n *WokerNode) Init() (err error) {
n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
return err
}
n.c = NewNodeServiceClient(n.conn)
return nil
}
func (n *WokerNode) Start() {
fmt.Println("worker node started")
_, _ = n.c.ReportStatus(context.Background(), &Request{})
stream, _ := n.c.AssignTask(context.Background(), &Request{})
for {
res, err := stream.Recv()
if err != nil {
return
}
fmt.Print("received command: ", res.Data)
parts := strings.Split(res.Data, " ")
if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil {
fmt.Println(err)
}
}
}
var workerNode *WokerNode
func GetWorkerNode() *WokerNode {
if workerNode == nil {
workerNode = &WokerNode{}
if err := workerNode.Init(); err != nil {
panic(err)
}
}
return workerNode
}整合主從節(jié)點
我們創(chuàng)建一個 main.go,它位于 core 文件夾之外。main 函數(shù)接受一個參數(shù),并將其與 switch 語句進行比較,以確定是運行主節(jié)點還是工作節(jié)點。
package main
import (
"go-master-worker-node/core"
"os"
)
func main() {
nodeType := os.Args[1]
switch nodeType {
case "master":
core.GetMasterNode().Start()
case "worker":
core.GetWorkerNode().Start()
default:
panic("invalid node type")
}
}運行主節(jié)點和工作節(jié)點
啟動主節(jié)點:
go run main.go master啟動工作節(jié)點:
go run main.go worker使用 Curl 發(fā)送 POST 請求
我們可以使用 curl POST 方法發(fā)送命令,如下所示,我們向本地主機 9092 發(fā)送一個 touch 命令,路徑設(shè)置為“tasks”,這是主節(jié)點當前運行的位置。
發(fā)送 touch 命令:
curl -X POST -H "Content-Type: application/json" -d '{"cmd": "touch test.txt"}' http://localhost:9092/tasks結(jié)論
我們使用 Golang 構(gòu)建了一個基本的分布式系統(tǒng),該系統(tǒng)采用主從架構(gòu)并使用 gRPC 進行高效通信。在實際場景中,您可以使用更復(fù)雜的任務(wù)分配、負載均衡和錯誤處理來擴展此模型,以處理生產(chǎn)級別的分布式任務(wù)。
































