5.3. Go ベースの Operator の作成

Operator 開発者は、Operator SDK での Go プログラミング言語のサポートを利用して、Go ベースの Memcached Operator のサンプルをビルドして、分散キー/値のストアを作成し、そのライフサイクルを管理することができます。

注記

Kubebuilder は Go ベース Operator のスキャフォールディングソリューションとして、Operator SDK に組み込まれます。

5.3.1. Operator SDK を使用した Go ベース Operator の作成

Operator SDK は、詳細なアプリケーション固有の運用上の知識を必要とする可能性のあるプロセスである、Kubernetes ネイティブアプリケーションのビルドを容易にします。SDK はこの障壁を低くするだけでなく、メータリングやモニターリングなどの数多くの一般的な管理機能に必要なスケルトンコードの量を減らします。

この手順では、SDK によって提供されるツールおよびライブラリーを使用して単純な Memcached Operator を作成する例を示します。

前提条件

  • 開発ワークステーションにインストールされる Operator SDK v0.19.4 CLI
  • OpenShift Container Platform 4.6 などの、Kubernetes ベースのクラスター (v1.8 以上の apps/v1beta2 API グループをサポートするもの) にインストールされる Operator Lifecycle Manager (OLM)
  • cluster-admin パーミッションのあるアカウントを使用したクラスターへのアクセス
  • OpenShift CLI (oc) v4.6+ (インストール済み)

手順

  1. Operator プロジェクトを作成します。

    1. プロジェクトのディレクトリーを作成します。

      $ mkdir -p $HOME/projects/memcached-operator
    2. ディレクトリーに切り替えます。

      $ cd $HOME/projects/memcached-operator
    3. Go モジュールのサポートをアクティブにします。

      $ export GO111MODULE=on
    4. operator-sdk init コマンドを実行してプロジェクトを初期化します。

      $ operator-sdk init \
          --domain=example.com \
          --repo=github.com/example-inc/memcached-operator
      注記

      operator-sdk init コマンドは、デフォルトで go.kubebuilder.io/v2 プラグインを使用します。

  2. サポートされるイメージを使用するよう Operator を更新します。

    1. プロジェクトのルートレベルの Dockerfile で、デフォルトのランナーイメージの参照フォームを変更します。

      FROM gcr.io/distroless/static:nonroot

      以下のように変更してください。

      FROM registry.access.redhat.com/ubi8/ubi-minimal:latest
    2. Go プロジェクトのバージョンによっては、 Dockerfile に USER 65532:65532 または USER nonroot:nonroot ディレクティブが含まれる可能性があります。いずれの場合も、サポートされるランナーイメージでは必要ないため、その行を削除します。
    3. config/default/manager_auth_proxy_patch.yaml ファイルで、以下の image の値を変更します。

      gcr.io/kubebuilder/kube-rbac-proxy:<tag>

      サポートされるイメージを使用するには、以下へ変更します。

      registry.redhat.io/openshift4/ose-kube-rbac-proxy:v4.6
  3. 以下の行を置き換えて、Makefile の test ターゲットを更新し、後続のビルドで必要な依存関係をインストールできるようにします。

    例5.1 既存の test ターゲット

    test: generate fmt vet manifests
            go test ./... -coverprofile cover.out

    以下の行を使用します。

    例5.2 更新された test ターゲット

    ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
    test: manifests generate fmt vet ## Run tests.
    	mkdir -p ${ENVTEST_ASSETS_DIR}
    	test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
    	source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./... -coverprofile cover.out
  4. カスタムリソース定義 (CRD) API およびコントローラーを作成します。

    1. 以下のコマンドを実行して、グループ cache、バージョン v1、および種類 Memcached を指定して API を作成します。

      $ operator-sdk create api \
          --group=cache \
          --version=v1 \
          --kind=Memcached
    2. プロンプトが表示されたら y を入力し、リソースとコントローラーの両方を作成します。

      Create Resource [y/n]
      y
      Create Controller [y/n]
      y

      出力例

      Writing scaffold for you to edit...
      api/v1/memcached_types.go
      controllers/memcached_controller.go
      ...

      このプロセスでは、api/v1/memcached_types.go で Memcached リソース API が生成され、controllers/memcached_controller.go でコントローラーが生成されます。

    3. api/v1/memcached_types.go で Go タイプの定義を変更し、以下の spec および status を追加します。

      // MemcachedSpec defines the desired state of Memcached
      type MemcachedSpec struct {
      	// +kubebuilder:validation:Minimum=0
      	// Size is the size of the memcached deployment
      	Size int32 `json:"size"`
      }
      
      // MemcachedStatus defines the observed state of Memcached
      type MemcachedStatus struct {
      	// Nodes are the names of the memcached pods
      	Nodes []string `json:"nodes"`
      }
    4. +kubebuilder:subresource:status マーカーを追加し、status サブリソースを CRD マニフェストに追加します。

      // Memcached is the Schema for the memcacheds API
      // +kubebuilder:subresource:status 1
      type Memcached struct {
      	metav1.TypeMeta   `json:",inline"`
      	metav1.ObjectMeta `json:"metadata,omitempty"`
      
      	Spec   MemcachedSpec   `json:"spec,omitempty"`
      	Status MemcachedStatus `json:"status,omitempty"`
      }
      1
      この行を追加します。

      これにより、コントローラーは残りの CR オブジェクトを変更せずに CR ステータスを更新できます。

    5. リソースタイプ用に生成されたコードを更新します。

      $ make generate
      ヒント

      *_types.go ファイルの変更後は、make generate コマンドを実行し、該当するリソースタイプ用に生成されたコードを更新する必要があります。

      上記の Makefile ターゲットは controller-gen ユーティリティーを呼び出して、api/v1/zz_generated.deepcopy.go ファイルを更新します。これにより、API Go タイプの定義は、すべての Kind タイプが実装する必要のある runtime.Object インターフェイスが実装されます。

  5. CRD マニフェストを生成して更新します。

    $ make manifests

    この Makefile ターゲットは controller-gen ユーティリティーを呼び出し、config/crd/bases/cache.example.com_memcacheds.yaml ファイルに CRD マニフェストを生成します。

    1. オプション: カスタム検証を CRD に追加します。

      OpenAPI v3.0 スキーマは、マニフェストの生成時に spec.validation ブロックの CRD マニフェストに追加されます。この検証ブロックにより、Kubernetes の作成または更新時に Memcached カスタムリソース (CR) のプロパティーを検証できます。

      Operator の作成者は Kubebuilder markers と呼ばれるアノテーションのような、単一行のコメントを使用して API のカスタム検証を設定できます。これらのマーカーには、+kubebuilder:validation 接頭辞が常に必要です。たとえば、以下のマーカーを追加して enum 型の仕様を追加できます。

      // +kubebuilder:validation:Enum=Lion;Wolf;Dragon
      type Alias string

      API コードのマーカーの使用については、Kubebuilder ドキュメントの Generating CRDs および Markers for Config/Code Generation を参照してください。OpenAPIv3 検証マーカーの詳細の一覧については、Kubebuilder ドキュメントの CRD Validation を参照してください。

      カスタム検証を追加する場合は、以下のコマンドを実行し、CRD の OpenAPI 検証セクションを更新します。

      $ make manifests
  6. 新規 API およびコントローラーの作成後に、コントローラーロジックを実装することができます。この例では、生成されたコントローラーファイル controllers/memcached_controller.go を以下の実装例に置き換えます。

    例5.3 memcached_controller.go の例

    /*
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
    
        http://www.apache.org/licenses/LICENSE-2.0
    
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    */
    
    package controllers
    
    import (
    	"context"
    	"reflect"
    
    	"github.com/go-logr/logr"
    	appsv1 "k8s.io/api/apps/v1"
    	corev1 "k8s.io/api/core/v1"
    	"k8s.io/apimachinery/pkg/api/errors"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/apimachinery/pkg/runtime"
    	"k8s.io/apimachinery/pkg/types"
    	ctrl "sigs.k8s.io/controller-runtime"
    	"sigs.k8s.io/controller-runtime/pkg/client"
    
    	cachev1 "github.com/example-inc/memcached-operator/api/v1"
    )
    
    // MemcachedReconciler reconciles a Memcached object
    type MemcachedReconciler struct {
    	client.Client
    	Log    logr.Logger
    	Scheme *runtime.Scheme
    }
    
    // +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete
    // +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/status,verbs=get;update;patch
    // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
    // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;
    
    func (r *MemcachedReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    	ctx := context.Background()
    	log := r.Log.WithValues("memcached", req.NamespacedName)
    
    	// Fetch the Memcached instance
    	memcached := &cachev1.Memcached{}
    	err := r.Get(ctx, req.NamespacedName, memcached)
    	if err != nil {
    		if errors.IsNotFound(err) {
    			// Request object not found, could have been deleted after reconcile request.
    			// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
    			// Return and don't requeue
    			log.Info("Memcached resource not found. Ignoring since object must be deleted")
    			return ctrl.Result{}, nil
    		}
    		// Error reading the object - requeue the request.
    		log.Error(err, "Failed to get Memcached")
    		return ctrl.Result{}, err
    	}
    
    	// Check if the deployment already exists, if not create a new one
    	found := &appsv1.Deployment{}
    	err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found)
    	if err != nil && errors.IsNotFound(err) {
    		// Define a new deployment
    		dep := r.deploymentForMemcached(memcached)
    		log.Info("Creating a new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
    		err = r.Create(ctx, dep)
    		if err != nil {
    			log.Error(err, "Failed to create new Deployment", "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
    			return ctrl.Result{}, err
    		}
    		// Deployment created successfully - return and requeue
    		return ctrl.Result{Requeue: true}, nil
    	} else if err != nil {
    		log.Error(err, "Failed to get Deployment")
    		return ctrl.Result{}, err
    	}
    
    	// Ensure the deployment size is the same as the spec
    	size := memcached.Spec.Size
    	if *found.Spec.Replicas != size {
    		found.Spec.Replicas = &size
    		err = r.Update(ctx, found)
    		if err != nil {
    			log.Error(err, "Failed to update Deployment", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
    			return ctrl.Result{}, err
    		}
    		// Spec updated - return and requeue
    		return ctrl.Result{Requeue: true}, nil
    	}
    
    	// Update the Memcached status with the pod names
    	// List the pods for this memcached's deployment
    	podList := &corev1.PodList{}
    	listOpts := []client.ListOption{
    		client.InNamespace(memcached.Namespace),
    		client.MatchingLabels(labelsForMemcached(memcached.Name)),
    	}
    	if err = r.List(ctx, podList, listOpts...); err != nil {
    		log.Error(err, "Failed to list pods", "Memcached.Namespace", memcached.Namespace, "Memcached.Name", memcached.Name)
    		return ctrl.Result{}, err
    	}
    	podNames := getPodNames(podList.Items)
    
    	// Update status.Nodes if needed
    	if !reflect.DeepEqual(podNames, memcached.Status.Nodes) {
    		memcached.Status.Nodes = podNames
    		err := r.Status().Update(ctx, memcached)
    		if err != nil {
    			log.Error(err, "Failed to update Memcached status")
    			return ctrl.Result{}, err
    		}
    	}
    
    	return ctrl.Result{}, nil
    }
    
    // deploymentForMemcached returns a memcached Deployment object
    func (r *MemcachedReconciler) deploymentForMemcached(m *cachev1.Memcached) *appsv1.Deployment {
    	ls := labelsForMemcached(m.Name)
    	replicas := m.Spec.Size
    
    	dep := &appsv1.Deployment{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:      m.Name,
    			Namespace: m.Namespace,
    		},
    		Spec: appsv1.DeploymentSpec{
    			Replicas: &replicas,
    			Selector: &metav1.LabelSelector{
    				MatchLabels: ls,
    			},
    			Template: corev1.PodTemplateSpec{
    				ObjectMeta: metav1.ObjectMeta{
    					Labels: ls,
    				},
    				Spec: corev1.PodSpec{
    					Containers: []corev1.Container{{
    						Image:   "memcached:1.4.36-alpine",
    						Name:    "memcached",
    						Command: []string{"memcached", "-m=64", "-o", "modern", "-v"},
    						Ports: []corev1.ContainerPort{{
    							ContainerPort: 11211,
    							Name:          "memcached",
    						}},
    					}},
    				},
    			},
    		},
    	}
    	// Set Memcached instance as the owner and controller
    	ctrl.SetControllerReference(m, dep, r.Scheme)
    	return dep
    }
    
    // labelsForMemcached returns the labels for selecting the resources
    // belonging to the given memcached CR name.
    func labelsForMemcached(name string) map[string]string {
    	return map[string]string{"app": "memcached", "memcached_cr": name}
    }
    
    // getPodNames returns the pod names of the array of pods passed in
    func getPodNames(pods []corev1.Pod) []string {
    	var podNames []string
    	for _, pod := range pods {
    		podNames = append(podNames, pod.Name)
    	}
    	return podNames
    }
    
    func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
    	return ctrl.NewControllerManagedBy(mgr).
    		For(&cachev1.Memcached{}).
    		Owns(&appsv1.Deployment{}).
    		Complete(r)
    }

    コントローラーのサンプルは、それぞれの Memcached CR について以下の調整 (reconciliation) ロジックを実行します。

    • Memcached デプロイメントを作成します (ない場合)。
    • デプロイメントのサイズが、Memcached CR 仕様で指定されたものと同じであることを確認します。
    • Memcached CR ステータスを memcached Pod の名前に置き換えます。

    次の 2 つのサブステップでは、コントローラーがリソースを監視する方法および調整ループがトリガーされる方法を検査します。これらの手順を省略し、直接 Operator のビルドおよび実行に進むことができます。

    1. controllers/memcached_controller.go ファイルでコントローラーの実装を検査し、コントローラーのリソースの監視方法を確認します。

      SetupWithManager() 関数は、CR およびコントローラーによって所有され、管理される他のリソースを監視するようにコントローラーがビルドされる方法を指定します。

      例5.4 SetupWithManager() 関数

      import (
      	...
      	appsv1 "k8s.io/api/apps/v1"
      	...
      )
      
      func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
      	return ctrl.NewControllerManagedBy(mgr).
      		For(&cachev1.Memcached{}).
      		Owns(&appsv1.Deployment{}).
      		Complete(r)
      }

      NewControllerManagedBy() は、さまざまなコントローラー設定を可能にするコントローラービルダーを提供します。

      For(&cachev1.Memcached{}) は、監視するプライマリーリソースとして Memcached タイプを指定します。Memcached タイプのそれぞれの Add、Update、または Delete イベントの場合、reconcile ループに Memcached オブジェクトの (namespace および name キーから成る) reconcile Request 引数が送られます。

      Owns(&appsv1.Deployment{}) は、監視するセカンダリーリソースとして Deployment タイプを指定します。Add、Update、または Delete イベントの各 Deployment タイプの場合、イベントハンドラーは各イベントを、デプロイメントのオーナーの reconcile request にマップします。この場合、デプロイメントが作成された Memcached オブジェクトがオーナーです。

    2. すべてのコントローラーには、reconcile ループを実装する Reconcile() メソッドのある reconciler オブジェクトがあります。この reconcile ループには、キャッシュからプライマリーリソースオブジェクトの Memcached を検索するために使用される namespace および name キーである Request 引数が渡されます。

      例5.5 reconcile ループ

      import (
      	ctrl "sigs.k8s.io/controller-runtime"
      
      	cachev1 "github.com/example-inc/memcached-operator/api/v1"
      	...
      )
      
      func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        // Lookup the Memcached instance for this reconcile request
        memcached := &cachev1.Memcached{}
        err := r.Get(ctx, req.NamespacedName, memcached)
        ...
      }

      Reconcile() 関数の返り値に応じて、reconcile Request は再度キューに入れられ、ループが再びトリガーされる可能性があります。

      例5.6 再キューロジック

      // Reconcile successful - don't requeue
      return reconcile.Result{}, nil
      // Reconcile failed due to error - requeue
      return reconcile.Result{}, err
      // Requeue for any reason other than error
      return reconcile.Result{Requeue: true}, nil

      Result.RequeueAfter を設定して、猶予期間後に要求を再びキューに入れることができます。

      例5.7 猶予期間後の再キュー

      import "time"
      
      // Reconcile for any reason other than an error after 5 seconds
      return ctrl.Result{RequeueAfter: time.Second*5}, nil
      注記

      RequeueAfter を定期的な CR の調整に設定している Result を返すことができます。

      reconciler、クライアント、およびリソースイベントとの対話に関する詳細は、Controller Runtime Client API のドキュメントを参照してください。

追加リソース