如何寫一個 K8s Operator

如何寫一個 K8s Operator

使用過 k8s 的同學可能執行過以下命令:

1kubectl edit sts myapp # 編輯一個名稱爲 myapp 的 StatefulSet
2kubectl describe sts myapp # 查看一個名稱爲 myapp 的 StatefulSet

StatefulSet 是 k8s 定義的一種資源,類似的還有 Deployment、Job、ConfigMap 等。當你執行 edit 命令編輯這些資源後,k8s 會通過不停輪詢的方式(核心概念:control loop),將目標資源調整(核心概念:reconcile)到你期望的狀態。

例如,你按了空調的遙控器,希望將房間的溫度下調到 20℃。空調的壓縮機開始工作,並且同時不停的檢測當前實際的溫度與你期望的溫度之間的差異,直到溫度達到20℃,這就是一個 control loop 的例子。

很簡單,對吧?

設想一下如果不是這樣,你將一手拿着溫度計,然後不停的告訴空調溫度仍然很高,或者已經變得過低了。

這就是聲明式 API 的好處,用戶只需要告訴程序你的期望,剩下的交給程序來做(對於程序開發者來說是雷鋒行爲),而程序實現目標最省力的方式,就是採用 control loop 的方式,不停的對比期望與現實的差距。

Operator 是什麼

試想我們不再滿足於 k8s 提供的默認的資源,我們想利用這種省心省力的方式,來管理我們自己的資源,如:數據庫的一個用戶。

你可能想說,數據庫的用戶存在於數據庫內,我知道數據庫的集羣可以定義爲 StatefulSet 然後由 k8s 管理,用戶又怎麼使用 k8s 管理呢?爲什麼要用 k8s 來管理呢?

爲什麼要用 k8s 管理用戶資源?

以 MySql 爲例,通常我們創建用戶,是使用 root 用戶登錄到數據庫,執行 sql 語句創建用戶。但是設想以下幾種場景:

  1. 你不知道 root 用戶的密碼,或者因爲安全要求,不能提供給你
  2. 你不知道 MySql 的 IP
  3. 你知道以上信息,但是因爲沒有開啓相應的節點權限,你無法登錄數據庫
  4. 你完成了以上所有步驟,結果其中某些登錄或者創建步驟失敗了,你和數據庫運維人員開始扯皮

看到了吧?這些都是生產環境中,真實會遇到的事情。而使用以下步驟,我們就可以一舉解決這些問題。

怎麼做到?

把大象關進冰箱需要三步,而我們要使用 Operator 完成在數據庫中創建用戶只需要兩步:

  1. 告訴數據庫,我需要創建的用戶信息

    1apiVersion: handsomeguy.cn/v1alpha1
    2kind: DatabaseUser                        
    3metadata:
    4  name: cnhandsomeguy                                 
    5spec:
    6  user: cnhandsomeguy                
    7  password: changeit                     
    

    這就是一個最簡單的自定義資源(核心概念:custom resource,簡稱 cr),包含用戶名、密碼,還有 k8s 資源的一些唯一性信息,如 apiVersion(假如你對自己的定義不滿意,新加了一些字段,就需要更改版本號,但是這種做法要堅決避免,後文會提到原因和對策),Kind(就像 StatefulSet 和 Deployment 也是一種 Kind 一樣,我們給自己起了一個名字叫 DatabaseUser)

    聰明的同學肯定能看到,這個資源還缺少了一些信息,如需要在哪個數據庫創建?密碼怎麼明文寫在這裏了呢?我們將在後面的章節完善這些部分。

  2. 數據庫來創建用戶

    實際上此時並不是數據庫來執行創建用戶的動作,而是我們的 Operator。Operator 一直在待命(持續的監控 Kind 爲 DatabaseUser 的資源),在我們提交上面的請求後,它就可以連接到數據庫,執行創建用戶的動作,當然,這部分邏輯需要由我們自己來編寫。

簡單吧!

Operator 是管理 k8s 自定義資源的一種擴展。它也遵循 control loop 的設計理念,通常我們需要在一個 Operator 中,編寫一個控制器(核心概念:controller),它是一段代碼(廢話),這個控制器接收到資源的創建、更新、刪除事件,由我們編碼來決定:

  1. 如何實現這些創建、更新、刪除的邏輯
  2. 檢測是否達到了期望,如果返回了錯誤,則認爲需要進入下一次循環,再來一遍!

看看一個案例

還有一個重要的概念沒有介紹:自定義資源的定義(custom resource definition,簡稱 crd)。有點繞口,但是試着這麼理解:

  1. Operator 是一個進程,一直運行在 k8s 集羣內部,監控着某種 Kind 的資源的事件
  2. 它到底在監控什麼呢?我們需要一個名字!(Kind)
  3. 如果它監控到了 DatabaseUser,該如何去 spec 中找到用戶名、密碼這些信息呢? 我們需要一個定義,一個描述文件,來事先告訴 Operator 數據庫用戶的類型、細節,以便於 Operator 來監控、按照流程執行。

使用以下命令可以查看當前 k8s 中已經有哪些 crd:

1kubectl get crd

所以現在,我們需要以下幾種東西:

  1. 資源定義(crd)
  2. Operator 程序的編碼和部署
  3. 資源(cr)

嚇到我了,我需要從零開始編碼,寫一個 Operator 嗎?可以用 Java 嗎?部署在哪?怎麼監控?怎麼對接 k8s?

Relax! 有框架,有示例,只要你的 Ctrl + C/V 能用就行,可以開始了嗎?

使用 Kubebuilder 開發 Operator

有點快了,Kubebuilder 是什麼?爲什麼選它?

還有個選擇是 operator-sdk,大同小異,都是生成代碼的工具罷了。當然你想手擼也不是不行。

我建議通篇閱讀一下 https://book.kubebuilder.io/,但是時間有限的同學,看本文熟悉下脈絡就行。本文有個作用是,幫助你避免一些坑,否則你生成的代碼很可能是在某些 k8s 版本上跑不起來的。

準備

  1. 準備一臺 linux 虛擬機,並且安裝好 gcc 和 make 命令。

    按照 https://book.kubebuilder.io/quick-start.html#installation 執行命令,下載 kubebuilder。

  2. 初始化倉庫

    按照 https://book.kubebuilder.io/quick-start.html#create-a-project 執行命令,執行初始化

    1kubebuilder init --domain handsomeguy.cn --repo handsomeguy.cn/databaseuser
    
  3. 創建一個 API

    一個 Operator 可以管理多個資源,這些資源可以理解爲就是一個 API。

    1kubebuilder create api --crd-version v1 --group mygroup --version v1alpha1 --kind DatabaseUser
    

    這裏注意一下, –crd-version 從 k8s 1.16 的版本後就不支持 v1beta1 了。 這一步驟會生成一些 go 文件。

  4. 編輯這些 go 文件。 假設我們要增加用戶名和密碼:

    1type DatabaseUser struct {
    2   metav1.TypeMeta   `json:",inline"`
    3   metav1.ObjectMeta `json:"metadata"`
    4   Spec   DatabaseUserSpec   `json:"spec,omitempty"`
    5}
    6type DatabaseUserSpec struct {
    7	  User string `json:"user,omitempty"`
    8	  Password corev1.SecretKeySelector `json:"password,omitempty"`
    9}
    

    編寫好了,假設先寫這麼多。有兩點要說明一下:

    1. 上面我們說密碼是明文存儲的,不安全,這裏我們使用了一個 corev1.SecretKeySelector。假設你的密碼存儲在了某一個名爲 my-secret 的卷中的 my-key 字段,我們就可以在後面使用如下方式來取到它的明文。
    1apiVersion: handsomeguy.cn/v1alpha1
    2kind: DatabaseUser                        
    3metadata:
    4  name: cnhandsomeguy                                 
    5spec:
    6  user: cnhandsomeguy                
    7  password: 
    8    name: my-secret
    9    key: my-key                     
    
    1. 用戶的定義還可以增加 status 這樣的 sub-resource,這樣在用戶創建失敗的時候,將失敗的信息刷回到 status 中,就可以使用 kubectl describe 命令來查看失敗信息,後文會給案例。
  5. 生成 crd 文件

    1make manifests #在項目的根目錄執行 
    

    重大提醒

    根目錄中有MakeFile文件,有必要仔細閱讀一下。因爲 make manifests 步驟會調用一個 controller-gen 的工具,來生成 crd 文件。但是 controller-gen 和 k8s 是有嚴格的配套關係的。如果你想生成老版本的(k8s 1.12)的 v1beta1 版本的 crd 文件,就必須使用 0.6.2 版本之前的 controller-gen。可以查看其 release 頁面 https://github.com/kubernetes-sigs/controller-tools/releases 來確定使用什麼版本。 controller-gen 是由 MakeFile 中指定並在 make 過程中自動下載的(你也可以下載好後放到指定位置),如果想要修改 controller-gen 的版本,可以在 MakeFile 的以下位置修改:

    1controller-gen: ## Download controller-gen locally if necessary.
    2  $(call go-get-tool,$(CONTROLLER_GEN),sigs.k8s.io/controller-tools/cmd/controller-gen@v0.6.2) #將0.6.2改爲你需要的版本
    
  6. 同時生成多個版本的 crd 文件 修改 MakeFile 的以下幾行,以同時生成支持新老版本 k8s 的 crd 文件。

    1CRD_OPTIONS ?= "crd:crdVersions={v1beta1,v1},trivialVersions=true,preserveUnknownFields=false"
    2manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and CustomResourceDefinition objects.
    3 $(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=my-crd-manager webhook paths=./... output:crd:artifacts:config=config/crd/bases
    

    實際執行的命令其實是,

    1bin/controller-gen rbac:roleName=my-crd-manager crd:crdVersions={v1,v1beta1} webhook paths=./... output:crd:artifacts:config=config/crd/ bases
    

    因此你也可以手動執行。

  7. 在 k8s cluster 中創建這個 crd 在進行這一步之前,你可以微調你的 crd,比如調整它的縮寫爲 dbu,這樣就可以執行 kubectl get dbu 來查看你的資源。

     1---
     2apiVersion: apiextensions.k8s.io/v1
     3kind: CustomResourceDefinition
     4metadata:
     5  annotations:
     6    controller-gen.kubebuilder.io/version: v0.6.2
     7    "helm.sh/resource-policy": keep
     8  creationTimestamp: null
     9  name: databaseuserss.handsomeguy.cn
    10spec:
    11  group: mygroup.handsomeguy.cn
    12  names:
    13    kind: DatabaseUser
    14    listKind: DatabaseUserList
    15    plural: databaseusers
    16    singular: databaseuser
    17    shortNames: [dbu]
    18  preserveUnknownFields: false
    19  scope: Namespaced
    20  versions:
    21  // ... 省略
    

    使用以下命令創建並查看 crd。

    1kubectl apply -f my-crd.yaml
    2kubectl get crd
    

到這裏,準備工作就做完了(不出意外,你會在 make manifests 的時候遇到很多報錯,請耐心查看報錯,一一思考解決,都是有跡可循的。)。 我們現在有了

  1. crd
  2. 代碼 此時可以觀察一下生成的代碼,如 DatabaseUserController 的 Reconcile 方法,這裏將是你編碼的主要陣地。 還差億點點小細節,就可以編寫 cr 文件並部署測試了。

對接 k8s

首先我們應該對接 k8s,不然怎麼知道我們的 operator 是否能正常監聽到資源呢? 查看 sigs.k8s.io/controller-runtime/pkg/client/config/config.go 的源碼,應該是有很多中配置的方式,我們選擇最簡單的一種:指定 KUBECONFIG 變量。

 1kind: Config
 2apiVersion: v1
 3clusters:
 4- cluster:
 5    insecure-skip-tls-verify: false
 6    certificate-authority: {{CA_DIR}}/ca.crt
 7    server: https://{{KUBERNETES_MASTER}}
 8  name: cluster
 9users:
10- user:
11    client-certificate: {{CA_DIR}}/kubecfg.crt
12    client-key-data: {{CLIENT_KEY}}
13  name: user
14contexts:
15- context:
16    cluster: cluster
17    user: user
18  name: defaultContext
19current-context: defaultContext

這個配置不是開箱即用的,多想一想怎麼獲取到這些證書吧,我寫本文的時候手頭沒有 k8s 集羣,暫不能提供方法了。

配置好後,在你的 go 程序運行的時候指定或在開發過程中在 goland 配置都可以,具體方式不再贅述。

開始編碼

  1. 監控指定 namespace 的資源

    operator 可以監控一個或多個 k8s namespace 下的資源。在 main.go 中找到如下位置,修改即可:

    1mgrOptions := ctrl.Options{
    2   Scheme:                 scheme,
    3   MetricsBindAddress:     setup.MetricsAddr,
    4   HealthProbeBindAddress: setup.ProbeAddr,
    5   NewCache:               cache.MultiNamespacedCacheBuilder(your_name_spaces), // 在此處指定需要監控的 namespace,實際生產過程中這些都是要做成可配置的,或通過啓動參數指定
    6}
    7mgr, err := ctrl.NewManager(cfg, mgrOptions)
    
  2. 監控要創建到某個數據庫的資源

    可以利用 k8s 資源的 label。如:

     1apiVersion: handsomeguy.cn/v1alpha1
     2kind: DatabaseUser                        
     3metadata:
     4  label:
     5    target: that-database
     6  name: cnhandsomeguy                                 
     7spec:
     8  user: cnhandsomeguy                
     9  password: 
    10    name: my-secret
    11    key: my-key   
    

    這樣我們就可以在 DatabaseUserController 中過濾出要創建到指定數據庫的資源,其它數據庫的資源都不管。

     1func GetLabelEventFilter(label string, value string) predicate.Predicate {
     2	  return predicate.Funcs{
     3	  	UpdateFunc: func(event event.UpdateEvent) bool {
     4	  		return strings.ToLower(event.ObjectOld.GetLabels()['target']) == strings.ToLower(value)
     5	  	},
     6	  	DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
     7	  		return strings.ToLower(deleteEvent.Object.GetLabels()['target']) == strings.ToLower(value)
     8	  	},
     9	  	CreateFunc: func(createEvent event.CreateEvent) bool {
    10	  		return strings.ToLower(createEvent.Object.GetLabels()['target']) == strings.ToLower(value)
    11	  	},
    12	  	GenericFunc: func(genericEvent event.GenericEvent) bool {
    13	  		return strings.ToLower(genericEvent.Object.GetLabels()['target']) == strings.ToLower(value)
    14	  	},
    15	  }
    16}
    17// SetupWithManager 方法由 kubebuilder 生成
    18func (r *DatabaseUserReconciler) SetupWithManager(mgr ctrl.Manager) error {
    19    if err := mgr.GetFieldIndexer().IndexField(context.Background(),
    20       // ... 省略
    21    }
    22
    23    logger.Infof("setup with manager, %s, %s", constants.CrLabelKey, r.Opts.Target)
    24    return ctrl.NewControllerManagedBy(mgr).
    25    	  For(&v1.DatabaseUser{}).
    26    	  WithEventFilter(GetLabelEventFilter(constants.CrLabelKey, r.Opts.Target)).
    27    	  Owns(&v1.DatabaseUser{}).
    28    	  Complete(r)
    29        // ... 省略
    
  3. Reconcile 方法的編寫

     1func (r *DatabaseUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
     2	   // get user from cluster
     3	   var DatabaseUser v1.DatabaseUser
     4	   if err := r.Get(ctx, req.NamespacedName, &DatabaseUser); err != nil {
     5	   	   logger.Errorf("unable to fetch DatabaseUser: %v.", err)
     6	   	   return ctrl.Result{}, client.IgnoreNotFound(err)
     7	   }
     8
     9	   // 處理用戶的刪除事件
    10	   if !DatabaseUser.ObjectMeta.DeletionTimestamp.IsZero() {
    11	   	   return ctrl.Result{}, r.delete(ctx, &DatabaseUser)
    12	   }
    13
    14	   logger.Infof("%s before update %s.", DatabaseUser.Name, DatabaseUser.ResourceVersion)
    15	   oldStatus := DatabaseUser.Status.DeepCopy()
    16	   // 用戶創建、更新等邏輯編寫。可以調用 job 或者直接在 go 中連接本地數據庫進行用戶操作。
    17	   reconcileError := r.do(ctx, &DatabaseUser)
    18    // 將實際狀態刷新到資源中
    19	   updateStatusError := r.updateStatus(ctx, &DatabaseUser, oldStatus)
    20    // 在以下方法中判斷成功還是失敗,並決策是否進行下一輪循環
    21	   return againOrDone(&DatabaseUser, reconcileError, updateStatusError)
    22}
    

    Reconcile 返回兩個結果:

    1return reconcile.Result{
    2 		Requeue:      true, // 重新開始下次循環
    3 		RequeueAfter: requeueAfter,
    4 	}, err // err 不爲 nil 的時候也重新開始下次循環
    
  4. 如何從 secret 中獲取密碼

    示例代碼,可以參考如下,實際實現還需要考慮很多健壯性和擴展性。

     1func (r *DatabaseUserReconciler) GetPasswordInCluster(ctx context.Context, user *v1alpha1.DatabaseUser) (string, error) {
     2    secret := &corev1.Secret{}
     3    secretKey := client.ObjectKey{Name: user.Spec.Password.Name, Namespace: user.Namespace}
     4
     5    if err := r.Get(ctx, secretKey, secret); err != nil {
     6    	   return "", err
     7    }
     8    var path = &user.Spec.Password.Key
     9    return string(sec.Data[path])
    10 }
    
  5. finalizer 實現同步刪除資源

    當你執行 kubectl delete dbu my-user 的時候,我們希望 k8s 等待 operatoror 執行完成並返回刪除用戶成功後,才真的刪除這個 cr 文件。這樣就需要用到 finalizer的能力。見官方文檔:https://kubernetes.io/zh-cn/docs/concepts/overview/working-with-objects/finalizers/

    代碼中,可以這麼實現:

     1	// finalizer is pre-delete hook
     2    myFinalizer = "mygroup.handsomeguy.cn/databaseuser"
     3    // 在新增用戶的時候,打上 finalizer 標記,使用 kubectl get dbu 可以看到 finalizer 的信息
     4	if !controllerutil.ContainsFinalizer(user, myFinalizer) {
     5		controllerutil.AddFinalizer(user, myFinalizer)
     6		if err = r.Update(ctx, user); err != nil {
     7			return
     8		}
     9	}
    10    // 在刪除用戶的時候,如果刪除成功就去除這個 finalizer
    11    if controllerutil.ContainsFinalizer(user, myFinalizer) && !r.Opts.SkipUpdateStatus {
    12        // 先刪除用戶
    13		if err := DeleteUser(user); err != nil {
    14			return err
    15		}
    16        // 再去除阻塞器 finalizer
    17		controllerutil.RemoveFinalizer(user, myFinalizer)
    18		// update to delete finalizer
    19		if err := r.Update(ctx, user); err != nil {
    20			return nil
    21		}
    22		return nil
    23	}
    

利用 status 提升可維護性

區分一個程序員水平的一個方法,是看他的代碼非功能性指標如何,如可維護性高不高,出現問題定位問題快不快。我們這個 operator 說實話還是蠻複雜的,出現問題沒有經驗的人還真的不好定位。我們需要一種便捷的手段,一條命令就可以定位大部分問題。對於用戶來說,熟悉的可能只有 kubectl get dbu 的命令,我們可不可以將創建用戶過程中的報錯放到這個結果裏面呢?答案是可以。

文檔在這裏。https://book-v1.book.kubebuilder.io/basics/status_subresource.html,但是不如直接看代碼:

  1. 在 xxxxtypes.go 中增加 status 當然你也可以按照官方的文檔去增加生成代碼的註解。

     1//+kubebuilder:object:root=true
     2//+kubebuilder:subresource:status
     3
     4// DatabaseUser is the Schema for the DatabaseUsers API
     5type DatabaseUser struct {
     6   metav1.TypeMeta   `json:",inline"`
     7   metav1.ObjectMeta `json:"metadata"`
     8
     9   Spec   DatabaseUserSpec   `json:"spec,omitempty"`
    10   Status DatabaseUserStatus `json:"status,omitempty"` // 新增部分
    11}
    12// DatabaseUserStatus defines the observed state of DatabaseUser
    13type DatabaseUserStatus struct {
    14   Conditions     []DatabaseUserCondition `json:"conditions,omitempty"`
    15   // 其它想放的字段,省略
    16}
    17// DatabaseUserConditionType custom type
    18type DatabaseUserConditionType string
    19// DatabaseUserCondition v3 condition
    20type DatabaseUserCondition struct {
    21	  // Type of user condition.
    22	  Type DatabaseUserConditionType `json:"type,omitempty"`
    23	  // Status of the condition, one of True, False, Unknown.
    24	  Status corev1.ConditionStatus `json:"status,omitempty"`
    25	  // The last time this condition was updated.
    26	  LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"`
    27	  // Last time the condition transitioned from one status to another.
    28	  LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
    29	  // The reason for the condition's last transition.
    30	  Reason string `json:"reason,omitempty"`
    31	  // A human readable message indicating details about the transition.
    32	  Message string `json:"message,omitempty"`
    33}
    34
    35// UpdateStatusCondition update status condition
    36func (u *DatabaseUser) UpdateStatusCondition(condType DatabaseUserConditionType,
    37	   status corev1.ConditionStatus, reason, message string) (cond *DatabaseUserCondition, changed bool) {
    38	   t := metav1.NewTime(time.Now())
    39	   existedCondition, exists := u.ConditionExists(condType)
    40	   if !exists {
    41	       newCondition := DatabaseUserCondition{
    42	      	   Type: condType, Status: status, Reason: reason, Message: message,
    43	      	   LastTransitionTime: t, LastUpdateTime: t,
    44	       }
    45	   	   u.Status.Conditions = append(u.Status.Conditions, newCondition)
    46
    47	   	   return &newCondition, true
    48	   }
    49
    50	   if status != existedCondition.Status {
    51	   	   existedCondition.LastTransitionTime = t
    52	   	   changed = true
    53	   }
    54
    55	   if message != existedCondition.Message || reason != existedCondition.Reason {
    56	   	   existedCondition.LastUpdateTime = t
    57	   	   changed = true
    58	   }
    59
    60	   existedCondition.Status = status
    61	   existedCondition.Message = message
    62	   existedCondition.Reason = reason
    63
    64	   return existedCondition, changed
    65}
    

    加的代碼有點多,類爆炸了,但是是值得的。

  2. 在成功或失敗的地方(DatabaseUserController中),調用 UpdateStatusCondition

     1user.UpdateStatusCondition(
     2	   "Ready", corev1.ConditionTrue,
     3	   "Provision Succeeded", "The user provisioning has succeeded.",
     4)      
     5if *err != nil {
     6	   user.UpdateStatusCondition(
     7	   	"NotReady", corev1.ConditionFalse,
     8	   	"Provision Failed", fmt.Sprintf("The user provisioning has failed: %s", *err),
     9	   )
    10}
    

    最後在 Reconcile 方法中刷回狀態即可。

    1func (r *DatabaseUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    2    // ... 省略
    3    reconcileError := r.do(ctx, &DatabaseUser)
    4    updateStatusError := r.updateStatus(ctx, &DatabaseUser, oldStatus) // 刷回狀態到集羣
    5    return againOrDone(&DatabaseUser, reconcileError, updateStatusError)
    6    // ... 省略
    

    如此,只有你在 controller 的全階段,將 err 信息寫入到 cr 中,用戶就可以使用 kubectl describe dbu 來看到這些報錯信息,而不用麻煩你了。

好了,以上就是一些實現一個 Operator 的步驟了。這些內容大部分靠回憶,代碼部分都是網上找的僞代碼,還有很多實戰中的坑需要注意,但是限於時間太久,已經想不起來了,以後想起來再補充吧。大家有什麼想法可以在評論區交流哦!