5.10. カスタムイベントソース

Knative に含まれていないイベントプロデューサーや、CloudEvent 形式ではないイベントを生成するプロデューサーからイベントを Ingress する必要がある場合は、カスタムイベントソースを使用してこれを実行できます。カスタムイベントソースは、次のいずれかの方法で作成できます。

  • シンクバインディングを作成して、PodSpecable オブジェクトをイベントソースとして使用します。
  • コンテナーソースを作成して、コンテナーをイベントソースとして使用します。

5.10.1. シンクバインディング

SinkBinding オブジェクトは、イベント生成を配信アドレス指定から切り離すことをサポートします。シンクバインディングは、イベントプロデューサー をイベントコンシューマーまたは シンク に接続するために使用されます。イベントプロデューサーは、PodSpec テンプレートを組み込む Kubernetes リソースであり、イベントを生成します。シンクは、イベントを受信できるアドレス指定可能な Kubernetes オブジェクトです。

SinkBinding オブジェクトは、環境変数をシンクの PodTemplateSpec に挿入します。つまり、アプリケーションコードが Kubernetes API と直接対話してイベントの宛先を見つける必要はありません。これらの環境変数は以下のとおりです。

K_SINK
解決されたシンクの URL。
K_CE_OVERRIDES
アウトバウンドイベントの上書きを指定する JSON オブジェクト。

5.10.1.1. YAML を使用したシンクバインディングの作成

YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でイベントソースを宣言的に記述することができます。YAML を使用してシンクバインディングを作成するには、SinkBinding オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
  • OpenShift CLI (oc) をインストールしている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。

    1. サービス YAML ファイルを作成します。

      サービス YAML ファイルの例

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest

    2. サービスを作成します。

      $ oc apply -f <filename>
  2. イベントをサービスに転送するシンクバインディングインスタンスを作成します。

    1. シンクバインディング YAML ファイルを作成します。

      サービス YAML ファイルの例

      apiVersion: sources.knative.dev/v1alpha1
      kind: SinkBinding
      metadata:
        name: bind-heartbeat
      spec:
        subject:
          apiVersion: batch/v1
          kind: Job 1
          selector:
            matchLabels:
              app: heartbeat-cron
      
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

      1
      この例では、ラベル app: heartbeat-cron を指定したジョブがイベントシンクにバインドされます。
    2. シンクバインディングを作成します。

      $ oc apply -f <filename>
  3. CronJob オブジェクトを作成します。

    1. cron ジョブの YAML ファイルを作成します。

      cron ジョブの YAML ファイルの例

      apiVersion: batch/v1beta1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      重要

      シンクバインディングを使用するには、bindings.knative.dev/include=true ラベルを Knative リソースに手動で追加する必要があります。

      たとえば、このラベルを CronJob インスタンスに追加するには、以下の行を Job リソースの YAML 定義に追加します。

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. cron ジョブを作成します。

      $ oc apply -f <filename>
  4. 以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。

    $ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml

    出力例

    spec:
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default
      subject:
        apiVersion: batch/v1
        kind: Job
        namespace: default
        selector:
          matchLabels:
            app: heartbeat-cron

検証

メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。

  1. コマンドを入力します。

    $ oc get pods
  2. コマンドを入力します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

5.10.1.2. Knative CLI を使用したシンクバインディングの作成

kn source binding create コマンドを使用し、Knative (kn) を使用してシンクバインディングを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Knative (kn) CLI をインストールしている。
  • OpenShift CLI (oc) をインストールしている。
注記

以下の手順では、YAML ファイルを作成する必要があります。

サンプルで使用されたもので YAML ファイルの名前を変更する場合は、必ず対応する CLI コマンドを更新する必要があります。

手順

  1. シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。

    $ kn service create event-display --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  2. イベントをサービスに転送するシンクバインディングインスタンスを作成します。

    $ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
  3. CronJob オブジェクトを作成します。

    1. cron ジョブの YAML ファイルを作成します。

      cron ジョブの YAML ファイルの例

      apiVersion: batch/v1beta1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      重要

      シンクバインディングを使用するには、bindings.knative.dev/include=true ラベルを Knative CR に手動で追加する必要があります。

      たとえば、このラベルを CronJob CR に追加するには、以下の行を Job CR の YAML 定義に追加します。

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. cron ジョブを作成します。

      $ oc apply -f <filename>
  4. 以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。

    $ kn source binding describe bind-heartbeat

    出力例

    Name:         bind-heartbeat
    Namespace:    demo-2
    Annotations:  sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ...
    Age:          2m
    Subject:
      Resource:   job (batch/v1)
      Selector:
        app:      heartbeat-cron
    Sink:
      Name:       event-display
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE     AGE REASON
      ++ Ready     2m

検証

メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。

  • 以下のコマンドを入力して、メッセージダンパー機能ログを表示します。

    $ oc get pods
    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

5.10.1.2.1. Knative CLI シンクフラグ

Knative (kn) CLI を使用してイベントソースを作成する場合、--sink フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。

以下の例では、サービスの http://event-display.svc.cluster.local をシンクとして使用するシンクバインディングを作成します。

シンクフラグを使用したコマンドの例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.localsvc は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel および broker が含まれます。

5.10.1.3. Web コンソールを使用したシンクバインディングの作成

Knative Eventing がクラスターにインストールされると、Web コンソールを使用して シンクバインディングを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Container Platform Web コンソールにログインしている。
  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. シンクとして使用する Knative サービスを作成します。

    1. Developer パースペクティブで、+AddYAML に移動します。
    2. サンプル YAML をコピーします。

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    3. Create をクリックします。
  2. イベントソースとして使用される CronJob リソースを作成し、1 分ごとにイベントを送信します。

    1. Developer パースペクティブで、+AddYAML に移動します。
    2. サンプル YAML をコピーします。

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "*/1 * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: true 1
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats
                    args:
                    - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
      1
      bindings.knative.dev/include: true ラベルを含めるようにしてください。OpenShift Serverless のデフォルトの namespace 選択動作は包含モードを使用します。
    3. Create をクリックします。
  3. 直前の手順で作成したサービスと同じ namespace、またはイベントの送信先となる他のシンクと同じ namespace にシンクバインディングを作成します。

    1. Developer パースペクティブで、+AddEvent Source に移動します。Event Sources ページが表示されます。
    2. オプション: イベントソースに複数のプロバイダーがある場合は、Providers 一覧から必要なプロバイダーを選択し、プロバイダーから利用可能なイベントソースをフィルターします。
    3. Sink Binding を選択し、Create Event Source をクリックします。Create Event Source ページが表示されます。

      注記

      Form view または YAML view を使用して Sink Binding 設定を設定し、ビューを切り替えることができます。ビューの切り替え時に、データは永続化されます。

    4. apiVersion フィールドに batch/v1 を入力します。
    5. Kind フィールドに Job と入力します。

      注記

      CronJob の種類は OpenShift Serverless シンクバインディングで直接サポートされていないため、Kind フィールドは cron ジョブオブジェクト自体ではなく、cron ジョブで作成される Job オブジェクトをターゲットにする必要があります。

    6. Sink を選択します。これは Resource または URI のいずれかになります。この例では、直前の手順で作成された event-display サービスが Resources シンクとして使用されます。
    7. Match labels セクションで以下を実行します。

      1. Name フィールドに app と入力します。
      2. Value フィールドに heartbeat-cron と入力します。

        注記

        ラベルセレクターは、リソース名ではなくシンクバインディングで cron ジョブを使用する場合に必要になります。これは、cron ジョブで作成されたジョブには予測可能な名前がなく、名前に無作為に生成される文字列が含まれているためです。たとえば、hearthbeat-cron-1cc23f になります。

    8. Create をクリックします。

検証

Topology ページおよび Pod ログを表示して、シンクバインディング、シンク、および cron ジョブが正常に作成され、機能していることを確認できます。

  1. Developer パースペクティブで、Topology に移動します。
  2. シンクバインディング、シンク、およびハートビートの cron ジョブを表示します。

    View the sink binding and service in the Topology view
  3. シンクバインディングが追加されると、正常なジョブが cron ジョブによって登録されていることを確認します。つまり、シンクバインディングは cron ジョブで作成されたジョブが正常に再設定されることを意味します。
  4. event-display サービス Pod のログを参照し、ハートビート cron ジョブで生成されるイベントを表示します。

5.10.1.4. シンクバインディング参照

シンクバインディングを作成して、PodSpecable オブジェクトをイベントソースとして使用できます。SinkBinding オブジェクトを作成するときに、複数のパラメーターを設定できます。

SinkBinding オブジェクトは以下のパラメーターをサポートします。

フィールド説明必須またはオプション

apiVersion

API バージョンを指定します (例: sources.knative.dev/v1)。

必須

kind

このリソースオブジェクトを SinkBinding オブジェクトとして特定します。

必須

metadata

SinkBinding オブジェクトを一意に識別するメタデータを指定します。たとえば、name です。

必須

spec

この SinkBinding オブジェクトの設定情報を指定します。

必須

spec.sink

シンクとして使用する URI に解決するオブジェクトへの参照。

必須

spec.subject

ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。

必須

spec.ceOverrides

上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。

任意

5.10.1.4.1. Subject パラメーター

Subject パラメーターは、ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。Subject 定義に複数のフィールドを設定できます。

Subject 定義は、以下のフィールドをサポートします。

フィールド説明必須またはオプション

apiVersion

参照先の API バージョン。

必須

kind

参照先の種類。

必須

namespace

参照先の namespace。省略されている場合、デフォルトはオブジェクトの namespace に設定されます。

任意

name

参照先の名前。

selector を設定する場合は、使用しないでください。

selector

参照先のセレクター。

name を設定する場合は、使用しないでください。

selector.matchExpressions

ラベルセレクターの要件の一覧です。

matchExpressions または matchLabels のいずれかのみを使用します。

selector.matchExpressions.key

セレクターが適用されるラベルキー。

matchExpressions を使用する場合に必須です。

selector.matchExpressions.operator

キーと値のセットの関係を表します。有効な演算子は InNotInExists、および DoesNotExist です。

matchExpressions を使用する場合に必須です。

selector.matchExpressions.values

文字列値の配列。operator パラメーターの値が In または NotIn の場合、値配列が空でないようにする必要があります。operator パラメーターの値が Exists または DoesNotExist の場合、値の配列は空である必要があります。この配列は、ストラテジーに基づいたマージパッチの適用中に置き換えられます。

matchExpressions を使用する場合に必須です。

selector.matchLabels

キーと値のペアのマップ。matchLabels マップの各キーと値のペアは matchExpressions の要素と同じです。ここで、キーフィールドは matchLabels.<key> で、operatorIn で、values の配列には matchLabels.<value> のみが含まれます。

matchExpressions または matchLabels のいずれかのみを使用します。

サブジェクトパラメーターの例

以下の YAML の場合は、default namespace の mysubject という名前の Deployment オブジェクトが選択されます。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    namespace: default
    name: mysubject
  ...

以下の YAML の場合、default namespace にラベル working=example が設定された Job オブジェクトが選択されます。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    namespace: default
    selector:
      matchLabels:
        working: example
  ...

以下の YAML の場合、default namespace にラベル working=example または working=sample が含まれる Pod オブジェクトが選択されます。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: v1
    kind: Pod
    namespace: default
    selector:
      - matchExpression:
        key: working
        operator: In
        values:
          - example
          - sample
  ...
5.10.1.4.2. CloudEvent オーバーライド

ceOverrides 定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides 定義に複数のフィールドを設定できます。

ceOverrides の定義は、以下のフィールドをサポートします。

フィールド説明必須またはオプション

extensions

アウトバウンドイベントで追加または上書きされる属性を指定します。各 extensions のキーと値のペアは、属性拡張機能としてイベントに個別に設定されます。

任意

注記

拡張子として許可されるのは、有効な CloudEvent 属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type 属性を変更することはできません。

CloudEvent オーバーライドの例

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

これにより、subjectK_CE_OVERRIDES 環境変数が設定されます。

出力例

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

5.10.1.4.3. include ラベル

シンクバインディングを使用するには、bindings.knative.dev/include: "true" ラベルをリソースまたはリソースが含まれる namespace のいずれかに割り当てる必要があります。リソース定義にラベルが含まれていない場合には、クラスター管理者は以下を実行してこれを namespace に割り当てることができます。

$ oc label namespace <namespace> bindings.knative.dev/include=true