Chapter 10. Knative Eventing

10.1. Using brokers with Knative Eventing

Knative Eventing uses the default broker unless otherwise specified.

If you have cluster administrator permissions, you can create the default broker automatically using namespace annotation.

All other users must create a broker using the manual process as described in this guide.

10.1.1. Creating a broker manually

To create a broker, you must create a service account for each namespace, and give that service account the required RBAC permissions.

Prerequisites

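  • Knative Eventing is installed. The installation includes the eventing-broker-ingress and eventing-broker-filter ClusterRole objects that this procedure binds to.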

Procedure

  1. Create the ServiceAccount objects.

    1. Create the eventing-broker-ingress object by entering the following command:

      $ oc -n <namespace> create serviceaccount eventing-broker-ingress
    2. Create the eventing-broker-filter object by entering the following command:

      $ oc -n <namespace> create serviceaccount eventing-broker-filter
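  2. Give the objects that you have created RBAC permissions. The following commands use the default namespace; replace default with the namespace that you used in the previous step: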

    $ oc -n default create rolebinding eventing-broker-ingress \
      --clusterrole=eventing-broker-ingress \
      --serviceaccount=default:eventing-broker-ingress
    $ oc -n default create rolebinding eventing-broker-filter \
      --clusterrole=eventing-broker-filter \
      --serviceaccount=default:eventing-broker-filter
  3. Create a broker by creating and applying a YAML file containing the following:

    apiVersion: eventing.knative.dev/v1beta1
    kind: Broker
    metadata:
      namespace: default
      name: default 1
    1
    This example uses the name default, but you can replace this with any other valid name.
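
Steps 1 and 2 above can be collected into one small shell function. The following is a minimal sketch rather than part of the documented procedure: print_broker_setup is a hypothetical helper name and my-namespace is a hypothetical namespace. The function only prints the oc commands so that they can be reviewed first, and it uses one namespace for the service accounts, the role bindings, and the --serviceaccount values, so that each role binding refers to the service account created in the same namespace.

```shell
#!/bin/sh
# Print (do not run) the oc commands that prepare a namespace for a broker.
# print_broker_setup is a hypothetical helper name, not an oc subcommand.
print_broker_setup() {
  ns="$1"
  for sa in eventing-broker-ingress eventing-broker-filter; do
    # Service account, then a role binding to the ClusterRole of the same name.
    echo "oc -n ${ns} create serviceaccount ${sa}"
    echo "oc -n ${ns} create rolebinding ${sa} --clusterrole=${sa} --serviceaccount=${ns}:${sa}"
  done
}

# Review the commands for a hypothetical namespace named my-namespace.
print_broker_setup my-namespace
```

After reviewing the printed commands, they could be run by piping the output to a shell, for example print_broker_setup my-namespace | sh, assuming a logged-in oc session.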

10.1.2. Creating a broker automatically using namespace annotation

If you have cluster administrator permissions, you can create a broker automatically by annotating a namespace.

Prerequisites

  • Knative Eventing installed.
  • Cluster administrator permissions for OpenShift Container Platform.

Procedure

  1. Annotate your namespace by adding the knative-eventing-injection=enabled label, and then check for the broker, by entering the following commands:

    $ oc label namespace default knative-eventing-injection=enabled 1
    $ oc -n default get broker default
    1
    Replace default with the desired namespace.

    The first command automatically creates a broker named default in the default namespace. The second command confirms that the broker exists.

Note

Brokers created due to annotation will not be removed if you remove the annotation. You must manually delete them.

10.1.3. Deleting a broker that was created using namespace annotation

  1. Delete the injected broker from the selected namespace (in this example, the default namespace):

    $ oc -n default delete broker default
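
Because an injected broker outlives the label that created it, a full cleanup usually involves two commands. The sketch below is illustrative only: print_broker_cleanup is a hypothetical helper name, and removing the label before deleting the broker is an assumption made here so that the broker is not injected again; the procedure above shows only the delete command. The trailing hyphen in knative-eventing-injection- is the usual oc label syntax for removing a label.

```shell
#!/bin/sh
# Print (do not run) the commands that remove broker injection from a namespace.
# print_broker_cleanup is a hypothetical helper name.
print_broker_cleanup() {
  ns="$1"
  # A key followed by a hyphen asks oc label to remove that label.
  echo "oc label namespace ${ns} knative-eventing-injection-"
  # The injected broker is named default.
  echo "oc -n ${ns} delete broker default"
}

print_broker_cleanup default
```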

10.2. Using channels

You can send events from an event source to a Knative Eventing channel. Channels are custom resources (CRs) that define a single event-forwarding and persistence layer. After events have been sent to a channel, they can be sent on to multiple Knative services by using subscriptions.

The default configuration for channel instances is defined in the default-ch-webhook ConfigMap. However, developers can still create their own channels directly by instantiating a supported channel object.

10.2.1. Supported channel types

Currently, OpenShift Serverless only supports the use of InMemoryChannel type channels as part of the Knative Eventing Technology Preview.

10.2.2. Using the default InMemoryChannel configuration

InMemoryChannels are for development use only, and should not be used in a production environment.

The following are limitations of InMemoryChannel type channels:

  • No event persistence is available. If a Pod goes down, events on that Pod are lost.
  • InMemoryChannel type channels do not implement event ordering, so two events that are received in the channel at the same time can be delivered to a subscriber in any order.
  • If a subscriber rejects an event, there are no re-delivery attempts. Instead, the rejected event is sent to a deadLetterSink if this sink exists, or is otherwise dropped. For more information about configuring event delivery and deadLetterSink settings for a channel, see Using subscriptions to send events from a channel to a sink.

When you install Knative Eventing, the following ConfigMap is created automatically:

apiVersion: v1
kind: ConfigMap
metadata:
  namespace: knative-eventing
  name: config-br-default-channel
data:
  channelTemplateSpec: |
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel

Creating a channel using the cluster default configuration

  • Create a generic Channel custom object.

    apiVersion: messaging.knative.dev/v1
    kind: Channel
    metadata:
      name: example-channel
      namespace: default

    When the Channel object is created, a mutating admission webhook adds a set of spec.channelTemplate properties for the Channel object based on the default channel implementation.

    apiVersion: messaging.knative.dev/v1
    kind: Channel
    metadata:
      name: example-channel
      namespace: default
    spec:
      channelTemplate:
        apiVersion: messaging.knative.dev/v1
        kind: InMemoryChannel

The channel controller then creates the backing channel instance based on that spec.channelTemplate configuration. The spec.channelTemplate properties cannot be changed after creation, because they are set by the default channel mechanism rather than by the user.

When this mechanism is used, two objects are created: a generic channel, and an InMemoryChannel type channel.

The generic channel acts as a proxy that copies its subscriptions to the InMemoryChannel, and sets its status to reflect the status of the backing InMemoryChannel type channel.

Because the channel in this example is created in the default namespace, the channel uses the cluster default, which is InMemoryChannel.
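
As a sketch of how the generic Channel object might be created and then inspected, the following shell function prints the manifest shown above for a given name and namespace. The function name channel_manifest is hypothetical, and the commented oc commands assume a logged-in session on a cluster where Knative Eventing is installed.

```shell
#!/bin/sh
# channel_manifest NAME NAMESPACE
# Print a generic Channel manifest; channel_manifest is a hypothetical helper name.
channel_manifest() {
  cat <<EOF
apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
  name: $1
  namespace: $2
EOF
}

channel_manifest example-channel default

# On a cluster, the manifest could be applied and the template added by the
# webhook read back (assumes oc is logged in):
#   channel_manifest example-channel default | oc apply -f -
#   oc -n default get channels.messaging.knative.dev example-channel \
#     -o jsonpath='{.spec.channelTemplate.kind}'
```

If the default channel mechanism behaves as described above, the second command would be expected to report the InMemoryChannel kind.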

10.3. Using subscriptions to send events from a channel to a sink

Subscriptions deliver events from a channel to event sinks.

10.3.1. Creating a subscription

You can create a subscription to connect a service or other event sink to a channel.

Important

Knative Eventing is a Technology Preview feature. The InMemoryChannel type is provided for development use only, and should not be used in a production environment.

Prerequisites

Procedure

  1. Create a Subscription object to connect a channel to a service, by creating a YAML file containing the following:

    apiVersion: messaging.knative.dev/v1beta1
    kind: Subscription
    metadata:
      name: my-subscription 1
      namespace: default
    spec:
      channel: 2
        apiVersion: messaging.knative.dev/v1beta1
        kind: Channel
        name: example-channel
      delivery: 3
        deadLetterSink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: error-handler
      subscriber: 4
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    1
    Name of the subscription.
    2
    Configuration settings for the channel that the subscription connects to.
    3
    Configuration settings for event delivery. This tells the subscription what happens to events that cannot be delivered to the subscriber. When this is configured, events that fail to be consumed are sent to the deadLetterSink. The event is then dropped from the channel, no redelivery of the event is attempted, and an error is logged in the system. The deadLetterSink value must be a Destination.
    4
    Configuration settings for the subscriber. This is the event sink that events are delivered to from the channel.
  2. Apply the YAML file by entering:

    $ oc apply -f <filename>
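
The Subscription manifest above can also be generated for other channels and sinks. The following is a minimal sketch under stated assumptions: subscription_manifest is a hypothetical helper name, the namespace is fixed to default as in the example, and the delivery block is treated as optional and is emitted only when a dead-letter service name is passed as the fourth argument.

```shell
#!/bin/sh
# subscription_manifest NAME CHANNEL SUBSCRIBER_SERVICE [DEAD_LETTER_SERVICE]
# Hypothetical helper that prints a Subscription manifest to standard output.
subscription_manifest() {
  cat <<EOF
apiVersion: messaging.knative.dev/v1beta1
kind: Subscription
metadata:
  name: $1
  namespace: default
spec:
  channel:
    apiVersion: messaging.knative.dev/v1beta1
    kind: Channel
    name: $2
EOF
  # Emit the delivery block only when a dead-letter service was given.
  if [ -n "${4:-}" ]; then
    cat <<EOF
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: $4
EOF
  fi
  cat <<EOF
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: $3
EOF
}

subscription_manifest my-subscription example-channel event-display error-handler
```

With a logged-in oc session, the output could be applied directly by piping it to oc apply -f - instead of saving it to a file first.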

10.4. Using triggers

By default, all events that are sent to a channel or broker are sent to all subscribers of that channel or broker.

Using triggers allows you to filter events from a channel or broker, so that subscribers will only receive a subset of events based on your defined criteria.

The Knative CLI provides a set of kn trigger commands that can be used to create and manage triggers.

10.4.1. Prerequisites

Before you can use triggers, you will need:

  • Knative Eventing and kn installed.
  • An available broker, either the default broker or one that you have created.

    You can create the default broker either by following the instructions in Using brokers with Knative Eventing, or by using the --inject-broker flag while creating a trigger. Use of this flag is described in the procedure below.

  • An available event consumer, for example, a Knative service.

10.4.2. Creating a trigger using kn

Procedure

  • Create a trigger:

    $ kn trigger create <trigger_name> --broker <broker_name> --filter <key=value> --sink <sink_name>

    To create a trigger and also create the default broker using broker injection, enter the following command:

    $ kn trigger create <trigger_name> --inject-broker --filter <key=value> --sink <sink_name>

Example trigger YAML:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: trigger-example 1
spec:
  broker: default 2
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service 3

1
The name of the trigger.
2
The name of the broker that events are filtered from. If the broker is not specified, the trigger uses the default broker.
3
The name of the service that will consume the filtered events.

10.4.3. Listing triggers using kn

The kn trigger list command prints a list of available triggers.

Procedure

  1. Print a list of available triggers:

    $ kn trigger list

    Example output

    NAME    BROKER    SINK           AGE   CONDITIONS   READY   REASON
    email   default   svc:edisplay   4s    5 OK / 5     True
    ping    default   svc:edisplay   32s   5 OK / 5     True

10.4.4. Listing triggers using kn in JSON format

Procedure

  1. Print a list of triggers in JSON format:

    $ kn trigger list -o json

10.4.5. Describing a trigger using kn

The kn trigger describe command prints information about a trigger.

Procedure

  • Enter the command:

    $ kn trigger describe <trigger_name>

    Example output

    Name:         ping
    Namespace:    default
    Labels:       eventing.knative.dev/broker=default
    Annotations:  eventing.knative.dev/creator=kube:admin, eventing.knative.dev/lastModifier=kube:admin
    Age:          2m
    Broker:       default
    Filter:
      type:       dev.knative.event
    
    Sink:
      Name:       edisplay
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE                  AGE REASON
      ++ Ready                  2m
      ++ BrokerReady            2m
      ++ DependencyReady        2m
      ++ Subscribed             2m
      ++ SubscriberResolved     2m

10.4.6. Deleting a trigger using kn

Procedure

  • Delete a trigger:

    $ kn trigger delete <trigger_name>

10.4.7. Updating a trigger using kn

You can use the kn trigger update command with certain flags to update attributes for a trigger.

Example

  1. Update a trigger to filter exact event attributes that match incoming events, such as type=knative.dev.event:

    $ kn trigger update <trigger_name> --filter type=knative.dev.event
  2. Remove the filter attribute with key type:

    $ kn trigger update <trigger_name> --filter type-
  3. Update the sink of a trigger to use a service named event-display:

    $ kn trigger update <trigger_name> --sink svc:event-display

10.4.8. Filtering events using triggers

In the following trigger example, only events with the attribute type: dev.knative.samples.helloworld will reach the event sink.

$ kn trigger create <trigger_name> --broker <broker_name> --filter type=dev.knative.samples.helloworld --sink svc:<service_name>

You can also filter events using multiple attributes. The following example shows how to filter events using the type, source, and extension attributes.

$ kn trigger create <trigger_name> --broker <broker_name> --sink svc:<service_name> \
--filter type=dev.knative.samples.helloworld \
--filter source=dev.knative.samples/helloworldsource \
--filter myextension=my-extension-value
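
The --filter flags above correspond to exact-match attributes on the Trigger object. A rough YAML equivalent is sketched below as a shell function. It assumes that the Trigger API version served by the cluster supports the spec.filter.attributes field, and the apiVersion shown (eventing.knative.dev/v1beta1, the version used for the Broker earlier in this chapter) is likewise an assumption. trigger_manifest, my-trigger, and my-service are hypothetical names.

```shell
#!/bin/sh
# trigger_manifest NAME BROKER SERVICE [KEY=VALUE ...]
# Hypothetical helper that prints a Trigger manifest with exact-match filters.
trigger_manifest() {
  name="$1"; broker="$2"; service="$3"
  shift 3
  cat <<EOF
apiVersion: eventing.knative.dev/v1beta1
kind: Trigger
metadata:
  name: ${name}
spec:
  broker: ${broker}
EOF
  if [ "$#" -gt 0 ]; then
    echo "  filter:"
    echo "    attributes:"
    for kv in "$@"; do
      # Split each KEY=VALUE argument at the first equals sign and
      # quote the value so that YAML always reads it as a string.
      echo "      ${kv%%=*}: \"${kv#*=}\""
    done
  fi
  cat <<EOF
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: ${service}
EOF
}

trigger_manifest my-trigger default my-service \
  type=dev.knative.samples.helloworld \
  source=dev.knative.samples/helloworldsource \
  myextension=my-extension-value
```

When no KEY=VALUE arguments are passed, the filter block is omitted, which matches the default behavior of delivering every event from the broker to the subscriber.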

10.5. Using SinkBinding

SinkBinding connects event producers (event sources) to an event consumer (an event sink), such as a Knative service or application.

Note

Both of the following procedures require you to create YAML files.

If you change the names of the YAML files from those used in the examples, you must ensure that you also update the corresponding CLI commands.

10.5.1. Using SinkBinding with the Knative CLI (kn)

This guide describes the steps required to create, manage, and delete a SinkBinding instance using kn commands.

Prerequisites

  • You have Knative Serving and Eventing installed.
  • You have the kn CLI installed.

Procedure

  1. To check that SinkBinding is set up correctly, create a Knative event display service, or event sink, that dumps incoming messages to its log:

    $ kn service create event-display --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  2. Create a SinkBinding that directs events to the service:

    $ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink svc:event-display
  3. Create a CronJob.

    1. Create a file named heartbeats-cronjob.yaml and copy the following sample code into it:

      apiVersion: batch/v1beta1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/knative-eventing-sources-heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
    2. After you have created the heartbeats-cronjob.yaml file, apply it by entering:

      $ oc apply -f heartbeats-cronjob.yaml
  4. Check that the controller is mapped correctly by entering the following command and inspecting the output:

    $ kn source binding describe bind-heartbeat

    Example output

    Name:         bind-heartbeat
    Namespace:    demo-2
    Annotations:  sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ...
    Age:          2m
    Subject:
      Resource:   job (batch/v1)
      Selector:
        app:      heartbeat-cron
    Sink:
      Name:       event-display
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE     AGE REASON
      ++ Ready     2m

Verification steps

You can verify that the events were sent to the Knative event sink by looking at the logs of the event-display service.

  • View the event-display service logs by entering the following commands:

    $ oc get pods
    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }
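
To check the logs without reading them line by line, the output can be filtered for the heartbeat event type shown in the example output. This is a small sketch: count_heartbeats is a hypothetical helper that counts matching lines on standard input, and the sample input is abbreviated from the example output rather than taken from a live cluster.

```shell
#!/bin/sh
# Count log lines that carry the heartbeat event type.
# count_heartbeats is a hypothetical helper name; -F matches the text literally.
count_heartbeats() {
  grep -cF 'type: dev.knative.eventing.samples.heartbeat'
}

# Abbreviated sample input that contains two events.
printf '%s\n' \
  'Context Attributes,' \
  '  type: dev.knative.eventing.samples.heartbeat' \
  'Context Attributes,' \
  '  type: dev.knative.eventing.samples.heartbeat' | count_heartbeats

# Against a cluster, the same filter could be applied to the log command above:
#   oc logs $(oc get pod -o name | grep event-display) -c user-container | count_heartbeats
```

Note that grep exits with a non-zero status when the count is 0, which can be useful in scripts that should fail when no heartbeat has arrived.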

10.5.2. Using SinkBinding with the YAML method

This guide describes the steps required to create, manage, and delete a SinkBinding instance using YAML files.

Prerequisites

  • You have Knative Serving and Eventing installed.

Procedure

  1. To check that SinkBinding is set up correctly, create a Knative event display service, or event sink, that dumps incoming messages to its log.

    1. Copy the following sample YAML into a file named service.yaml:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    2. After you have created the service.yaml file, apply it by entering:

      $ oc apply -f service.yaml
  2. Create a SinkBinding that directs events to the service.

    1. Create a file named sinkbinding.yaml and copy the following sample code into it:

      apiVersion: sources.knative.dev/v1alpha1
      kind: SinkBinding
      metadata:
        name: bind-heartbeat
      spec:
        subject:
          apiVersion: batch/v1
          kind: Job 1
          selector:
            matchLabels:
              app: heartbeat-cron
      
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display
      1
      In this example, any Job with the label app: heartbeat-cron will be bound to the event sink.
    2. After you have created the sinkbinding.yaml file, apply it by entering:

      $ oc apply -f sinkbinding.yaml
  3. Create a CronJob.

    1. Create a file named heartbeats-cronjob.yaml and copy the following sample code into it:

      apiVersion: batch/v1beta1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/knative-eventing-sources-heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
    2. After you have created the heartbeats-cronjob.yaml file, apply it by entering:

      $ oc apply -f heartbeats-cronjob.yaml
  4. Check that the controller is mapped correctly by entering the following command and inspecting the output:

    $ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml

    Example output

    spec:
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default
      subject:
        apiVersion: batch/v1
        kind: Job
        namespace: default
        selector:
          matchLabels:
            app: heartbeat-cron
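
The binding only takes effect when the label under matchLabels is the same label that the CronJob sets on the Jobs it creates (app: heartbeat-cron in this procedure). As a sketch of how the SinkBinding manifest could be generated for a different label or sink, the following shell function prints the same structure from its arguments. sinkbinding_manifest is a hypothetical helper name, and the subject is fixed to batch/v1 Job objects as in the example.

```shell
#!/bin/sh
# sinkbinding_manifest NAME LABEL_KEY LABEL_VALUE SINK_SERVICE
# Hypothetical helper that prints a SinkBinding manifest for Job subjects.
sinkbinding_manifest() {
  cat <<EOF
apiVersion: sources.knative.dev/v1alpha1
kind: SinkBinding
metadata:
  name: $1
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    selector:
      matchLabels:
        $2: $3
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: $4
EOF
}

sinkbinding_manifest bind-heartbeat app heartbeat-cron event-display
```

With a logged-in oc session, the output could be piped to oc apply -f - in place of the sinkbinding.yaml file.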

Verification steps

You can verify that the events were sent to the Knative event sink by looking at the logs of the event-display service.

  1. Enter the command:

    $ oc get pods
  2. Enter the command:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }