5.3. 创建基于 Go 的 Operator

Operator SDK 中的 Go 编程语言支持可以利用 Operator SDK 中的 Go 编程语言支持,为 Memcached 构建基于 Go 的 Operator 示例、分布式键值存储并管理其生命周期。

注意

Kubebuilder 作为基于 Go 的 Operator 的构建解决方案嵌入 Operator SDK 中。

5.3.1. 使用 Operator SDK 创建基于 Go 的 Operator

Operator SDK 可简化 Kubernetes 原生应用程序的构建,构建该应用程序原本需要深入掌握特定于应用程序的操作知识。SDK 不仅降低了这一障碍,而且有助于减少许多常见管理功能(如计量或监控)所需的样板代码量。

本流程介绍了使用 SDK 提供的工具和库创建简单 Memcached Operator 的示例。

先决条件

  • 开发工作站上安装 operator SDK v0.19.4 CLI
  • 基于 Kubernetes 的集群(v1.8 或更高版本,支持 apps/v1beta2 API 组,如 OpenShift Container Platform 4.6)上已安装 operator Lifecycle Manager(OLM)
  • 使用具有 cluster-admin 权限的账户访问该集群
  • 已安装 OpenShift CLI(oc)v4.6+

流程

  1. 创建 Operator 项目:

    1. 为项目创建一个目录:

      $ mkdir -p $HOME/projects/memcached-operator
    2. 进入该目录:

      $ cd $HOME/projects/memcached-operator
    3. 激活对 Go 模块的支持:

      $ export GO111MODULE=on
    4. 运行 operator-sdk init 命令以初始化项目:

      $ operator-sdk init \
          --domain=example.com \
          --repo=github.com/example-inc/memcached-operator
      注意

      operator-sdk init 命令默认使用 go.kubebuilder.io/v2 插件。

  2. 更新 Operator 以使用支持的镜像:

    1. 在项目根级别 Dockerfile 中,更改默认运行程序镜像引用:

      FROM gcr.io/distroless/static:nonroot

      改为:

      FROM registry.access.redhat.com/ubi8/ubi-minimal:latest
    2. 根据 Go 项目版本,您的 Dockerfile 可能包含 USER 65532:65532USER nonroot:nonroot 指令。在这两种情况下,删除该行,因为受支持的 runner 镜像不需要该行。
    3. config/default/manager_auth_proxy_patch.yaml 文件中,修改 image 值:

      gcr.io/kubebuilder/kube-rbac-proxy:<tag>

      使用支持的镜像:

      registry.redhat.io/openshift4/ose-kube-rbac-proxy:v4.6
  3. 通过替换以下行,更新 Makefile 中的 test 目标,以安装后续构建过程中所需的依赖项:

    例 5.1. 现有 test 目标

    test: generate fmt vet manifests
            go test ./... -coverprofile cover.out

    使用以下行:

    例 5.2. 更新了 test 目标

    ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
    test: manifests generate fmt vet ## Run tests.
    	mkdir -p ${ENVTEST_ASSETS_DIR}
    	test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
    	source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./... -coverprofile cover.out
  4. 创建自定义资源定义(CRD)API 和控制器:

    1. 运行以下命令创建带有组 cache、版本v1 和种类 Memcached 的 API:

      $ operator-sdk create api \
          --group=cache \
          --version=v1 \
          --kind=Memcached
    2. 提示时,输入 y 来创建资源和控制器:

      Create Resource [y/n]
      y
      Create Controller [y/n]
      y

      输出示例

      Writing scaffold for you to edit...
      api/v1/memcached_types.go
      controllers/memcached_controller.go
      ...

      此过程在 api/v1/memcached_types.gocontrollers/memcached_controller.go 上生成 Memcached 资源 API。

    3. 修改 api/v1/memcached_types.go 中的 Go 类型定义,使其具有以下 specstatus

      // MemcachedSpec defines the desired state of Memcached
      type MemcachedSpec struct {
      	// +kubebuilder:validation:Minimum=0
      	// Size is the size of the memcached deployment
      	Size int32 `json:"size"`
      }
      
      // MemcachedStatus defines the observed state of Memcached
      type MemcachedStatus struct {
      	// Nodes are the names of the memcached pods
      	Nodes []string `json:"nodes"`
      }
    4. 添加 +kubebuilder:subresource:status marker,将 status 子资源添加到 CRD 清单中:

      // Memcached is the Schema for the memcacheds API
      // +kubebuilder:subresource:status 1
      type Memcached struct {
      	metav1.TypeMeta   `json:",inline"`
      	metav1.ObjectMeta `json:"metadata,omitempty"`
      
      	Spec   MemcachedSpec   `json:"spec,omitempty"`
      	Status MemcachedStatus `json:"status,omitempty"`
      }
      1
      添加这一行。

      这可让控制器在不更改剩余的 CR 对象的情况下更新 CR 状态。

    5. 为资源类型更新生成的代码:

      $ make generate
      提示

      在修改了 *_types.go 文件后,您必须运行 make generate 命令来更新该资源类型生成的代码。

      以上 Makefile 目标调用 controller-gen 程序来更新 api/v1/zz_generated.deepcopy.go 文件。这样可确保您的 API Go 类型定义实现了 runtime.Object 接口,所有 Kind 类型都必须实现。

  5. 生成和更新 CRD 清单:

    $ make manifests

    此 Makefile 目标调用 controller-gen 实用程序在 config/crd/bases/cache.example.com_memcacheds.yaml 文件中生成 CRD 清单。

    1. 可选:将自定义验证添加到 CRD 中。

      当生成清单时,OpenAPI v3.0 模式会添加到 spec.validation 块中的 CRD 清单中。此验证块允许 Kubernetes 在 Memcached 自定义资源(CR)创建或更新时验证其中的属性。

      作为 Operator 作者,您可以使用类似于注解的单行注释(称为 Kubebuilder markers )来配置 API 的自定义验证。这些标记必须始终具有 +kubebuilder:validation 前缀。例如,可以通过添加以下标记来添加一个 enum-type 规格:

      // +kubebuilder:validation:Enum=Lion;Wolf;Dragon
      type Alias string

      API 代码中的标记用法会在 Kubebuilder 生成 CRD用于配置/代码生成的标记文档中讨论。Kubebuilder CRD 验证文档还提供了 OpenAPIv3 验证标记的完整列表。

      如果添加任何自定义验证,请运行以下命令来更新 CRD 的 OpenAPI 验证部分:

      $ make manifests
  6. 在创建新 API 和控制器后,您可以实现控制器逻辑。在本例中,将生成的控制器文件 controllers/memcached_controller.go 替换为以下示例实现:

    例 5.3. memcached_controller.go示例

    /*
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
    
        http://www.apache.org/licenses/LICENSE-2.0
    
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    */
    
    package controllers
    
    import (
    	"context"
    	"reflect"
    
    	"github.com/go-logr/logr"
    	appsv1 "k8s.io/api/apps/v1"
    	corev1 "k8s.io/api/core/v1"
    	"k8s.io/apimachinery/pkg/api/errors"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/apimachinery/pkg/runtime"
    	"k8s.io/apimachinery/pkg/types"
    	ctrl "sigs.k8s.io/controller-runtime"
    	"sigs.k8s.io/controller-runtime/pkg/client"
    
    	cachev1 "github.com/example-inc/memcached-operator/api/v1"
    )
    
    // MemcachedReconciler reconciles a Memcached object
    type MemcachedReconciler struct {
    	client.Client
    	Log    logr.Logger
    	Scheme *runtime.Scheme
    }
    
    // +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete
    // +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/status,verbs=get;update;patch
    // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
    // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;
    
    func (r *MemcachedReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    	ctx := context.Background()
    	log := r.Log.WithValues("memcached", req.NamespacedName)
    
    	// Fetch the Memcached instance
    	memcached := &cachev1.Memcached{}
    	err := r.Get(ctx, req.NamespacedName, memcached)
    	if err != nil {
    		if errors.IsNotFound(err) {
    			// Request object not found, could have been deleted after reconcile request.
    			// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
    			// Return and don't requeue
    			log.Info("Memcached resource not found. Ignoring since object must be deleted")
    			return ctrl.Result{}, nil
    		}
    		// Error reading the object - requeue the request.
    		log.Error(err, "Failed to get Memcached")
    		return ctrl.Result{}, err
    	}
    
    	// Check if the deployment already exists, if not create a new one
    	found := &appsv1.Deployment{}
    	err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found)
    	if err != nil && errors.IsNotFound(err) {
    		// Define a new deployment
    		dep := r.deploymentForMemcached(memcached)
    		log.Info("Creating a new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
    		err = r.Create(ctx, dep)
    		if err != nil {
    			log.Error(err, "Failed to create new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
    			return ctrl.Result{}, err
    		}
    		// Deployment created successfully - return and requeue
    		return ctrl.Result{Requeue: true}, nil
    	} else if err != nil {
    		log.Error(err, "Failed to get Deployment")
    		return ctrl.Result{}, err
    	}
    
    	// Ensure the deployment size is the same as the spec
    	size := memcached.Spec.Size
    	if *found.Spec.Replicas != size {
    		found.Spec.Replicas = &size
    		err = r.Update(ctx, found)
    		if err != nil {
    			log.Error(err, "Failed to update Deployment", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
    			return ctrl.Result{}, err
    		}
    		// Spec updated - return and requeue
    		return ctrl.Result{Requeue: true}, nil
    	}
    
    	// Update the Memcached status with the pod names
    	// List the pods for this memcached's deployment
    	podList := &corev1.PodList{}
    	listOpts := []client.ListOption{
    		client.InNamespace(memcached.Namespace),
    		client.MatchingLabels(labelsForMemcached(memcached.Name)),
    	}
    	if err = r.List(ctx, podList, listOpts...); err != nil {
    		log.Error(err, "Failed to list pods", "Memcached.Namespace", memcached.Namespace, "Memcached.Name", memcached.Name)
    		return ctrl.Result{}, err
    	}
    	podNames := getPodNames(podList.Items)
    
    	// Update status.Nodes if needed
    	if !reflect.DeepEqual(podNames, memcached.Status.Nodes) {
    		memcached.Status.Nodes = podNames
    		err := r.Status().Update(ctx, memcached)
    		if err != nil {
    			log.Error(err, "Failed to update Memcached status")
    			return ctrl.Result{}, err
    		}
    	}
    
    	return ctrl.Result{}, nil
    }
    
    // deploymentForMemcached returns a memcached Deployment object
    func (r *MemcachedReconciler) deploymentForMemcached(m *cachev1.Memcached) *appsv1.Deployment {
    	ls := labelsForMemcached(m.Name)
    	replicas := m.Spec.Size
    
    	dep := &appsv1.Deployment{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:      m.Name,
    			Namespace: m.Namespace,
    		},
    		Spec: appsv1.DeploymentSpec{
    			Replicas: &replicas,
    			Selector: &metav1.LabelSelector{
    				MatchLabels: ls,
    			},
    			Template: corev1.PodTemplateSpec{
    				ObjectMeta: metav1.ObjectMeta{
    					Labels: ls,
    				},
    				Spec: corev1.PodSpec{
    					Containers: []corev1.Container{{
    						Image:   "memcached:1.4.36-alpine",
    						Name:    "memcached",
    						Command: []string{"memcached", "-m=64", "-o", "modern", "-v"},
    						Ports: []corev1.ContainerPort{{
    							ContainerPort: 11211,
    							Name:          "memcached",
    						}},
    					}},
    				},
    			},
    		},
    	}
    	// Set Memcached instance as the owner and controller
    	ctrl.SetControllerReference(m, dep, r.Scheme)
    	return dep
    }
    
    // labelsForMemcached returns the labels for selecting the resources
    // belonging to the given memcached CR name.
    func labelsForMemcached(name string) map[string]string {
    	return map[string]string{"app": "memcached", "memcached_cr": name}
    }
    
    // getPodNames returns the pod names of the array of pods passed in
    func getPodNames(pods []corev1.Pod) []string {
    	var podNames []string
    	for _, pod := range pods {
    		podNames = append(podNames, pod.Name)
    	}
    	return podNames
    }
    
    func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
    	return ctrl.NewControllerManagedBy(mgr).
    		For(&cachev1.Memcached{}).
    		Owns(&appsv1.Deployment{}).
    		Complete(r)
    }

    示例控制器为每个 Memcached CR 运行以下协调逻辑:

    • 如果尚无 Memcached 部署,创建一个。
    • 确保部署大小与 Memcached CR spec 指定的大小相同。
    • 使用 memcached Pod 的名称更新 Memcached CR 状态。

    接下来的两个子步骤检查控制器如何监视资源以及协调循环的触发方式。您可跳过这些步骤,直接构建和运行 Operator。

    1. 检查 controllers/memcached_controller.go 文件中的控制器实施,以查看控制器如何监视资源。

      SetupWithManager() 函数指定如何构建控制器来监控 CR 以及由该控制器拥有和管理的其他资源:

      例 5.4. SetupWithManager() 功能

      import (
      	...
      	appsv1 "k8s.io/api/apps/v1"
      	...
      )
      
      func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
      	return ctrl.NewControllerManagedBy(mgr).
      		For(&cachev1.Memcached{}).
      		Owns(&appsv1.Deployment{}).
      		Complete(r)
      }

      NewControllerManagedBy() 提供了一个控制器构建器,它允许各种控制器配置。

      for(&cachev1.Memcached{})Memcached 类型指定为要监视的主要资源。对于 Memcached 类型的每个 Add、Update 或 Delete 事件,协调循环都会为该 Memcached 对象发送一个协调 Request 参数,其中包括命名空间和名称键。

      owns(&appsv1.Deployment{})Deployment 类型指定为要监视的辅助资源。对于 Deployment 类型的每个 Add、Update 或 Delete 事件,事件处理程序会将每个事件映射到部署所有者的协调请求。在本例中,所有者是创建部署的 Memcached 对象。

    2. 每个控制器都有一个协调器对象,它带有实现了协调循环的 Reconcile() 方法。协调循环通过 Request 参数传递,该参数是从缓存中查找主资源对象 Memcached 的命名空间和名称键:

      例 5.5. 协调循环

      import (
      	ctrl "sigs.k8s.io/controller-runtime"
      
      	cachev1 "github.com/example-inc/memcached-operator/api/v1"
      	...
      )
      
      func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        // Lookup the Memcached instance for this reconcile request
        memcached := &cachev1.Memcached{}
        err := r.Get(ctx, req.NamespacedName, memcached)
        ...
      }

      根据 Reconcile() 函数的返回值,协调 Request 可能会重新排队,且可能会再次触发循环:

      例 5.6. 重排队列的逻辑

      // Reconcile successful - don't requeue
      return reconcile.Result{}, nil
      // Reconcile failed due to error - requeue
      return reconcile.Result{}, err
      // Requeue for any reason other than error
      return reconcile.Result{Requeue: true}, nil

      您可以将 Result.RequeueAfter 设置为在宽限期后重排队列请求:

      例 5.7. 宽限期后重排队列

      import "time"
      
      // Reconcile for any reason other than an error after 5 seconds
      return ctrl.Result{RequeueAfter: time.Second*5}, nil
      注意

      您可以返回带有 RequeueAfter 设置的 Result 来定期协调一个 CR。

      有关协调器、客户端并与资源事件交互的更多信息,请参阅 Controller Runtime Client API 文档

其它资源

  • 如需有关 CRD 中的 OpenAPI v3.0 验证模式的更多信息,请参阅 Kubernetes 文档