開發(fā) Operator 調(diào)度 GPU 實例資源池
本章節(jié)將引入一個新的概念——K8s Operator,它是 K8s 的一種擴展形式,可以幫助用戶以 K8s 聲明式 API 的方式管理應(yīng)用及服務(wù),Operator 定義了一組在 Kubernetes 集群中打包和部署復(fù)雜業(yè)務(wù)應(yīng)用的方法,主要是為解決特定應(yīng)用或服務(wù)關(guān)于如何運行、部署及出現(xiàn)問題時如何處理提供的一種特定的自定義方式。比如:
- 按需部署應(yīng)用服務(wù)
 - 實現(xiàn)應(yīng)用狀態(tài)的備份和還原,完成版本升級
 - 數(shù)據(jù)庫 schema 或額外的配置設(shè)置的改動
 
在 K8s 中我們使用的 Deployment、Daemonset、Statefulset 等這些都是 K8s 的資源,這些資源的創(chuàng)建、刪除、更新等動作都會被稱為事件,K8s 的 Controller Manager 負責(zé)事件的監(jiān)聽,并觸發(fā)對應(yīng)的動作來滿足期望,這種方式就是聲明式,即用戶只需要關(guān)心應(yīng)用程序的最終狀態(tài)。當我們在使用中發(fā)現(xiàn)有些資源并不能滿足日常的需求,對于這類需求可以使用 K8s 的自定義資源和 Operator 為應(yīng)用程序提供基于 K8s 的擴展。
在這其中,CRD 就是對自定義資源的描述,如果要自定義資源,就需要先定義好 CRD,也就是介紹這個資源有什么屬性,這些屬性的類型、結(jié)構(gòu)是怎樣的。
比如 PG 的 Operator 如下:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: postgresqls.acid.zalan.do
  labels:
    app.kubernetes.io/name: postgres-operator
  annotations:
    "helm.sh/hook": crd-install
spec:
  group: acid.zalan.do
  names:
    kind: postgresql
    listKind: postgresqlList
    plural: postgresqls
    singular: postgresql
    shortNames:
    - pg  additionalPrinterColumns:
  - name: Team
    type: string
    description: Team responsible for Postgres CLuster
    JSONPath: .spec.teamId
  - name: Version
    type: string
    description: PostgreSQL version
    JSONPath: .spec.postgresql.version
  - name: Pods
    type: integer
    description: Number of Pods per Postgres cluster
    JSONPath: .spec.numberOfInstances
  - name: Volume
    type: string
    description: Size of the bound volume
    JSONPath: .spec.volume.sizeCRD 主要包括 apiVersion、kind、metadata 和 spec 四個部分。其中最關(guān)鍵的是 apiVersion 和 kind,apiVersion 表示資源所屬組織和版本,apiVersion 一般由 APIGourp 和 Version 組成,這里的 APIGourp 是http://apiextensions.k8s.io,Version 是 v1beta1,相關(guān)信息可以通過kubectl api-resoures查看。kind 表示資源類型,這里是CustomResourceDefinition,表示是一個自定義的資源描述。
本文我們將自己開發(fā)一個 Operator 來維護 GPU 資源池的穩(wěn)定,解決 AI 模型訓(xùn)練的基礎(chǔ)平臺的穩(wěn)定性。其架構(gòu)如下:

ee11ee9bb3ba2f232c0f78573956823f MD5
其中:
- GPU 資源池采用的是騰訊云的競價 GPU 實例
 - Operator 運行在 K8s 中,通過 SpootPool 控制 GPU 資源池的數(shù)量
 - 若云平臺釋放了某臺 GPU 實例,當 Operator 監(jiān)聽到資源池數(shù)量和期望的不匹配,會自動補充到期望數(shù)量
 
Operator 的開發(fā)有多種腳手架,常用的有 operator-sdk、kubebuilder 等,這里我們將使用 kubebuilder 來完成 Operator 的開發(fā)。
前置條件
- 準備一個可用的 K8s 集群,可以使用 kind、kubeadm、二進制等各種形式安裝,如果使用 kubeadm 安裝集群,可以參考 Kubernetes集群管理。
 - 安裝好 kubebuilder,可以參考 kubebuild快速安裝。
 - 準備好云平臺的 AK,這里是采用騰訊云,其他云類似。
 
快速開始
1.設(shè)計 CRD
在開發(fā)之前需要先設(shè)計好 CRD(就像業(yè)務(wù)開發(fā)前先設(shè)計好表結(jié)構(gòu)一樣),本文的 CRD 主要包含云平臺虛擬機的開通,包括最小和最大實例數(shù),以及騰訊云 SDK 所需要的各種參數(shù),比如地域、可用區(qū)、VPC、子網(wǎng)、安全組、鏡像等。
最后 CRD 設(shè)計如下:
apiVersion: devops.jokerbai.com/v1
kind: Spotpool
metadata:
  labels:
    app.kubernetes.io/name: spotpool
    app.kubernetes.io/managed-by: kustomize
  name: spotpool-sample
spec:
  secretId: 密鑰ID
  secretKey: 密鑰Key
  region: 區(qū)域
  availabilityZone: 可用區(qū)
  instanceType: 實例類型
  minimum: 最小實例數(shù)
  maximum: 最大實例數(shù)
  subnetId: 子網(wǎng)ID
  vpcId: VPC ID
  securityGroupIds:
    - 安全組
  imageId: 鏡像ID
  instanceChargeType: 實例付費類型2.初始化項目
定義好 CRD 字段之后,我們先使用 kubebuilder 初始化一個 Operator 項目,命令如下:
(1)初始化項目
mkdir spotpool && cd spotpool
kubebuilder init \
  --domain jokerbai.com \
  --repo github.com/joker-bai/spotpool \
  --project-name spotpool \
  --plugins go/v4 \
  --owner "Joker Bai"(2)創(chuàng)建 API
kubebuilder create api --group devops.jokerbai.com --version v1 --kind Spotpool(3)生成后的目錄結(jié)構(gòu)大致如下:
.
├── api
│   └── v1
│       ├── groupversion_info.go
│       ├── spotpool_types.go
│       └── zz_generated.deepcopy.go
├── bin
│   ├── controller-gen -> /root/workspace/godev/src/github.com/joker-bai/spotpool/bin/controller-gen-v0.18.0
│   └── controller-gen-v0.18.0
├── cmd
│   └── main.go
├── config
│   ├── crd
│   │   ├── kustomization.yaml
│   │   └── kustomizeconfig.yaml
│   ├── default
│   │   ├── cert_metrics_manager_patch.yaml
│   │   ├── kustomization.yaml
│   │   ├── manager_metrics_patch.yaml
│   │   └── metrics_service.yaml
│   ├── manager
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── network-policy
│   │   ├── allow-metrics-traffic.yaml
│   │   └── kustomization.yaml
│   ├── prometheus
│   │   ├── kustomization.yaml
│   │   ├── monitor_tls_patch.yaml
│   │   └── monitor.yaml
│   ├── rbac
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── metrics_auth_role_binding.yaml
│   │   ├── metrics_auth_role.yaml
│   │   ├── metrics_reader_role.yaml
│   │   ├── role_binding.yaml
│   │   ├── role.yaml
│   │   ├── service_account.yaml
│   │   ├── spotpool_admin_role.yaml
│   │   ├── spotpool_editor_role.yaml
│   │   └── spotpool_viewer_role.yaml
│   └── samples
│       ├── devops.jokerbai.com_v1_spotpool.yaml
│       └── kustomization.yaml
├── Dockerfile
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
├── internal
│   └── controller
│       ├── spotpool_controller.go
│       ├── spotpool_controller_test.go
│       └── suite_test.go
├── Makefile
├── PROJECT
├── README.md
└── test
    ├── e2e
    │   ├── e2e_suite_test.go
    │   └── e2e_test.go
    └── utils
        └── utils.go3.CRD 開發(fā)
(1)定義 API
在api/v1alpha1/spotpool_types.go中定義 CRD 的結(jié)構(gòu)體,如下:
package v1
import (
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.
// SpotpoolSpec defines the desired state of Spotpool
type SpotpoolSpec struct {
 // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
 // Important: Run "make" to regenerate code after modifying this file
 SecretId           string   `json:"secretId,omitempty"`
 SecretKey          string   `json:"secretKey,omitempty"`
 Region             string   `json:"region,omitempty"`
 AvaliableZone      string   `json:"availabilityZone,omitempty"`
 InstanceType       string   `json:"instanceType,omitempty"`
 SubnetId           string   `json:"subnetId,omitempty"`
 VpcId              string   `json:"vpcId,omitempty"`
 SecurityGroupId    []string `json:"securityGroupIds,omitempty"`
 ImageId            string   `json:"imageId,omitempty"`
 InstanceChargeType string   `json:"instanceChargeType,omitempty"`
 Minimum            int32    `json:"minimum,omitempty"`
 Maximum            int32    `json:"maximum,omitempty"`
}
// SpotpoolStatus defines the observed state of Spotpool
type SpotpoolStatus struct {
 // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
 // Important: Run "make" to regenerate code after modifying this file
 Size       int32              `json:"size,omitempty"`
 Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,rep,name=conditions"`
 Instances  []Instances        `json:"instances,omitempty"`
}
type Instances struct {
 InstanceId string `json:"instanceId,omitempty"`
 PublicIp   string `json:"publicIp,omitempty"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// Spotpool is the Schema for the spotpools API
type Spotpool struct {
 metav1.TypeMeta   `json:",inline"`
 metav1.ObjectMeta `json:"metadata,omitempty"`
 Spec   SpotpoolSpec   `json:"spec,omitempty"`
 Status SpotpoolStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// SpotpoolList contains a list of Spotpool
type SpotpoolList struct {
 metav1.TypeMeta `json:",inline"`
 metav1.ListMeta `json:"metadata,omitempty"`
 Items           []Spotpool `json:"items"`
}
func init() {
 SchemeBuilder.Register(&Spotpool{}, &SpotpoolList{})
}在 SpotpoolSpec 中定義設(shè)計的 CRD 結(jié)構(gòu)體,這些字段都是創(chuàng)建虛擬機的必要字段。另外,在 SpotpoolStatus 中定義返回狀態(tài)里的信息,這里只需要 Instance 相關(guān)的信息。
(2)生成代碼
API 相關(guān)的代碼開發(fā)完后,執(zhí)行以下命令生成代碼:
make generate
make manifests4.Controller 開發(fā)
(1)開發(fā)控制器邏輯
控制器的主邏輯是:
- 從云平臺獲取運行的實例數(shù)
 - 判斷實例數(shù)和期望的實例數(shù)是否相等
 
- 如果小于期望值,則創(chuàng)建實例
 - 如果大于期望值,則刪除實例
 
所以主邏輯的代碼如下,修改internal/controller/spotpool_controller.go:
func (r *SpotpoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 log := logf.FromContext(ctx)
 // 獲取用戶期望
 spotpool := &devopsjokerbaicomv1.Spotpool{}
 if err := r.Get(ctx, req.NamespacedName, spotpool); err != nil {
  log.Error(err, "unable to fetch spotspool")
 }
 // 從云平臺獲取獲取運行的實例
 runningVmList, err := r.getRunningInstanceIds(spotpool)
 if err != nil {
  log.Error(err, "get running vm instance failed")
  // 十秒后重試
  return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
 }
 runningCount := len(runningVmList)
 switch {
 case runningCount < int(spotpool.Spec.Minimum):
  // 創(chuàng)建實例擴容
  delta := spotpool.Spec.Minimum - int32(runningCount)
  log.Info("creating instances", "delta", delta)
  err = r.runInstances(spotpool, delta)
  if err != nil {
   log.Error(err, "unable to create instances")
   return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
  }
 case runningCount > int(spotpool.Spec.Maximum):
  // 刪除實例縮容
  delta := int32(runningCount) - spotpool.Spec.Maximum
  log.Info("terminating instances", "delta", delta)
  err = r.terminateInstances(spotpool, delta)
  if err != nil {
   log.Error(err, "unable to terminate instances")
   return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
  }
 }
 return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}其中:
r.getRunningInstanceIds(spotpool)用戶獲取云平臺運行的實例數(shù)r.runInstances(spotpool, delta)用于調(diào)用云平臺進行擴容r.terminateInstances(spotpool, delta)用于調(diào)用云平臺進行縮容
接下來分別實現(xiàn)上面的三個方法。
(1)首先,實現(xiàn) getRunningInstanceIds 方法
func (r *SpotpoolReconciler) getRunningInstanceIds(spotpool *devopsjokerbaicomv1.Spotpool) ([]string, error) {
 client, err := r.createCVMClient(spotpool.Spec)
 if err != nil {
  return nil, err
 }
 request := cvm.NewDescribeInstancesRequest()
 response, err := client.DescribeInstances(request)
 if err != nil {
  return nil, err
 }
 var instances []devopsjokerbaicomv1.Instances
 var runningInstanceIDs []string
 for _, instance := range response.Response.InstanceSet {
  if *instance.InstanceState == "RUNNING" || *instance.InstanceState == "PENDING" || *instance.InstanceState == "STARTING" {
   runningInstanceIDs = append(runningInstanceIDs, *instance.InstanceId)
  }
  // 檢查實例的公網(wǎng) IP,如果不存在公網(wǎng) IP,則繼續(xù)重試
  if len(instance.PublicIpAddresses) == 0 {
   return nil, fmt.Errorf("instance %s does not have public ip", *instance.InstanceId)
  }
  instances = append(instances, devopsjokerbaicomv1.Instances{
   InstanceId: *instance.InstanceId,
   PublicIp:   *instance.PublicIpAddresses[0],
  })
 }
 // 更新 status
 spotpool.Status.Instances = instances
 err = r.Status().Update(context.Background(), spotpool)
 if err != nil {
  return nil, err
 }
 return runningInstanceIDs, nil
}
// 獲取騰訊云 SDK client
func (r *SpotpoolReconciler) createCVMClient(spec devopsjokerbaicomv1.SpotpoolSpec) (*cvm.Client, error) {
 credential := common.NewCredential(spec.SecretId, spec.SecretKey)
 cpf := profile.NewClientProfile()
 cpf.HttpProfile.ReqMethod = "POST"
 cpf.HttpProfile.ReqTimeout = 30
 cpf.SignMethod = "HmacSHA1"
 client, err := cvm.NewClient(credential, spec.Region, cpf)
 if err != nil {
  return nil, err
 }
 return client, nil
}其中:
- 調(diào)用 
r.createCVMClient(spotpool.Spec)獲取騰訊云SDK client - 然后調(diào)用 
client.DescribeInstances(request)獲取實例詳細信息 - 最后通過判斷 
instance.InstanceStat和instance.PublicIpAddresses的狀態(tài)信息決定是否是需要的實例 - 最后返回實例列表信息
 
(2)實現(xiàn) r.runInstances(spotpool, delta) 用于調(diào)用云平臺進行擴容
func (r *SpotpoolReconciler) runInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32) error {
 client, err := r.createCVMClient(spotpool.Spec)
 if err != nil {
  return err
 }
 request := cvm.NewRunInstancesRequest()
 request.ImageId = common.StringPtr(spotpool.Spec.ImageId)
 request.Placement = &cvm.Placement{
  Zone: common.StringPtr(spotpool.Spec.AvaliableZone),
 }
 request.InstanceChargeType = common.StringPtr(spotpool.Spec.InstanceChargeType)
 request.InstanceCount = common.Int64Ptr(int64(count))
 request.InstanceName = common.StringPtr("spotpool" + time.Now().Format("20060102150405"))
 request.InstanceType = common.StringPtr(spotpool.Spec.InstanceType)
 request.InternetAccessible = &cvm.InternetAccessible{
  InternetChargeType:      common.StringPtr("BANDWIDTH_POSTPAID_BY_HOUR"),
  InternetMaxBandwidthOut: common.Int64Ptr(1),
  PublicIpAssigned:        common.BoolPtr(true),
 }
 request.LoginSettings = &cvm.LoginSettings{
  Password: common.StringPtr("Password123"),
 }
 request.SecurityGroupIds = common.StringPtrs(spotpool.Spec.SecurityGroupId)
 request.SystemDisk = &cvm.SystemDisk{
  DiskType: common.StringPtr("CLOUD_BSSD"),
  DiskSize: common.Int64Ptr(100),
 }
 request.VirtualPrivateCloud = &cvm.VirtualPrivateCloud{
  SubnetId: common.StringPtr(spotpool.Spec.SubnetId),
  VpcId:    common.StringPtr(spotpool.Spec.VpcId),
 }
 // print request
 fmt.Println(request.ToJsonString())
 // 創(chuàng)建實例
 response, err := client.RunInstances(request)
 if _, ok := err.(*errors.TencentCloudSDKError); ok {
  return err
 }
 // other errors
 if err != nil {
  return err
 }
 // 獲取到返回的 instancesid
 instanceIds := make([]string, 0, len(response.Response.InstanceIdSet))
 for _, instanceId := range response.Response.InstanceIdSet {
  instanceIds = append(instanceIds, *instanceId)
 }
 fmt.Println("run instances success", instanceIds)
 // 更新 status
 _, err = r.getRunningInstanceIds(spotpool)
 if err != nil {
  return err
 }
 return nil
}這個方法主要是調(diào)用 client.RunInstances(request) 進行實例創(chuàng)建,然后調(diào)用 r.getRunningInstanceIds(spotpool) 更新 status 的狀態(tài)信息。
(3)開發(fā)r.terminateInstances(spotpool, delta) 用于調(diào)用云平臺進行縮容
func (r *SpotpoolReconciler) terminateInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32) error {
 client, err := r.createCVMClient(spotpool.Spec)
 if err != nil {
  return err
 }
 runningInstances, err := r.getRunningInstanceIds(spotpool)
 if err != nil {
  return err
 }
 instancesIds := runningInstances[:count]
 request := cvm.NewTerminateInstancesRequest()
 request.InstanceIds = common.StringPtrs(instancesIds)
 // 獲取返回
 response, err := client.TerminateInstances(request)
 if _, ok := err.(*errors.TencentCloudSDKError); ok {
  return err
 }
 // other errors
 if err != nil {
  return err
 }
 fmt.Println("Terminate response: ", response)
 fmt.Println("terminate instances success", instancesIds)
 // 更新 status
 _, err = r.getRunningInstanceIds(spotpool)
 if err != nil {
  return err
 }
 return nil
}刪除實例和創(chuàng)建實例的實現(xiàn)邏輯類似,先調(diào)用 client.TerminateInstances(request) 進行刪除,然后調(diào)用 r.getRunningInstanceIds(spotpool) 更新狀態(tài)。
上面三個步驟完成了主要邏輯開發(fā),可以初步實現(xiàn)具體的效果,如果希望功能更健全,則需要對其進行開發(fā)優(yōu)化。
部署和測試
1.本地測試
# 安裝 CRD
make install
# 運行 controller
make run2.創(chuàng)建 Spotpool 實例測試
(1)創(chuàng)建 Spotpool 資源清單,編輯 config/samples/devops.jokerbai.com_v1_spotpool.yaml
apiVersion: devops.jokerbai.com.jokerbai.com/v1
kind: Spotpool
metadata:
  labels:
    app.kubernetes.io/name: spotpool
    app.kubernetes.io/managed-by: kustomize
  name: spotpool-sample
spec:
  secretId: xxx
  secretKey: xxx
  region: ap-singapore
  availabilityZone: ap-singapore-2
  instanceType: "GN7.2XLARGE32"
  minimum: 2
  maximum: 2
  subnetId: DEFAULT
  vpcId: DEFAULT
  securityGroupIds:
    - sg-xxx
  imageId: img-xxx
  instanceChargeType: SPOTPAID(2)運行資源清單
# 創(chuàng)建實例
kubectl apply -f config/samples/devops.jokerbai.com_v1_spotpool.yaml
# 查看狀態(tài)
kubectl get spotpool(3)構(gòu)建并部署到集群
# 構(gòu)建鏡像
make docker-build docker-push IMG=<your-registry>/spotpool:v1
# 部署到集群
make deploy IMG=<your-registry>/spotpool:v1(4)清理
# 刪除 operator
make undeploy
# 刪除 CRD
make uninstall最后
本文通過結(jié)合 Kubernetes、AI 和云平臺,深入探討了如何利用 K8s Operator 實現(xiàn)對 GPU 資源池的自動化管理。我們從 Operator 的核心概念出發(fā),介紹了 CRD(自定義資源定義)和控制器的設(shè)計原理,并基于 kubebuilder 開發(fā)了一個名為 Spotpool 的 Operator,用于在騰訊云上維護競價實例的穩(wěn)定運行。
整個開發(fā)過程遵循“聲明式 API”的思想,用戶只需定義期望的狀態(tài)(如最小/最大實例數(shù)),Operator 便會在后臺持續(xù)監(jiān)控并自動調(diào)整實際狀態(tài),確保資源池始終符合預(yù)期。這不僅極大地簡化了運維操作,也提升了 AI 模型訓(xùn)練平臺的穩(wěn)定性和彈性。
Operator 是云原生時代自動化運維的重要利器。掌握其開發(fā)方法,意味著我們不僅能“用好” Kubernetes,更能“擴展” Kubernetes,為復(fù)雜業(yè)務(wù)場景提供定制化的解決方案。















 
 
 











 
 
 
 