Chapter 10. Event sources

10.1. Understanding event sources

A Knative event source can be any Kubernetes object that generates or imports cloud events, and relays those events to another endpoint, known as a sink. Sourcing events is critical to developing a distributed system that reacts to events.

You can create and manage Knative event sources by using the Developer perspective in the OpenShift Container Platform web console, the kn CLI, or by applying YAML files.

Currently, OpenShift Serverless supports the following event source types:

API server source
Brings Kubernetes API server events into Knative. The API server source fires a new event each time a Kubernetes resource is created, updated or deleted.
Ping source
Produces events with a fixed payload on a specified cron schedule.
Kafka source
Connects a Kafka cluster to a sink as an event source.

You can also create a custom event source.

10.2. Listing event sources and event source types

You can use the kn CLI or the Developer perspective in the OpenShift Container Platform web console to list and manage available event sources or event source types.

Currently, OpenShift Serverless supports the following event source types:

API server source
Connects a sink to the Kubernetes API server.
Ping source
Periodically sends ping events with a constant payload. It can be used as a timer.
Sink binding
Connects core Kubernetes resource objects, such as Deployment, Job, or StatefulSet objects, with a sink.
Container source
Creates a custom event source by using an image.
Knative Kafka source
Connects a Kafka cluster to a sink as an event source.

10.2.1. Listing available event source types by using the Knative CLI

Procedure

  1. List the available event source types in the terminal:

    $ kn source list-types

    Example output

    TYPE              NAME                                            DESCRIPTION
    ApiServerSource   apiserversources.sources.knative.dev            Watch and send Kubernetes API events to a sink
    PingSource        pingsources.sources.knative.dev                 Periodically send ping events to a sink
    SinkBinding       sinkbindings.sources.knative.dev                Binding for connecting a PodSpecable to a sink

  2. Optional: You can also list the available event source types in YAML format:

    $ kn source list-types -o yaml
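
For scripting, the table printed by kn source list-types can be reduced to its TYPE column. The following is a minimal sketch rather than a feature of the kn CLI: the function name source_type_names is hypothetical, and it assumes the column layout shown in the example output above.

```shell
# Hypothetical helper (not part of kn): print the TYPE column of the
# `kn source list-types` table read from standard input.
# Assumes the first line is the header row, as in the example output.
source_type_names() {
  awk 'NR > 1 && NF > 0 { print $1 }'
}

# Demonstration on an excerpt of the example output. On a cluster you
# would instead run:  kn source list-types | source_type_names
source_type_names <<'EOF'
TYPE              NAME                                   DESCRIPTION
ApiServerSource   apiserversources.sources.knative.dev   Watch and send Kubernetes API events to a sink
PingSource        pingsources.sources.knative.dev        Periodically send ping events to a sink
EOF
```

If the table layout differs in your kn version, the -o yaml output from the previous step is likely a more stable input for scripts.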

10.2.2. Viewing available event source types within the Developer perspective

You can use the web console to view available event source types.

Note

Cluster administrators can add more event source types by installing Operators on OpenShift Container Platform.

Procedure

  1. Access the Developer perspective.
  2. Click +Add.
  3. Click Event source.

10.2.3. Listing available event sources by using the Knative CLI

  • List the available event sources:

    $ kn source list

    Example output

    NAME   TYPE              RESOURCE                               SINK          READY
    a1     ApiServerSource   apiserversources.sources.knative.dev   ksvc:eshow2   True
    b1     SinkBinding       sinkbindings.sources.knative.dev       ksvc:eshow3   False
    p1     PingSource        pingsources.sources.knative.dev        ksvc:eshow1   True

10.2.3.1. Listing event sources of a specific type only

You can list event sources of a specific type only, by using the --type flag.

  • List the available ping sources:

    $ kn source list --type PingSource

    Example output

    NAME   TYPE              RESOURCE                               SINK          READY
    p1     PingSource        pingsources.sources.knative.dev        ksvc:eshow1   True
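
The READY column makes it possible to pick out sources that need attention. The following is a minimal sketch under stated assumptions: not_ready_sources is a hypothetical helper, not a kn command, and it assumes that READY is the last column of the table, as in the example output.

```shell
# Hypothetical helper (not part of kn): read the `kn source list` table on
# standard input and print the NAME of every source whose READY column is
# not "True". Assumes READY is the last column, as in the example output.
not_ready_sources() {
  awk 'NR > 1 && NF > 0 && $NF != "True" { print $1 }'
}

# Demonstration on the example output. On a cluster you would instead run:
#   kn source list | not_ready_sources
not_ready_sources <<'EOF'
NAME   TYPE              RESOURCE                               SINK          READY
a1     ApiServerSource   apiserversources.sources.knative.dev   ksvc:eshow2   True
b1     SinkBinding       sinkbindings.sources.knative.dev       ksvc:eshow3   False
p1     PingSource        pingsources.sources.knative.dev        ksvc:eshow1   True
EOF
```

For the example table, only b1 is reported.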

10.3. Using the API server source

The API server source is an event source that can be used to connect an event sink, such as a Knative service, to the Kubernetes API server. The API server source watches for Kubernetes events and forwards them to the Knative Eventing broker.

10.3.1. Prerequisites

  • You must have a current installation of OpenShift Serverless, including Knative Serving and Eventing, in your OpenShift Container Platform cluster. This can be installed by a cluster administrator.
  • Event sources need a service to use as an event sink. The sink is the service or application that events are sent to from the event source.
  • You must create or update a service account, role, and role binding for the event source.

Note

Some of the following procedures require you to create YAML files.

If you change the names of the YAML files from those used in the examples, you must ensure that you also update the corresponding CLI commands.

10.3.2. Creating a service account, role, and role binding for event sources

Procedure

  1. Create a service account, role, and role binding for the event source as a YAML file:

    Note

    If you want to re-use an existing service account, you can modify your existing ServiceAccount resource to include the required permissions instead of creating a new resource.

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default 1
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default 2
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default 3
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default 4
    1 2 3 4
    Change this namespace to the namespace that you have selected for installing the event source.
  2. Apply the YAML file:

    $ oc apply -f <filename>
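
Because the namespace appears four times in the manifest, it can be convenient to generate the file from a single value. The following is a minimal sketch: render_event_rbac is a hypothetical helper and my-project is a placeholder namespace. The resource names are the ones used in the example above.

```shell
# Hypothetical helper: print the service account, role, and role binding
# from the example above, with every namespace field set to "$1".
render_event_rbac() {
  ns="$1"
  cat <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: events-sa
  namespace: ${ns}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: event-watcher
  namespace: ${ns}
rules:
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      - get
      - list
      - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: k8s-ra-event-watcher
  namespace: ${ns}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: event-watcher
subjects:
  - kind: ServiceAccount
    name: events-sa
    namespace: ${ns}
EOF
}

# "my-project" is a placeholder. Redirect the output to a file of your
# choice, for example authentication.yaml, before applying it.
render_event_rbac my-project
```

After applying the generated file, one way to confirm the permissions is oc auth can-i watch events --as=system:serviceaccount:<namespace>:events-sa -n <namespace>, which should answer yes if the role binding is in effect.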

10.3.3. Creating an API server source event source using the Developer perspective

Procedure

  1. In the Developer perspective, navigate to +Add → Event Source. The Event Sources page is displayed.
  2. Optional: If you have multiple providers for your event sources, select the required provider from the Providers list to filter the available event sources from the provider.
  3. Select ApiServerSource and then click Create Event Source. The Create Event Source page is displayed.
  4. Configure the ApiServerSource settings by using the Form view or YAML view:

    Note

    You can switch between the Form view and YAML view. The data is persisted when switching between the views.

    1. Enter v1 as the APIVERSION and Event as the KIND.
    2. Select the Service Account Name for the service account that you created.
    3. Select the Sink for the event source. A Sink can be either a Resource, such as a channel, broker, or service, or a URI.
  5. Click Create.

Verification

  • After you have created the API server source, it is shown in the Topology view, connected to the service that it uses as a sink.

    ApiServerSource Topology view

Note

If a URI sink is used, modify the URI by right-clicking on URI sink → Edit URI.

10.3.4. Deleting an API server source using the Developer perspective

Procedure

  1. Navigate to the Topology view.
  2. Right-click the API server source and select Delete ApiServerSource.

    Delete the ApiServerSource

10.3.5. Creating an API server source by using the Knative CLI

This section describes the steps required to create an API server source using kn commands.

Prerequisites

  • You must have OpenShift Serverless, the Knative Serving and Eventing components, and the kn CLI installed.

Procedure

  1. Create an API server source that uses a broker as a sink:

    $ kn source apiserver create <event_source_name> --sink broker:<broker_name> --resource "event:v1" --service-account <service_account_name> --mode Resource
  2. To check that the API server source is set up correctly, create a Knative service that dumps incoming messages to its log:

    $ kn service create <service_name> --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  3. Create a trigger to filter events from the default broker to the service:

    $ kn trigger create <trigger_name> --sink ksvc:<service_name>
  4. Create events by launching a pod in the default namespace:

    $ oc create deployment hello-node --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  5. Check that the controller is mapped correctly by inspecting the output generated by the following command:

    $ kn source apiserver describe <source_name>

    Example output

    Name:                mysource
    Namespace:           default
    Annotations:         sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer
    Age:                 3m
    ServiceAccountName:  events-sa
    Mode:                Resource
    Sink:
      Name:       default
      Namespace:  default
      Kind:       Broker (eventing.knative.dev/v1)
    Resources:
      Kind:        event (v1)
      Controller:  false
    Conditions:
      OK TYPE                     AGE REASON
      ++ Ready                     3m
      ++ Deployed                  3m
      ++ SinkProvided              3m
      ++ SufficientPermissions     3m
      ++ EventTypesProvided        3m

Verification

You can verify that the Kubernetes events were sent to Knative by looking at the message dumper function logs.

  1. Get the pods:

    $ oc get pods
  2. View the message dumper function logs for the pods:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.apiserver.resource.update
      datacontenttype: application/json
      ...
    Data,
      {
        "apiVersion": "v1",
        "involvedObject": {
          "apiVersion": "v1",
          "fieldPath": "spec.containers{hello-node}",
          "kind": "Pod",
          "name": "hello-node",
          "namespace": "default",
           .....
        },
        "kind": "Event",
        "message": "Started container",
        "metadata": {
          "name": "hello-node.159d7608e3a3572c",
          "namespace": "default",
          ....
        },
        "reason": "Started",
        ...
      }
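
When the log contains many events, a per-type count is often easier to read than the raw output. The following is a minimal sketch, assuming the log format shown in the example output, where each event has a context attribute line that begins with type:. The helper name event_type_counts is hypothetical.

```shell
# Hypothetical helper: read event-display logs on standard input and print
# each CloudEvent type with the number of times it occurs. Only the context
# attribute line ("type: ...") is matched; JSON keys such as "type" in the
# data are quoted and therefore ignored.
event_type_counts() {
  awk '$1 == "type:" { print $2 }' | sort | uniq -c | awk '{ print $2, $1 }'
}

# Demonstration on an excerpt of the example output. On a cluster you
# would instead run:
#   oc logs $(oc get pod -o name | grep event-display) -c user-container | event_type_counts
event_type_counts <<'EOF'
Context Attributes,
  specversion: 1.0
  type: dev.knative.apiserver.resource.update
  datacontenttype: application/json
EOF
```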

10.3.5.1. Knative CLI --sink flag

When you create an event-producing custom resource by using the Knative (kn) CLI, you can use the --sink flag to specify the sink that events from that resource are sent to.

The following example creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

Example command using the --sink flag

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
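Because the value is a URL, the sink is addressed as a URI, in this case the cluster-local address of the event-display service. To address a sink by resource name instead, use a prefix such as ksvc, channel, or broker, for example ksvc:event-display.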
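
The --sink values used in this chapter take several forms: ksvc:event-display, broker:<broker_name>, a bare service name, and a URL. The following sketch shows one way to tell these forms apart in a script. It illustrates the prefix convention only and is not how kn itself parses the flag; the function name sink_kind is hypothetical, and treating an unprefixed name as a Knative service is an assumption based on the kn examples in this chapter.

```shell
# Hypothetical helper: classify a --sink value by its prefix.
# Assumption: a value without a prefix names a Knative service, as in
# "--sink event-display" elsewhere in this chapter.
sink_kind() {
  case "$1" in
    http://*|https://*) echo "URI" ;;
    ksvc:*)             echo "Knative service" ;;
    broker:*)           echo "broker" ;;
    channel:*)          echo "channel" ;;
    *)                  echo "Knative service (no prefix)" ;;
  esac
}

# Classify the sink values that appear in this chapter.
for sink in ksvc:event-display broker:default \
            http://event-display.svc.cluster.local event-display; do
  printf '%s -> %s\n' "$sink" "$(sink_kind "$sink")"
done
```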

10.3.6. Deleting the API server source by using the Knative CLI

This section describes the steps used to delete the API server source, trigger, service account, role, and role binding using kn and oc commands.

Prerequisites

  • You must have the kn CLI installed.

Procedure

  1. Delete the trigger:

    $ kn trigger delete <trigger_name>
  2. Delete the event source:

    $ kn source apiserver delete <source_name>
  3. Delete the service account, role, and role binding:

    $ oc delete -f authentication.yaml

10.3.7. Using the API server source with the YAML method

This guide describes the steps required to create an API server source using YAML files.

Prerequisites

  • You must have Knative Serving and Eventing installed.
  • You must have created the default broker in the same namespace as the one defined in the API server source YAML file.

Procedure

  1. Create a service account, role, and role binding for the API server source as a YAML file:

    Note

    If you want to re-use an existing service account, you can modify your existing ServiceAccount resource to include the required permissions instead of creating a new resource.

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default 1
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default 2
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default 3
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default 4
    1 2 3 4
    Change this namespace to the namespace that you have selected for installing the API server source.
  2. Apply the YAML file:

    $ oc apply -f <filename>
  3. Create an API server source as a YAML file:

    apiVersion: sources.knative.dev/v1alpha1
    kind: ApiServerSource
    metadata:
      name: testevents
    spec:
      serviceAccountName: events-sa
      mode: Resource
      resources:
        - apiVersion: v1
          kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default
  4. Apply the ApiServerSource YAML file:

    $ oc apply -f <filename>
  5. To check that the API server source is set up correctly, create a Knative service as a YAML file that dumps incoming messages to its log:

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  6. Apply the Service YAML file:

    $ oc apply -f <filename>
  7. Create a Trigger object as a YAML file that filters events from the default broker to the service created in the previous step:

    apiVersion: eventing.knative.dev/v1
    kind: Trigger
    metadata:
      name: event-display-trigger
      namespace: default
    spec:
      broker: default
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
  8. Apply the Trigger YAML file:

    $ oc apply -f <filename>
  9. Create events by launching a pod in the default namespace:

    $ oc create deployment hello-node --image=quay.io/openshift-knative/knative-eventing-sources-event-display
  10. Check that the controller is mapped correctly, by entering the following command and inspecting the output:

    $ oc get apiserversource.sources.knative.dev testevents -o yaml

    Example output

    apiVersion: sources.knative.dev/v1alpha1
    kind: ApiServerSource
    metadata:
      annotations:
      creationTimestamp: "2020-04-07T17:24:54Z"
      generation: 1
      name: testevents
      namespace: default
      resourceVersion: "62868"
      selfLink: /apis/sources.knative.dev/v1alpha1/namespaces/default/apiserversources/testevents
      uid: 1603d863-bb06-4d1c-b371-f580b4db99fa
    spec:
      mode: Resource
      resources:
      - apiVersion: v1
        controller: false
        controllerSelector:
          apiVersion: ""
          kind: ""
          name: ""
          uid: ""
        kind: Event
        labelSelector: {}
      serviceAccountName: events-sa
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default

Verification

To verify that the Kubernetes events were sent to Knative, you can look at the message dumper function logs.

  1. Get the pods by entering the following command:

    $ oc get pods
  2. View the message dumper function logs for the pods by entering the following command:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.apiserver.resource.update
      datacontenttype: application/json
      ...
    Data,
      {
        "apiVersion": "v1",
        "involvedObject": {
          "apiVersion": "v1",
          "fieldPath": "spec.containers{hello-node}",
          "kind": "Pod",
          "name": "hello-node",
          "namespace": "default",
           .....
        },
        "kind": "Event",
        "message": "Started container",
        "metadata": {
          "name": "hello-node.159d7608e3a3572c",
          "namespace": "default",
          ....
        },
        "reason": "Started",
        ...
      }

10.3.8. Deleting the API server source

This section describes how to delete the API server source, trigger, service account, role, and role binding by deleting the resources defined in their YAML files.

Procedure

  1. Delete the trigger:

    $ oc delete -f trigger.yaml
  2. Delete the event source:

    $ oc delete -f k8s-events.yaml
  3. Delete the service account, role, and role binding:

    $ oc delete -f authentication.yaml

10.4. Using a ping source

A ping source is used to periodically send ping events with a constant payload to an event consumer.

A ping source can be used to schedule sending events, similar to a timer.

Example ping source YAML

apiVersion: sources.knative.dev/v1alpha2
kind: PingSource
metadata:
  name: test-ping-source
spec:
  schedule: "*/2 * * * *" 1
  jsonData: '{"message": "Hello world!"}' 2
  sink: 3
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

1
The schedule of the event, specified as a cron expression.
2
The event message body, expressed as a JSON-encoded data string.
3
The details of the event consumer. This example uses a Knative service named event-display.
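
The schedule value in the example has five space-separated fields, which in standard cron notation are minute, hour, day of month, month, and day of week. The value */2 in the minute field means every second minute. The following is a minimal sketch of a sanity check for the field count before a schedule is placed in a manifest. The function name is_five_field_cron is hypothetical, and the check does not validate the contents of each field.

```shell
# Hypothetical helper: succeed only if the argument has exactly five
# whitespace-separated fields. The argument must be quoted by the caller
# so that the shell does not expand "*" into file names.
is_five_field_cron() {
  [ "$(printf '%s\n' "$1" | awk '{ print NF }')" -eq 5 ]
}

# Check the schedule from the example ping source.
schedule="*/2 * * * *"
if is_five_field_cron "$schedule"; then
  echo "schedule has five fields: $schedule"
else
  echo "unexpected number of fields: $schedule"
fi
```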

10.4.1. Creating a ping source using the Developer perspective

You can create and verify a basic ping source from the OpenShift Container Platform web console.

Prerequisites

To create a ping source using the Developer perspective, ensure that:

  • The OpenShift Serverless Operator, Knative Serving, and Knative Eventing are installed on your OpenShift Container Platform cluster.
  • You have logged in to the web console.
  • You are in the Developer perspective.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. To verify that the ping source is working, create a simple Knative service that dumps incoming messages to the logs of the service.

    1. In the Developer perspective, navigate to +Add → YAML.
    2. Copy the example YAML:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    3. Click Create.
  2. Create a ping source in the same namespace as the service created in the previous step, or any other sink that you want to send events to.

    1. In the Developer perspective, navigate to +Add → Event Source. The Event Sources page is displayed.
    2. Optional: If you have multiple providers for your event sources, select the required provider from the Providers list to filter the available event sources from the provider.
    3. Select Ping Source and then click Create Event Source. The Create Event Source page is displayed.

      Note

      You can configure the PingSource settings by using the Form view or YAML view and can switch between the views. The data is persisted when switching between the views.

    4. Enter a value for Schedule. In this example, the value is */2 * * * *, which creates a PingSource that sends a message every two minutes.
    5. Optional: You can enter a value for Data, which is the message payload.
    6. Select a Sink. This can be either a Resource or a URI. In this example, the event-display service created in the previous step is used as the Resource sink.
    7. Click Create.

Verification

You can verify that the ping source was created and is connected to the sink by viewing the Topology page.

  1. In the Developer perspective, navigate to Topology.
  2. View the ping source and sink.

    View the ping source and service in the Topology view

10.4.2. Creating a ping source by using the Knative CLI

The following procedure describes how to create a basic ping source by using the kn CLI.

Prerequisites

  • You have Knative Serving and Eventing installed.
  • You have the kn CLI installed.

Procedure

  1. To verify that the ping source is working, create a simple Knative service that dumps incoming messages to the service logs:

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  2. For each set of ping events that you want to request, create a ping source in the same namespace as the event consumer:

    $ kn source ping create test-ping-source \
        --schedule "*/2 * * * *" \
        --data '{"message": "Hello world!"}' \
        --sink ksvc:event-display
  3. Check that the controller is mapped correctly by entering the following command and inspecting the output:

    $ kn source ping describe test-ping-source

    Example output

    Name:         test-ping-source
    Namespace:    default
    Annotations:  sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer
    Age:          15s
    Schedule:     */2 * * * *
    Data:         {"message": "Hello world!"}
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE                 AGE REASON
      ++ Ready                 8s
      ++ Deployed              8s
      ++ SinkProvided         15s
      ++ ValidSchedule        15s
      ++ EventTypeProvided    15s
      ++ ResourcesCorrect     15s

Verification

You can verify that the ping events were sent to the Knative event sink by looking at the logs of the sink pod.

By default, Knative services terminate their pods if no traffic is received within a 60-second period. The example in this guide creates a ping source that sends a message every 2 minutes, so each message should be observed in a newly created pod.

  1. Watch for new pods created:

    $ watch oc get pods
  2. Cancel watching the pods using Ctrl+C, then look at the logs of the created pod:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.sources.ping
      source: /apis/v1/namespaces/default/pingsources/test-ping-source
      id: 99e4f4f6-08ff-4bff-acf1-47f61ded68c9
      time: 2020-04-07T16:16:00.000601161Z
      datacontenttype: application/json
    Data,
      {
        "message": "Hello world!"
      }

10.4.2.1. Knative CLI --sink flag

When you create an event-producing custom resource by using the Knative (kn) CLI, you can use the --sink flag to specify the sink that events from that resource are sent to.

The following example creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

Example command using the --sink flag

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
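Because the value is a URL, the sink is addressed as a URI, in this case the cluster-local address of the event-display service. To address a sink by resource name instead, use a prefix such as ksvc, channel, or broker, for example ksvc:event-display.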

10.4.3. Deleting a ping source by using the Knative CLI

The following procedure describes how to delete a ping source using the kn CLI.

  • Delete the ping source:

    $ kn source ping delete <ping_source_name>

10.4.4. Using a ping source with YAML

The following sections describe how to create a basic ping source using YAML files.

Prerequisites

  • You have Knative Serving and Eventing installed.

Note

The following procedure requires you to create YAML files.

If you change the names of the YAML files from those used in the examples, you must ensure that you also update the corresponding CLI commands.

Procedure

  1. To verify that the ping source is working, create a simple Knative service that dumps incoming messages to the service’s logs.

    1. Copy the example YAML into a file named service.yaml:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    2. Create the service:

      $ oc apply --filename service.yaml
  2. For each set of ping events that you want to request, create a ping source in the same namespace as the event consumer.

    1. Copy the example YAML into a file named ping-source.yaml:

      apiVersion: sources.knative.dev/v1alpha2
      kind: PingSource
      metadata:
        name: test-ping-source
      spec:
        schedule: "*/2 * * * *"
        jsonData: '{"message": "Hello world!"}'
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display
    2. Create the ping source:

      $ oc apply --filename ping-source.yaml
  3. Check that the controller is mapped correctly by entering the following command:

    $ oc get pingsource.sources.knative.dev test-ping-source -o yaml

    Example output

    apiVersion: sources.knative.dev/v1alpha2
    kind: PingSource
    metadata:
      annotations:
        sources.knative.dev/creator: developer
        sources.knative.dev/lastModifier: developer
      creationTimestamp: "2020-04-07T16:11:14Z"
      generation: 1
      name: test-ping-source
      namespace: default
      resourceVersion: "55257"
      selfLink: /apis/sources.knative.dev/v1alpha2/namespaces/default/pingsources/test-ping-source
      uid: 3d80d50b-f8c7-4c1b-99f7-3ec00e0a8164
    spec:
      jsonData: '{"message": "Hello world!"}'
      schedule: '*/2 * * * *'
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default

Verification

You can verify that the ping events were sent to the Knative event sink by looking at the logs of the sink pod.

By default, Knative services terminate their pods if no traffic is received within a 60-second period. The example in this guide creates a ping source that sends a message every 2 minutes, so each message should be observed in a newly created pod.

  1. Watch for new pods created:

    $ watch oc get pods
  2. Cancel watching the pods using Ctrl+C, then look at the logs of the created pod:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.sources.ping
      source: /apis/v1/namespaces/default/pingsources/test-ping-source
      id: 042ff529-240e-45ee-b40c-3a908129853e
      time: 2020-04-07T16:22:00.000791674Z
      datacontenttype: application/json
    Data,
      {
        "message": "Hello world!"
      }

10.4.5. Deleting a ping source that was created by using YAML

The following procedure describes how to delete a ping source that was created by using YAML.

Procedure

  • Delete the ping source:

    $ oc delete -f <ping_source_yaml_filename>

    Example command

    $ oc delete -f ping-source.yaml

10.5. Using a Kafka source

You can create a Knative Kafka event source that reads events from an Apache Kafka cluster and passes these events to a sink.

10.5.1. Prerequisites

You can use the KafkaSource event source with OpenShift Serverless after you have Knative Eventing and Knative Kafka installed on your cluster.

10.5.2. Creating a Kafka event source by using the web console

You can create and verify a Kafka event source from the OpenShift Container Platform web console.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your cluster.
  • You have logged in to the web console.
  • You are in the Developer perspective.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Navigate to the Add page and select Event Source.
  2. In the Event Sources page, select Kafka Source in the Type section.
  3. Configure the Kafka Source settings:

    1. Add a comma-separated list of Bootstrap Servers.
    2. Add a comma-separated list of Topics.
    3. Add a Consumer Group.
    4. Select the Service Account Name for the service account that you created.
    5. Select the Sink for the event source. A Sink can be either a Resource, such as a channel, broker, or service, or a URI.
    6. Enter a Name for the Kafka event source.
  4. Click Create.

Verification

You can verify that the Kafka event source was created and is connected to the sink by viewing the Topology page.

  1. In the Developer perspective, navigate to Topology.
  2. View the Kafka event source and sink.

    View the Kafka source and service in the Topology view

10.5.3. Creating a Kafka event source by using the Knative CLI

This section describes how to create a Kafka event source by using the kn command.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, Knative Serving, and the KnativeKafka custom resource (CR) are installed on your cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.

Procedure

  1. To verify that the Kafka event source is working, create a Knative service that dumps incoming events into the service logs:

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display
  2. Create a KafkaSource CR:

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display

    Note

    Replace the placeholder values in this command with values for your source name, bootstrap servers, and topics.

    The --servers, --topics, and --consumergroup options specify the connection parameters to the Kafka cluster. The --consumergroup option is optional.

  3. Optional: View details about the KafkaSource CR you created:

    $ kn source kafka describe <kafka_source_name>

    Example output

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h

Verification

  1. Trigger the Kafka instance to send a message to the topic:

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    Enter the message in the prompt. This command assumes that:

    • The Kafka cluster is installed in the kafka namespace.
    • The KafkaSource object has been configured to use the my-topic topic.
  2. Verify that the message arrived by viewing the logs:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

10.5.3.1. Knative CLI --sink flag

When you create an event-producing custom resource by using the Knative (kn) CLI, you can use the --sink flag to specify the sink that receives events from that resource.

The following example creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

Example command using the --sink flag

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
svc in http://event-display.svc.cluster.local determines that the sink is a Knative service. Other default sink prefixes include channel and broker.

10.5.4. Creating a Kafka event source by using YAML

You can create a Kafka event source by using YAML.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Create a KafkaSource object as a YAML file:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> 1
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> 2
      sink:
      - <list_of_sinks> 3
    1
    A consumer group is a group of consumers that use the same group ID, and consume data from a topic.
    2
    A topic provides a destination for the storage of data. Each topic is split into one or more partitions.
    3
    A sink specifies where the source sends events.
    Important

    Only the v1beta1 version of the API for KafkaSource objects on OpenShift Serverless is supported. Do not use the v1alpha1 version of this API, as this version is now deprecated.

    Example KafkaSource object

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

  2. Apply the KafkaSource YAML file:

    $ oc apply -f <filename>

Verification

  • Verify that the Kafka event source was created by entering the following command:

    $ oc get pods

    Example output

    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m

10.6. Custom event sources

If you need to ingest events from an event producer that is not included in Knative, or from a producer that emits events that are not in the CloudEvent format, you can use one of the following methods:

  • Use a PodSpecable object as an event source, by creating a sink binding.
  • Use a container as an event source, by creating a container source.

10.6.1. Using a sink binding

The SinkBinding object supports decoupling event production from delivery addressing. Sink binding is used to connect event producers to an event consumer, or sink. An event producer is a Kubernetes resource that embeds a PodSpec template and produces events. A sink is an addressable Kubernetes object that can receive events.

The SinkBinding object injects environment variables into the PodTemplateSpec of the subject, that is, the event producer. This means that the application code does not need to interact directly with the Kubernetes API to locate the event destination. These environment variables are as follows:

K_SINK
The URL of the resolved sink.
K_CE_OVERRIDES
A JSON object that specifies overrides to the outbound event.
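
As an illustration of how application code might consume these variables, the following Go sketch reads K_SINK and K_CE_OVERRIDES and fails early if no sink was injected. It assumes that K_CE_OVERRIDES has the JSON shape shown later in this chapter, that is, an extensions object of string values. The sinkConfig type and the loadSinkConfig function are hypothetical names chosen for this example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
)

// overrides mirrors the assumed JSON shape of K_CE_OVERRIDES:
// {"extensions": {"<name>": "<value>"}}.
type overrides struct {
	Extensions map[string]string `json:"extensions"`
}

// sinkConfig holds the values that the sink binding injects into the subject.
type sinkConfig struct {
	SinkURL    string
	Extensions map[string]string
}

// loadSinkConfig reads K_SINK and K_CE_OVERRIDES through the supplied lookup
// function, which is os.Getenv in a real workload and a stub in tests.
func loadSinkConfig(getenv func(string) string) (sinkConfig, error) {
	cfg := sinkConfig{SinkURL: getenv("K_SINK")}
	if cfg.SinkURL == "" {
		return cfg, errors.New("K_SINK is not set; check that a sink binding selects this workload")
	}
	if raw := getenv("K_CE_OVERRIDES"); raw != "" {
		var o overrides
		if err := json.Unmarshal([]byte(raw), &o); err != nil {
			return cfg, fmt.Errorf("parsing K_CE_OVERRIDES: %w", err)
		}
		cfg.Extensions = o.Extensions
	}
	return cfg, nil
}

func main() {
	cfg, err := loadSinkConfig(os.Getenv)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("sink=%s extensions=%v\n", cfg.SinkURL, cfg.Extensions)
}
```

Passing the lookup function as a parameter keeps the sketch testable without modifying the process environment.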

10.6.1.1. Using sink binding with the YAML method

This guide describes the steps required to create a sink binding instance using YAML files.

Prerequisites

  • You have Knative Serving and Eventing installed.
Note

The following procedure requires you to create YAML files.

If you change the names of the YAML files from those used in the examples, you must ensure that you also update the corresponding CLI commands.

Procedure

  1. To check that sink binding is set up correctly, create a Knative event display service, or event sink, that dumps incoming messages to its log.

    1. Copy the following sample YAML into a file named service.yaml:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    2. After you have created the service.yaml file, apply it by entering:

      $ oc apply -f service.yaml
  2. Create a sink binding instance that directs events to the service.

    1. Create a file named sinkbinding.yaml and copy the following sample code into it:

      apiVersion: sources.knative.dev/v1
      kind: SinkBinding
      metadata:
        name: bind-heartbeat
      spec:
        subject:
          apiVersion: batch/v1
          kind: Job 1
          selector:
            matchLabels:
              app: heartbeat-cron
      
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display
      1
      In this example, any Job with the label app: heartbeat-cron will be bound to the event sink.
    2. After you have created the sinkbinding.yaml file, apply it by entering:

      $ oc apply -f sinkbinding.yaml
  3. Create a CronJob resource.

    1. Create a file named heartbeats-cronjob.yaml and copy the following sample code into it:

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
      Important

      To use sink binding, you must manually add a bindings.knative.dev/include=true label to your Knative resources.

      For example, to add this label to a CronJob resource, add the following lines to the CronJob resource YAML definition:

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. After you have created the heartbeats-cronjob.yaml file, apply it by entering:

      $ oc apply -f heartbeats-cronjob.yaml
  4. Check that the controller is mapped correctly by entering the following command and inspecting the output:

    $ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml

    Example output

    spec:
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default
      subject:
        apiVersion: batch/v1
        kind: Job
        namespace: default
        selector:
          matchLabels:
            app: heartbeat-cron

Verification

You can verify that the Kubernetes events were sent to the Knative event sink by looking at the message dumper function logs.

  1. Enter the command:

    $ oc get pods
  2. Enter the command:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

10.6.1.2. Creating a sink binding by using the Knative CLI

This guide describes the steps required to create a sink binding instance using kn commands.

Prerequisites

  • You have Knative Serving and Eventing installed.
  • You have the kn CLI installed.
Note

The following procedure requires you to create YAML files.

If you change the names of the YAML files from those used in the examples, you must ensure that you also update the corresponding CLI commands.

Procedure

  1. To check that sink binding is set up correctly, create a Knative event display service, or event sink, that dumps incoming messages to its log:

    $ kn service create event-display --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  2. Create a sink binding instance that directs events to the service:

    $ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
  3. Create a CronJob resource.

    1. Create a file named heartbeats-cronjob.yaml and copy the following sample code into it:

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
      Important

      To use sink binding, you must manually add a bindings.knative.dev/include=true label to the resources that you want to bind.

      For example, to add this label to a CronJob resource, add the following lines to the CronJob YAML definition:

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. After you have created the heartbeats-cronjob.yaml file, apply it by entering:

      $ oc apply -f heartbeats-cronjob.yaml
  4. Check that the controller is mapped correctly by entering the following command and inspecting the output:

    $ kn source binding describe bind-heartbeat

    Example output

    Name:         bind-heartbeat
    Namespace:    demo-2
    Annotations:  sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ...
    Age:          2m
    Subject:
      Resource:   job (batch/v1)
      Selector:
        app:      heartbeat-cron
    Sink:
      Name:       event-display
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE     AGE REASON
      ++ Ready     2m

Verification

You can verify that the Kubernetes events were sent to the Knative event sink by looking at the message dumper function logs.

  • View the message dumper function logs by entering the following commands:

    $ oc get pods
    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

10.6.1.2.1. Knative CLI --sink flag

When you create an event-producing custom resource by using the Knative (kn) CLI, you can use the --sink flag to specify the sink that receives events from that resource.

The following example creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

Example command using the --sink flag

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
svc in http://event-display.svc.cluster.local determines that the sink is a Knative service. Other default sink prefixes include channel and broker.

10.6.1.3. Creating a sink binding using the web console

You can create and verify a basic sink binding from the OpenShift Container Platform web console.

Prerequisites

To create a sink binding using the Developer perspective, ensure that:

  • The OpenShift Serverless Operator, Knative Serving, and Knative Eventing are installed on your OpenShift Container Platform cluster.
  • You have logged in to the web console.
  • You are in the Developer perspective.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Create a Knative service to use as a sink:

    1. In the Developer perspective, navigate to +Add → YAML.
    2. Copy the example YAML:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    3. Click Create.
  2. Create a CronJob resource that is used as an event source and sends an event every minute.

    1. In the Developer perspective, navigate to +Add → YAML.
    2. Copy the example YAML:

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "*/1 * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true" 1
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats
                    args:
                    - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
      1
      Ensure that you include the bindings.knative.dev/include: "true" label. Label values must be strings, so the value is quoted. The default namespace selection behavior of OpenShift Serverless uses inclusion mode.
    3. Click Create.
  3. Create a sink binding in the same namespace as the event-display service that you created earlier, or as any other sink that you want to send events to.

    1. In the Developer perspective, navigate to +Add → Event Source. The Event Sources page is displayed.
    2. Optional: If you have multiple providers for your event sources, select the required provider from the Providers list to filter the available event sources from the provider.
    3. Select Sink Binding and then click Create Event Source. The Create Event Source page is displayed.

      Note

      You can configure the Sink Binding settings by using the Form view or YAML view and can switch between the views. The data is persisted when switching between the views.

    4. In the apiVersion field enter batch/v1.
    5. In the Kind field enter Job.

      Note

      The CronJob kind is not supported directly by OpenShift Serverless sink binding, so the Kind field must target the Job objects created by the cron job, rather than the cron job object itself.

    6. Select a Sink. This can be either a Resource or a URI. In this example, the event-display service created earlier in this procedure is used as the Resource sink.
    7. In the Match labels section:

      1. Enter app in the Name field.
      2. Enter heartbeat-cron in the Value field.

        Note

        When you use cron jobs with sink binding, you must use a label selector rather than the resource name. This is because jobs created by a cron job do not have a predictable name and contain a randomly generated string in their name, for example, heartbeat-cron-1cc23f.

    8. Click Create.

Verification

You can verify that the sink binding, sink, and cron job have been created and are working correctly by viewing the Topology page and pod logs.

  1. In the Developer perspective, navigate to Topology.
  2. View the sink binding, sink, and heartbeats cron job.

  3. Observe that successful jobs are being registered by the cron job once the sink binding is added. This means that the sink binding is successfully reconfiguring the jobs created by the cron job.
  4. Browse the logs of the event-display service pod to see events produced by the heartbeats cron job.

10.6.1.4. Sink binding reference

This topic provides reference information about the configurable parameters for SinkBinding objects.

SinkBinding objects support the following parameters:

| Field | Description | Required or optional |
|---|---|---|
| apiVersion | Specifies the API version, for example sources.knative.dev/v1. | Required |
| kind | Identifies this resource object as a SinkBinding object. | Required |
| metadata | Specifies metadata that uniquely identifies the SinkBinding object. For example, a name. | Required |
| spec | Specifies the configuration information for this SinkBinding object. | Required |
| spec.sink | A reference to an object that resolves to a URI to use as the sink. | Required |
| spec.subject | References the resources for which the runtime contract is augmented by binding implementations. | Required |
| spec.ceOverrides | Defines overrides to control the output format and modifications to the event sent to the sink. | Optional |

10.6.1.4.1. Subject parameter

The Subject parameter references the resources for which the runtime contract is augmented by binding implementations.

The Subject definition supports the following fields:

| Field | Description | Required or optional |
|---|---|---|
| apiVersion | API version of the referent. | Required |
| kind | Kind of the referent. | Required |
| namespace | Namespace of the referent. If omitted, this defaults to the namespace of the object. | Optional |
| name | Name of the referent. | Do not use if you configure selector. |
| selector | Selector of the referents. | Do not use if you configure name. |
| selector.matchExpressions | A list of label selector requirements. | Only use one of either matchExpressions or matchLabels. |
| selector.matchExpressions.key | The label key that the selector applies to. | Required if using matchExpressions. |
| selector.matchExpressions.operator | Represents a key’s relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. | Required if using matchExpressions. |
| selector.matchExpressions.values | An array of string values. If the operator parameter value is In or NotIn, the values array must be non-empty. If the operator parameter value is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch. | Required if using matchExpressions. |
| selector.matchLabels | A map of key-value pairs. Each key-value pair in the matchLabels map is equivalent to an element of matchExpressions, where the key field is matchLabels.<key>, the operator is In, and the values array contains only matchLabels.<value>. | Only use one of either matchExpressions or matchLabels. |
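
The selector semantics described above can be made concrete with a short Go sketch. It is an illustration of standard Kubernetes label selector behavior, not the code that the sink binding controller runs. The requirement type and the fromMatchLabels and matches functions are hypothetical names. It shows how a matchLabels entry maps to an equivalent In requirement and how each operator is evaluated against the labels of a candidate object.

```go
package main

import "fmt"

// requirement models one element of selector.matchExpressions.
type requirement struct {
	Key      string
	Operator string // In, NotIn, Exists, or DoesNotExist
	Values   []string
}

// fromMatchLabels converts a matchLabels map into equivalent requirements:
// each key-value pair becomes {key, In, [value]}.
func fromMatchLabels(matchLabels map[string]string) []requirement {
	reqs := make([]requirement, 0, len(matchLabels))
	for k, v := range matchLabels {
		reqs = append(reqs, requirement{Key: k, Operator: "In", Values: []string{v}})
	}
	return reqs
}

func contains(values []string, s string) bool {
	for _, v := range values {
		if v == s {
			return true
		}
	}
	return false
}

// matches reports whether the labels satisfy every requirement.
func matches(labels map[string]string, reqs []requirement) bool {
	for _, r := range reqs {
		value, present := labels[r.Key]
		switch r.Operator {
		case "In":
			if !present || !contains(r.Values, value) {
				return false
			}
		case "NotIn":
			// An object without the key also satisfies NotIn.
			if present && contains(r.Values, value) {
				return false
			}
		case "Exists":
			if !present {
				return false
			}
		case "DoesNotExist":
			if present {
				return false
			}
		default:
			return false // unknown operator: treat as no match
		}
	}
	return true
}

func main() {
	jobLabels := map[string]string{"app": "heartbeat-cron"}
	selector := fromMatchLabels(map[string]string{"app": "heartbeat-cron"})
	fmt.Println(matches(jobLabels, selector))

	podSelector := []requirement{{Key: "working", Operator: "In", Values: []string{"example", "sample"}}}
	podLabels := map[string]string{"working": "sample"}
	fmt.Println(matches(podLabels, podSelector))
}
```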

10.6.1.4.1.1. Subject parameter examples

Given the following YAML, the Deployment object named mysubject in the default namespace is selected:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    namespace: default
    name: mysubject
  ...

Given the following YAML, any Job object with the label working=example in the default namespace is selected:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    namespace: default
    selector:
      matchLabels:
        working: example
  ...

Given the following YAML, any Pod object with the label working=example or working=sample in the default namespace is selected:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: v1
    kind: Pod
    namespace: default
    selector:
      matchExpressions:
        - key: working
          operator: In
          values:
            - example
            - sample
  ...

10.6.1.4.2. CloudEvent overrides

A ceOverrides definition provides overrides that control the output format of the CloudEvent and the modifications applied to it before it is sent to the sink.

A ceOverrides definition supports the following fields:

| Field | Description | Required or optional |
|---|---|---|
| extensions | Specifies which attributes are added or overridden on the outbound event. Each extensions key-value pair is set independently on the event as an attribute extension. | Optional |

Note

Only valid CloudEvent attribute names are allowed as extensions. You cannot set spec-defined attributes from the extensions override configuration. For example, you cannot modify the type attribute.

CloudEvent Overrides example

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

This sets the K_CE_OVERRIDES environment variable on the subject:

Example output

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }
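
To illustrate how a subject might apply this value, the following Go sketch parses the JSON and copies the extensions onto a map of event attributes. It is a sketch under stated assumptions rather than Knative's own implementation. It assumes that extension names follow the CloudEvents 1.0 naming rule of lowercase ASCII letters and digits, and it rejects the spec-defined context attributes, in line with the restriction noted above. The validExtensionName and applyOverrides functions are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// specAttributes lists the context attributes defined by CloudEvents 1.0.
// Overrides must not replace them.
var specAttributes = map[string]bool{
	"id": true, "source": true, "specversion": true, "type": true,
	"datacontenttype": true, "dataschema": true, "subject": true, "time": true,
}

// validExtensionName accepts non-empty names made of lowercase ASCII letters
// and digits that do not collide with a spec-defined attribute.
func validExtensionName(name string) bool {
	if name == "" {
		return false
	}
	for _, c := range name {
		if !(c >= 'a' && c <= 'z') && !(c >= '0' && c <= '9') {
			return false
		}
	}
	return !specAttributes[name]
}

// applyOverrides parses a K_CE_OVERRIDES value and sets each extension on
// attrs. All names are validated first so attrs is never partially updated.
func applyOverrides(attrs map[string]string, rawOverrides string) error {
	var o struct {
		Extensions map[string]string `json:"extensions"`
	}
	if err := json.Unmarshal([]byte(rawOverrides), &o); err != nil {
		return fmt.Errorf("parsing overrides: %w", err)
	}
	for name := range o.Extensions {
		if !validExtensionName(name) {
			return fmt.Errorf("extension %q is not allowed", name)
		}
	}
	for name, value := range o.Extensions {
		attrs[name] = value
	}
	return nil
}

func main() {
	attrs := map[string]string{"type": "dev.knative.eventing.samples.heartbeat"}
	raw := `{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }`
	if err := applyOverrides(attrs, raw); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(attrs["extra"], attrs["additional"])
}
```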

10.6.1.4.3. The include label

To use a SinkBinding object, you must assign the bindings.knative.dev/include: "true" label to either the resource or the namespace. If the resource definition does not include the label, a cluster administrator can attach it to the namespace by running:

$ oc label namespace <namespace> bindings.knative.dev/include=true

10.6.2. Using a container source

A container source runs a container image that generates events and sends them to a sink. You can use a container source to create a custom event source by building a container image and creating a ContainerSource object that uses your image URI.

10.6.2.1. Guidelines for creating a container image

  • Two environment variables are injected by the container source controller: K_SINK and K_CE_OVERRIDES. These variables are resolved from the sink and ceOverrides spec, respectively.
  • Event messages are sent to the sink URI specified in the K_SINK environment variable. The event message can be in any format; however, using the CloudEvent spec is recommended.
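
As a minimal sketch of these guidelines, the following Go program builds an HTTP request that carries a CloudEvent in binary content mode and posts it to the URI in K_SINK. It uses only the standard library and is an alternative illustration, not the heartbeats image itself. The newEventRequest function, the event type com.example.heartbeat, and the source /example/source are hypothetical values chosen for this example.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"time"
)

// newEventRequest builds a POST request in CloudEvents binary content mode:
// context attributes go into ce- prefixed headers and the JSON-encoded data
// goes into the body.
func newEventRequest(sinkURL, id, eventType, source string, data interface{}) (*http.Request, error) {
	body, err := json.Marshal(data)
	if err != nil {
		return nil, fmt.Errorf("encoding event data: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, sinkURL, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("building request: %w", err)
	}
	req.Header.Set("ce-specversion", "1.0")
	req.Header.Set("ce-id", id)
	req.Header.Set("ce-type", eventType)
	req.Header.Set("ce-source", source)
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	sink := os.Getenv("K_SINK")
	if sink == "" {
		fmt.Println("K_SINK is not set; nothing to send")
		return
	}
	// The ID, type, and source below are placeholder values.
	req, err := newEventRequest(sink, "1", "com.example.heartbeat", "/example/source", map[string]int{"id": 1})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("sink responded with", resp.Status)
}
```

A production source would more likely use a CloudEvents SDK, which also handles retries and structured mode.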

10.6.2.1.1. Example container images

The following is an example of a heartbeats container image:

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	duckv1 "knative.dev/pkg/apis/duck/v1"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/kelseyhightower/envconfig"
)

type Heartbeat struct {
	Sequence int    `json:"id"`
	Label    string `json:"label"`
}

var (
	eventSource string
	eventType   string
	sink        string
	label       string
	periodStr   string
)

func init() {
	flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)")
	flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)")
	flag.StringVar(&sink, "sink", "", "the host url to heartbeat to")
	flag.StringVar(&label, "label", "", "a special label")
	flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats")
}

type envConfig struct {
	// Sink URL where to send heartbeat cloud events
	Sink string `envconfig:"K_SINK"`

	// CEOverrides are the CloudEvents overrides to be applied to the outbound event.
	CEOverrides string `envconfig:"K_CE_OVERRIDES"`

	// Name of this pod.
	Name string `envconfig:"POD_NAME" required:"true"`

	// Namespace this pod exists in.
	Namespace string `envconfig:"POD_NAMESPACE" required:"true"`

	// Whether to run continuously or exit.
	OneShot bool `envconfig:"ONE_SHOT" default:"false"`
}

func main() {
	flag.Parse()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	if env.Sink != "" {
		sink = env.Sink
	}

	var ceOverrides *duckv1.CloudEventOverrides
	if len(env.CEOverrides) > 0 {
		overrides := duckv1.CloudEventOverrides{}
		err := json.Unmarshal([]byte(env.CEOverrides), &overrides)
		if err != nil {
			log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err)
			os.Exit(1)
		}
		ceOverrides = &overrides
	}

	p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
	if err != nil {
		log.Fatalf("failed to create http protocol: %s", err.Error())
	}

	c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow())
	if err != nil {
		log.Fatalf("failed to create client: %s", err.Error())
	}

	var period time.Duration
	if p, err := strconv.Atoi(periodStr); err != nil {
		period = time.Duration(5) * time.Second
	} else {
		period = time.Duration(p) * time.Second
	}

	if eventSource == "" {
		eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)
		log.Printf("Heartbeats Source: %s", eventSource)
	}

	if len(label) > 0 && label[0] == '"' {
		label, _ = strconv.Unquote(label)
	}
	hb := &Heartbeat{
		Sequence: 0,
		Label:    label,
	}
	ticker := time.NewTicker(period)
	for {
		hb.Sequence++

		event := cloudevents.NewEvent("1.0")
		event.SetType(eventType)
		event.SetSource(eventSource)
		event.SetExtension("the", 42)
		event.SetExtension("heart", "yes")
		event.SetExtension("beats", true)

		if ceOverrides != nil && ceOverrides.Extensions != nil {
			for n, v := range ceOverrides.Extensions {
				event.SetExtension(n, v)
			}
		}

		if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil {
			log.Printf("failed to set cloudevents data: %s", err.Error())
		}

		log.Printf("sending cloudevent to %s", sink)
		if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) {
			log.Printf("failed to send cloudevent: %v", res)
		}

		if env.OneShot {
			return
		}

		// Wait for next tick
		<-ticker.C
	}
}

The following is an example of a container source that references the previous heartbeats container image:

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        # This corresponds to a heartbeats image URI that you have built and published
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "example-pod"
            - name: POD_NAMESPACE
              value: "event-test"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: example-service
...

10.6.2.2. Creating and managing container sources by using the Knative CLI

You can use the following kn commands to create and manage container sources:

Create a container source

$ kn source container create <container_source_name> --image <image_uri> --sink <sink>

Delete a container source

$ kn source container delete <container_source_name>

Describe a container source

$ kn source container describe <container_source_name>

List existing container sources

$ kn source container list

List existing container sources in YAML format

$ kn source container list -o yaml

Update a container source

This command updates the image URI for an existing container source:

$ kn source container update <container_source_name> --image <image_uri>

10.6.2.3. Creating a container source by using the web console

You can create a container source by using the Developer perspective of the OpenShift Container Platform web console.

Prerequisites

To create a container source using the Developer perspective, ensure that:

  • The OpenShift Serverless Operator and Knative Eventing are installed on your OpenShift Container Platform cluster.
  • You have logged in to the web console.
  • You are in the Developer perspective.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. In the Developer perspective, navigate to +Add → Event Source. The Event Sources page is displayed.
  2. Select Container Source and then click Create Event Source. The Create Event Source page is displayed.
  3. Configure the Container Source settings by using the Form view or YAML view:

    Note

    You can switch between the Form view and YAML view. The data is persisted when switching between the views.

    1. In the Image field, enter the URI of the image that you want to run in the container created by the container source.
    2. In the Name field, enter the name of the image.
    3. Optional: In the Arguments field, enter any arguments to be passed to the container.
    4. Optional: In the Environment variables field, add any environment variables to set in the container.
    5. In the Sink section, add a sink where events from the container source are routed to. If you are using the Form view, you can choose from the following options:

      1. Select Resource to use a channel, broker, or service as a sink for the event source.
      2. Select URI to specify where the events from the container source are routed to.
  4. After you have finished configuring the container source, click Create.
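For reference, the Form view settings in the preceding procedure correspond roughly to a ContainerSource manifest like the following, which you could enter in the YAML view instead. This is a minimal sketch: the image, container name, argument, and environment variable reuse the heartbeats example from this chapter, and the event-display Knative service used as the sink is an assumed placeholder that you must replace with a sink that exists in your project.

```yaml
apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        - image: quay.io/openshift-knative/heartbeats:latest   # Image field
          name: heartbeats                                      # Name field
          args:                                                 # Arguments field (optional)
            - --period=1
          env:                                                  # Environment variables field (optional)
            - name: POD_NAME
              value: "mypod"
  sink:
    ref:                                 # corresponds to the Resource option
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display                # assumed sink name, not defined in this document
```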

10.6.2.4. Container source reference

This topic provides reference information about the configurable fields for the ContainerSource object.

ContainerSource objects support the following fields:

Field             Description                                                                                      Required or optional
apiVersion        Specifies the API version, for example sources.knative.dev/v1.                                   Required
kind              Identifies this resource object as a ContainerSource object.                                     Required
metadata          Specifies metadata that uniquely identifies the ContainerSource object. For example, a name.     Required
spec              Specifies the configuration information for this ContainerSource object.                         Required
spec.sink         A reference to an object that resolves to a URI to use as the sink.                              Required
spec.template     A template spec for the ContainerSource object.                                                  Required
spec.ceOverrides  Defines overrides to control the output format and modifications to the event sent to the sink.  Optional
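
As an illustration of the spec.sink field, the following fragments sketch the two common ways to identify a sink: by reference to a Kubernetes object, or directly by URI. The broker name and the service address are assumptions chosen for the example and are not defined elsewhere in this document.

```yaml
# Option 1: reference an object that resolves to a URI
spec:
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default        # assumed broker name
---
# Option 2: specify the sink address directly
spec:
  sink:
    uri: http://event-display.event-test.svc.cluster.local   # assumed address
```

Use one form or the other for a given source, depending on whether the sink is an addressable object in the cluster or an arbitrary endpoint.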

10.6.2.4.1. Template parameter example

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        - image: quay.io/openshift-knative/heartbeats:latest
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "mypod"
            - name: POD_NAMESPACE
              value: "event-test"
  ...

10.6.2.4.2. CloudEvent overrides

A ceOverrides definition provides overrides that control the CloudEvent’s output format and the modifications applied to the event before it is sent to the sink.

A ceOverrides definition supports the following fields:

Field        Description                                                                                  Required or optional
extensions   Specifies which attributes are added or overridden on the outbound event.                    Optional
             Each extensions key-value pair is set independently on the event as an attribute extension.

Note

Only valid CloudEvent attribute names are allowed as extensions. You cannot set the spec-defined attributes from the extensions override configuration. For example, you cannot modify the type attribute.

CloudEvent Overrides example

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

This sets the K_CE_OVERRIDES environment variable on the subject:

Example output

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }