AIOps系列 | 開發(fā)一個(gè) K8s Chat 命令行工具
在前面我們介紹了[[03.大模型入門實(shí)戰(zhàn)]]和 [[04.Agent入門實(shí)戰(zhàn)]],了解了 AI 開發(fā)的基本流程,本章節(jié)我們將使用討論如何將 Kubernetes 和 AI 結(jié)合起來。
本文主要從以下幾個(gè)方面進(jìn)行介紹:
- 怎么和 Kubernetes 進(jìn)行交互
- Cobra 工具庫介紹
- 開發(fā) Chat K8s 命令行工具
Tips:文章只是起到拋磚引玉的作用,做大做強(qiáng)還需各位自己努力。
怎么和 Kubernetes 進(jìn)行交互
Kubernetes 集群是云原生時(shí)代中最重要的基礎(chǔ)設(shè)施,它里面運(yùn)行著成千上萬的 Pods,在日常的運(yùn)維工作中,SRE 常用 kubectl 命令與它進(jìn)行交互。
kubectl 是官方開發(fā)的客戶端,已經(jīng)將所有常用的操作集成到里面了。但是, 如果我們要通過接口與 Kubernetes 進(jìn)行交互,就需要使用官方的一個(gè)核心庫 Client-go。
client-go 是 Kubernetes 官方提供的 Go 語言客戶端庫,用于與 Kubernetes API Server 進(jìn)行交互。它讓你可以通過編程的方式創(chuàng)建、更新、刪除和查詢 Kubernetes 資源(如 Pod、Deployment、Service 等),是開發(fā) Kubernetes 控制器、Operator、自定義調(diào)度器或運(yùn)維工具的核心依賴。
client-go 的核心組件主要有:
- Clientset:最常用的客戶端集合,包含對(duì) core、apps、networking 等 API 訪問。
- Informer:監(jiān)聽資源變化,如 add、update、delete,避免頻繁輪詢。
- Lister:快速從本地緩存讀取資源,主要是配合 Informer 使用。
- Workqueue:處理異步任務(wù)隊(duì)列,常用于控制器中
- ...
下面我們舉一個(gè)簡單的例子獲取所有 Pod 列表。
(1)初始化一個(gè)項(xiàng)目
mkdir k8s-pod-list && cd k8s-pod-list
go mod init k8s-pod-list(2)安裝 client-go
go get k8s.io/client-go/kubernetes
go get k8s.io/client-go/tools/clientcmd
go get k8s.io/client-go/rest(3)編寫代碼
package main
import (
"context"
"fmt"
"log"
"path/filepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
// 1. 構(gòu)建 kubeconfig 路徑(默認(rèn) ~/.kube/config)
var kubeconfig string
if home := homedir.HomeDir(); home != "" {
kubeconfig = filepath.Join(home, ".kube", "config")
}
// 2. 加載 kubeconfig 文件,生成配置
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatal(err)
}
// 3. 創(chuàng)建 Kubernetes 客戶端
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
// 4. 獲取所有 Pod
pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.Fatal(err)
}
// 5. 打印每個(gè) Pod 的命名空間和名稱
fmt.Printf("Found %d pods:\n", len(pods.Items))
for _, pod := range pods.Items {
fmt.Printf("Namespace: %s, Pod: %s\n", pod.Namespace, pod.Name)
}
}(4)運(yùn)行 go run main.go,輸出如下:
> go run main.go
Found 713 pods:
Namespace: apo, Pod: apo-alertmanager-554fd4bbfc-mjp6q
Namespace: apo, Pod: apo-altinity-clickhouse-operator-cc87bb785-j9t9q
Namespace: apo, Pod: apo-backend-594c764b94-nd4zw
Namespace: apo, Pod: apo-collector-76979647fd-cmgg2
Namespace: apo, Pod: apo-front-5fbf5f4566-4rdh9
Namespace: apo, Pod: apo-grafana-57b98b9d59-f9k4h
Namespace: apo, Pod: apo-jaeger-collector-6d7bbc945d-6p95x
Namespace: apo, Pod: apo-odigos-instrumentor-8689cd45c6-xgvnsclient-go 是與 Kubernetes 交互的 Go 語言標(biāo)準(zhǔn)工具,它是構(gòu)建云原生工具的基礎(chǔ),適合用于開發(fā)自動(dòng)化運(yùn)維系統(tǒng)、Kubernetes 插件、監(jiān)控工具等。
本文我們希望的是開發(fā)一個(gè) Kubernetes 工具,單純使用 client-go 略顯麻煩,我們還需要使用另外一個(gè) CLI 工具庫——Cobra,它可以為我們開發(fā)提升很大的效率。
Cobra 工具庫介紹
Cobra 是 Go 語言中最流行的命令行工具庫,用于構(gòu)建功能強(qiáng)大、結(jié)構(gòu)清晰的現(xiàn)代 CLI(Command-Line Interface)應(yīng)用程序。它被廣泛用于許多知名開源項(xiàng)目中,例如:
- Kubernetes (kubectl)
- Helm
- Docker (docker CLI)
它的核心特性有:
- Command:支持命令及子命令,如 app server,app config set。
- Flags:支持全局和局部 flag,如 --config,-v。
- Args:命令的參數(shù),一般用來表示操作的對(duì)象,比如 kubectl get pod,其中 pod 就是操作的對(duì)象。
- 可以自動(dòng)生成 help,輸入 app --help 自動(dòng)輸出幫助信息。
- 可以 shell 環(huán)境下的自動(dòng)補(bǔ)全。
對(duì)于 Command,每個(gè)命令都是一個(gè) &cobra.Command{},比如:
&cobra.Command{
Use: "hello", // 命令用法
Short: "簡短描述",
Long: "長描述",
Run: func(cmd *cobra.Command, args []string) {
// 執(zhí)行邏輯
},
}而 Flags 分為 全局 Flag 和 局部 Flag,取決你的 Flag 是假如的 rootCmd 還是 其他命令。
// 局部 flag
helloCmd.Flags().StringVarP(&name, "name", "n", "default", "姓名")
// 全局 flag(加到 rootCmd)
rootCmd.PersistentFlags().Bool("verbose", false, "啟用詳細(xì)日志")另外,F(xiàn)lags 支持 String、Bool、Int、Float 類型,如下:
- String 類型
StringVar(&variable, "flag", "default", "description"),不帶縮寫
StringVarP(&variable, "flag", ”shorthand“, "default", "description"),帶縮寫
StringSliceVar(&variable, "flag", []string{}, "description"),可以接手?jǐn)?shù)組,cli可以傳入多個(gè)flag
StringVarP(&variable, "flag", ”shorthand“, []string{}, "description")
- Bool 類型
BoolVar(&variable, "flag", "default", "description"),不帶縮寫
BoolVarP(&variable, "flag", ”shorthand“, "default", "description"),帶縮寫
- Int 類型
IntVar(&variable, "flag", "default", "description"),不帶縮寫
IntVarP(&variable, "flag", ”shorthand“, "default", "description"),帶縮寫
- Float 類型
FloatVar(&variable, "flag", "default", "description"),不帶縮寫
FloatVarP(&variable, "flag", ”shorthand“, "default", "description"),帶縮寫
下面我們開發(fā)一個(gè)最簡單的 hello命令。
(1)安裝 Cobra
go get -u github.com/spf13/cobra/cobra(2)初始化項(xiàng)目
mkdir hello && cd hello
go mod init hello
cobra-cli init這會(huì)自動(dòng)生成:
- cmd/root.go —— 主命令
- main.go —— 入口
- 自動(dòng)支持 myapp --help
(3)添加子命令
cobra-cli add hello(4)編輯 cmd/hello.go,添加邏輯
package cmd
import (
"fmt"
"github.com/spf13/cobra"
)
var name string
var helloCmd = &cobra.Command{
Use: "hello",
Short: "打印問候語",
Long: `一個(gè)簡單的命令,用于向某人問好`,
Run: func(cmd *cobra.Command, args []string) {
if name == "" {
name = "World"
}
fmt.Printf("Hello, %s!\n", name)
},
}
func init() {
rootCmd.AddCommand(helloCmd)
helloCmd.Flags().StringVarP(&name, "name", "n", "", "輸入你的名字")
}(5)運(yùn)行程序,查看輸出。
go run main.go hello
# 輸出: Hello, World!
go run main.go hello --name=Jokerbai
# 輸出: Hello, Jokerbai!
go run main.go hello -n Bob
# 輸出: Hello, Bob!
go run main.go hello --help
# 一個(gè)簡單的命令,用于向某人問好
#
# Usage:
# cobra-hello hello [flags]
#
# Flags:
# -h, --help help for hello
# -n, --name string 輸入你的名字通過上面簡單的示例,我們不難發(fā)現(xiàn)通過 Cobra,我們可以快速搭建起命令工具的架子,后續(xù)只需要結(jié)合業(yè)務(wù)對(duì)這個(gè)架子進(jìn)行裝修,省去不少麻煩。
開發(fā) Chat K8s 命令行工具
上面我們對(duì)本文需要的基礎(chǔ)工具進(jìn)行了簡單介紹,接下來我們將進(jìn)入第一個(gè)主題:開發(fā) Chat K8s 命令行工具。
我們這個(gè)工具需要實(shí)現(xiàn)的效果是:
- Command:k8scopilot ask chatgpt 進(jìn)入交互模式
- 可以進(jìn)行資源的部署,比如:
幫我部署一個(gè) Deploy,鏡像是 nginx
部署一個(gè) Pod,鏡像是 nginx
- 可以查詢資源信息,比如:
查一下 default ns 的 Pod、SVC、Deploy 資源
- 可以刪除資源,比如:
刪除 default ns 下的 nginx deploy
下面我們進(jìn)入正式的開發(fā)。
1.創(chuàng)建一個(gè) k8scopilot目錄,使用 cobra-cli 初始化項(xiàng)目
mkdir k8scopilot && cd k8scopilot
go mod init
cobra-cli initr2.添加第一個(gè) ask 命令
cobra-cli add ask3.接著,添加一個(gè) chatgpt 命令,它是 ask 的子命令
cobra-cli add chatgpt -p "askCmd"4.在 chatgpt 命令中實(shí)現(xiàn)一個(gè)簡單的功能:
- 當(dāng)用戶執(zhí)行 k8scopilot ask chatgpt 進(jìn)入控制臺(tái)輸入
- 然后用戶可以在控制臺(tái)中進(jìn)行對(duì)話
- 當(dāng)用戶輸入 exit 退出
(1)增加一個(gè) startChat 方法,用于處理生成對(duì)話回復(fù)
func startChat() {
scanner := bufio.NewScanner(os.Stdin)
fmt.Println("我是 K8S Copilot,你可以向我咨詢 K8S 相關(guān)問題")
for {
fmt.Print("> ")
if scanner.Scan() {
input := scanner.Text()
if input == "exit" {
fmt.Println("退出對(duì)話")
break
}
if input == "" {
continue
}
fmt.Println("你輸入的是:", input)
}
}
}效果如下:
> .\k8scopilot.exe ask chatgpt
我是 K8S Copilot,你可以向我咨詢 K8S 相關(guān)問題
> 部署資源
你輸入的是: 部署資源
> exit
退出對(duì)話(2)封裝 OpenAI 客戶端,方便后續(xù)調(diào)用大模型。創(chuàng)建一個(gè) utils 目錄,然后創(chuàng)建 openai.go 完成客戶端封裝。
package utils
import (
"context"
"errors"
"net/http"
"os"
"time"
"github.com/sashabaranov/go-openai"
)
const defaultBaseURL = "https://vip.apiyi.com/v1"
const defaultApiKey = "sk-xxxx"
type OpenAI struct {
Client *openai.Client
ctx context.Context
}
func NewOpenAI() *OpenAI {
apiKey := os.Getenv("OPENAI_API_KEY")
if apiKey == "" {
apiKey = defaultApiKey
}
apiBase := os.Getenv("OPENAI_API_BASE")
if apiBase == "" {
apiBase = defaultBaseURL
}
config := openai.DefaultConfig(apiKey)
config.BaseURL = apiBase
config.HTTPClient = &http.Client{
Timeout: 30 * time.Second,
}
client := openai.NewClientWithConfig(config)
return &OpenAI{
Client: client,
ctx: context.Background(),
}
}
func (op *OpenAI) SendMessage(prompt string, content string) (string, error) {
req := openai.ChatCompletionRequest{
Model: openai.GPT4oMini,
Messages: []openai.ChatCompletionMessage{
{
Role: "system",
Content: prompt,
},
{
Role: "user",
Content: content,
},
},
}
resp, err := op.Client.CreateChatCompletion(op.ctx, req)
if err != nil {
return "", err
}
if len(resp.Choices) == 0 {
return "", errors.New("no response")
}
return resp.Choices[0].Message.Content, nil
}我們定義了 NewOpenAI 方法用來初始化 openai,然后再定義了一個(gè) SendMessage 方法用來和 GPT 進(jìn)行交互。
(3)實(shí)現(xiàn) FunctionCall,在需要根據(jù)用戶輸入執(zhí)行不同的操作的時(shí)候,我們?cè)凇洞竽P腿腴T實(shí)戰(zhàn)》章節(jié)有講過 FunctionCall,該項(xiàng)目我們將使用其實(shí)現(xiàn)。
這里只實(shí)現(xiàn)三個(gè)功能:
- 生成 YAML 清單并部署
- 查詢 K8s 資源
- 刪除 K8s 資源
在 chatgpt.go 中實(shí)現(xiàn)。
首先,定義一個(gè)工具函數(shù) createTools,用于定義 openai 的工具集。
func createTools() []openai.Tool {
// 用來生成 K8s YAML 并部署資源
f1 := openai.FunctionDefinition{
Name: "generateAndDeployResource",
Description: "生成 K8s YAML 并部署資源",
Parameters: jsonschema.Definition{
Type: jsonschema.Object,
Properties: map[string]jsonschema.Definition{
"user_input": {
Type: jsonschema.String,
Description: "用戶輸出的文本內(nèi)容,要求包含資源類型和鏡像",
},
},
Required: []string{"user_input"}, // 修復(fù):應(yīng)該是 user_input 而不是 location
},
}
// 用來查詢 K8s 資源
f2 := openai.FunctionDefinition{
Name: "queryResource",
Description: "查詢 K8s 資源",
Parameters: jsonschema.Definition{
Type: jsonschema.Object,
Properties: map[string]jsonschema.Definition{
"namespace": {
Type: jsonschema.String,
Description: "資源所在的命名空間",
},
"resource_type": {
Type: jsonschema.String,
Description: "K8s 資源標(biāo)準(zhǔn)類型,例如 Pod、Deployment、Service 等",
},
},
Required: []string{"namespace", "resource_type"}, // 添加必需參數(shù)
},
}
// 用來刪除 K8s 資源
f3 := openai.FunctionDefinition{
Name: "deleteResource",
Description: "刪除 K8s 資源",
Parameters: jsonschema.Definition{
Type: jsonschema.Object,
Properties: map[string]jsonschema.Definition{
"namespace": {
Type: jsonschema.String,
Description: "資源所在的命名空間",
},
"resource_type": {
Type: jsonschema.String,
Description: "K8s 資源標(biāo)準(zhǔn)類型,例如 Pod、Deployment、Service 等",
},
"resource_name": {
Type: jsonschema.String,
Description: "資源名稱",
},
},
Required: []string{"namespace", "resource_type", "resource_name"}, // 添加必需參數(shù)
},
}
return []openai.Tool{
{Type: openai.ToolTypeFunction, Function: &f1},
{Type: openai.ToolTypeFunction, Function: &f2},
{Type: openai.ToolTypeFunction, Function: &f3},
}
}接著,我們定義 functionCalling 方法,用于將工具注冊(cè)到大模型以及執(zhí)行工具調(diào)用。
func functionCalling(input string, client *utils.OpenAI) string {
tools := createTools()
dialogue := []openai.ChatCompletionMessage{
{Role: openai.ChatMessageRoleUser, Content: input},
}
// 調(diào)用 OpenAI API
resp, err := client.Client.CreateChatCompletion(context.TODO(),
openai.ChatCompletionRequest{
Model: openai.GPT4TurboPreview,
Messages: dialogue,
Tools: tools,
},
)
if err != nil {
return fmt.Sprintf("OpenAI API 調(diào)用失敗: %v", err)
}
// 驗(yàn)證響應(yīng)
if len(resp.Choices) == 0 {
return "OpenAI 返回了空的響應(yīng)"
}
msg := resp.Choices[0].Message
// 如果沒有工具調(diào)用,直接返回消息內(nèi)容
if len(msg.ToolCalls) == 0 {
if msg.Content != "" {
return msg.Content
}
return "抱歉,我無法理解您的請(qǐng)求,請(qǐng)?zhí)峁└唧w的信息。"
}
// 處理多個(gè)工具調(diào)用(雖然當(dāng)前邏輯假設(shè)只有一個(gè))
if len(msg.ToolCalls) > 1 {
return "抱歉,當(dāng)前不支持同時(shí)執(zhí)行多個(gè)操作,請(qǐng)一次只執(zhí)行一個(gè)操作。"
}
// 執(zhí)行工具調(diào)用
toolCall := msg.ToolCalls[0]
result, err := callFunction(client, toolCall.Function.Name, toolCall.Function.Arguments)
if err != nil {
return fmt.Sprintf("執(zhí)行操作失敗: %v", err)
}
return result
}在 functionCalling 中,我們調(diào)用了 callFunction 方法執(zhí)行工具調(diào)用,下面實(shí)現(xiàn) callFuntion ,它將根據(jù) OpenAI 返回的消息,調(diào)用對(duì)應(yīng)的函數(shù)。
// 根據(jù) OpenAI 返回的消息,調(diào)用對(duì)應(yīng)的函數(shù)
func callFunction(client *utils.OpenAI, name, arguments string) (string, error) {
switch name {
case "generateAndDeployResource":
params := struct {
UserInput string `json:"user_input"`
}{}
if err := json.Unmarshal([]byte(arguments), ?ms); err != nil {
return "", fmt.Errorf("解析生成部署資源參數(shù)失敗: %v", err)
}
return generateAndDeployResource(client, params.UserInput)
case "queryResource":
params := struct {
Namespace string `json:"namespace"`
ResourceType string `json:"resource_type"`
}{}
if err := json.Unmarshal([]byte(arguments), ?ms); err != nil {
return "", fmt.Errorf("解析查詢資源參數(shù)失敗: %v", err)
}
return queryResource(params.Namespace, params.ResourceType)
case "deleteResource":
params := struct {
Namespace string `json:"namespace"`
ResourceType string `json:"resource_type"`
ResourceName string `json:"resource_name"`
}{}
if err := json.Unmarshal([]byte(arguments), ?ms); err != nil {
return "", fmt.Errorf("解析刪除資源參數(shù)失敗: %v", err)
}
return deleteResource(params.Namespace, params.ResourceType, params.ResourceName)
default:
return "", fmt.Errorf("未知的函數(shù): %s", name)
}
}接下來,我們需要實(shí)現(xiàn)具體的操作方法,它們分別是:
- generateAndDeployResource
- queryResource
- deleteResource
在這之前,我們需要在 utils 中封裝 client-go,方便后續(xù)調(diào)用。
// utils/client_go.go
package utils
import (
"fmt"
"path/filepath"
"strings"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
// ClientGo encapsulates both clientset and dynamicClient for Kubernetes operations
type ClientGo struct {
Clientset *kubernetes.Clientset
DynamicClient dynamic.Interface
DiscoveryClient discovery.DiscoveryInterface
}
// NewClientGo initializes a new ClientGo instance with the provided kubeconfig path
func NewClientGo(kubeconfig string) (*ClientGo, error) {
// Handle ~ in the kubeconfig path
if strings.HasPrefix(kubeconfig, "~") {
homeDir := homedir.HomeDir()
kubeconfig = filepath.Join(homeDir, kubeconfig[1:])
}
// Build the configuration from the kubeconfig file
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, fmt.Errorf("failed to build kubeconfig: %w", err)
}
// Create the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create clientset: %w", err)
}
// Create the dynamic client
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create dynamic client: %w", err)
}
// Create DiscoveryClient
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create discovery client: %w", err)
}
return &ClientGo{
Clientset: clientset,
DynamicClient: dynamicClient,
DiscoveryClient: discoveryClient,
}, nil
}然后,我們?cè)賮韺?shí)現(xiàn)具體的方法。
其中,generateAndDeployResource 是用來生成 YAML 清單以及部署資源的,其邏輯就是調(diào)用大模型生成純凈的 K8s YAML 資源,然后使用 Client-go 完成資源部署。
// 生成 K8s YAML 并部署資源
func generateAndDeployResource(client *utils.OpenAI, userInput string) (string, error) {
// 生成 YAML 內(nèi)容
yamlContent, err := client.SendMessage("你現(xiàn)在是一個(gè) K8s 資源生成器,根據(jù)用戶輸入生成 K8s YAML,注意除了 YAML 內(nèi)容以外不要輸出任何內(nèi)容,此外不要把 YAML 放在 ``` 代碼快里", userInput)
if err != nil {
return "", fmt.Errorf("生成 YAML 失敗: %v", err)
}
// 創(chuàng)建 Kubernetes 客戶端
clientGo, err := utils.NewClientGo(kubeconfig) // kubeconfig 是一個(gè)全局 Flag
if err != nil {
return "", fmt.Errorf("創(chuàng)建 Kubernetes 客戶端失敗: %v", err)
}
// 獲取 API 資源
resources, err := restmapper.GetAPIGroupResources(clientGo.DiscoveryClient)
if err != nil {
return "", fmt.Errorf("獲取 API 資源失敗: %v", err)
}
// 將 YAML 轉(zhuǎn)成 unstructured 對(duì)象
unstructuredObj := &unstructured.Unstructured{}
_, _, err = scheme.Codecs.UniversalDeserializer().Decode([]byte(yamlContent), nil, unstructuredObj)
if err != nil {
return "", fmt.Errorf("解析 YAML 失敗: %v", err)
}
// 創(chuàng)建 REST mapper
mapper := restmapper.NewDiscoveryRESTMapper(resources)
// 從 unstructuredObj 中提取 GVK
gvk := unstructuredObj.GroupVersionKind()
// 用 GVK 轉(zhuǎn) GVR
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return "", fmt.Errorf("映射資源類型失敗: %v", err)
}
// 設(shè)置默認(rèn)命名空間
namespace := unstructuredObj.GetNamespace()
if namespace == "" {
namespace = "default"
}
// 部署資源
_, err = clientGo.DynamicClient.Resource(mapping.Resource).Namespace(namespace).Create(context.TODO(), unstructuredObj, metav1.CreateOptions{})
if err != nil {
return "", fmt.Errorf("部署資源失敗: %v", err)
}
resourceName := unstructuredObj.GetName()
resourceKind := unstructuredObj.GetKind()
return fmt.Sprintf("? 成功部署 %s/%s 到命名空間 %s\n\n生成的 YAML:\n%s", resourceKind, resourceName, namespace, yamlContent), nil
}對(duì)于 queryResource 是用來查詢資源信息的,不同的資源信息有不同的 GVR,如下:
// 查詢 K8s 資源
func queryResource(namespace, resourceType string) (string, error) {
// 創(chuàng)建 Kubernetes 客戶端
clientGo, err := utils.NewClientGo(kubeconfig)
if err != nil {
return "", fmt.Errorf("創(chuàng)建 Kubernetes 客戶端失敗: %v", err)
}
// 標(biāo)準(zhǔn)化資源類型
resourceType = strings.ToLower(resourceType)
var gvr schema.GroupVersionResource
var displayName string
switch resourceType {
case "deployment", "deployments":
gvr = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
displayName = "Deployment"
case "service", "services", "svc":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
displayName = "Service"
case "pod", "pods":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
displayName = "Pod"
case "configmap", "configmaps", "cm":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
displayName = "ConfigMap"
case "secret", "secrets":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
displayName = "Secret"
default:
return "", fmt.Errorf("不支持的資源類型: %s。支持的類型: deployment, service, pod, configmap, secret", resourceType)
}
// 查詢資源
resourceList, err := clientGo.DynamicClient.Resource(gvr).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return "", fmt.Errorf("查詢資源失敗: %v", err)
}
// 格式化輸出結(jié)果
if len(resourceList.Items) == 0 {
return fmt.Sprintf("在命名空間 '%s' 中未找到任何 %s 資源", namespace, displayName), nil
}
result := fmt.Sprintf("在命名空間 '%s' 中找到 %d 個(gè) %s 資源:\n\n", namespace, len(resourceList.Items), displayName)
for i, item := range resourceList.Items {
result += fmt.Sprintf("%d. %s\n", i+1, item.GetName())
// 添加一些額外信息
if creationTime := item.GetCreationTimestamp(); !creationTime.IsZero() {
result += fmt.Sprintf(" 創(chuàng)建時(shí)間: %s\n", creationTime.Format("2006-01-02 15:04:05"))
}
result += "\n"
}
return result, nil
}最后,deleteResource 是用來刪除資源信息的,和 queryResource 邏輯差不多,如下:
// 刪除 K8s 資源
func deleteResource(namespace, resourceType, resourceName string) (string, error) {
// 創(chuàng)建 Kubernetes 客戶端
clientGo, err := utils.NewClientGo(kubeconfig)
if err != nil {
return "", fmt.Errorf("創(chuàng)建 Kubernetes 客戶端失敗: %v", err)
}
// 標(biāo)準(zhǔn)化資源類型
resourceType = strings.ToLower(resourceType)
var gvr schema.GroupVersionResource
var displayName string
switch resourceType {
case "deployment", "deployments":
gvr = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
displayName = "Deployment"
case "service", "services", "svc":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
displayName = "Service"
case "pod", "pods":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
displayName = "Pod"
case "configmap", "configmaps", "cm":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
displayName = "ConfigMap"
case "secret", "secrets":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
displayName = "Secret"
default:
return "", fmt.Errorf("不支持的資源類型: %s。支持的類型: deployment, service, pod, configmap, secret", resourceType)
}
// 檢查資源是否存在
_, err = clientGo.DynamicClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), resourceName, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("資源 %s/%s 在命名空間 '%s' 中不存在: %v", displayName, resourceName, namespace, err)
}
// 刪除資源
err = clientGo.DynamicClient.Resource(gvr).Namespace(namespace).Delete(context.TODO(), resourceName, metav1.DeleteOptions{})
if err != nil {
return "", fmt.Errorf("刪除資源失敗: %v", err)
}
return fmt.Sprintf("成功刪除 %s/%s 從命名空間 '%s'", displayName, resourceName, namespace), nil
}(4)我們將 functionCall 和 startChat 結(jié)合起來。
首先,增加一個(gè) processInput 方法,主要用來初始化 openai,并將用戶輸入傳遞給大模型。
func processInput(input string) string {
// 1. 初始化 openai
client, err := utils.NewOpenAIClient()
if err != nil {
return err.Error()
}
// 2. 封裝 utils/openai.go,調(diào)用 OpenAI API 得到回復(fù)
// response, err := client.SendMessage("你好", input)
// 3. 調(diào)用 OpenAI Function calling
response := functionCalling(input, client)
return response
}最后,在 startChat 中調(diào)用 processInput 即可。
func startChat() {
scanner := bufio.NewScanner(os.Stdin)
fmt.Println("我是 K8S Copilot,你可以向我咨詢 K8S 相關(guān)問題")
for {
fmt.Print("> ")
if scanner.Scan() {
input := scanner.Text()
if input == "exit" {
fmt.Println("退出對(duì)話")
break
}
if input == "" {
continue
}
response := processInput(input)
fmt.Println(response)
}
}
}(5)執(zhí)行效果如下
afeac18b0fefb8fd12f032d165061797 MD5
總結(jié)
本文探討了如何將人工智能技術(shù)與 Kubernetes 運(yùn)維實(shí)踐相結(jié)合,成功構(gòu)建了一個(gè)名為 k8scopilot 的智能命令行助手。通過整合 client-go、Cobra 和 OpenAI 的 Function Calling 等關(guān)鍵技術(shù),我們實(shí)現(xiàn)了一個(gè)能夠理解自然語言指令并執(zhí)行相應(yīng) Kubernetes 操作的 AI Agent。
我們首先介紹了與 Kubernetes 集群交互的核心工具 client-go,它是所有自動(dòng)化操作的基石。接著,我們利用 Cobra 庫高效地構(gòu)建了結(jié)構(gòu)清晰、易于擴(kuò)展的命令行界面。最后,通過 OpenAI 的大語言模型,特別是其 Function Calling 能力,我們將模糊的自然語言轉(zhuǎn)換為精確的、可編程的 API 調(diào)用,實(shí)現(xiàn)了從“人理解機(jī)器命令”到“機(jī)器理解人”的范式轉(zhuǎn)變。




























