5.6. Channels

5.6.1. 频道和订阅

频道是定义单一事件转发和持久层的自定义资源。事件源或生成程序在将事件发送到频道后,可使用订阅将这些事件发送到多个 Knative 服务或其他 sink。

频道工作流概述

您可以通过实例化受支持的 Channel 对象来创建频道,并通过修改 Subscription 对象中的 delivery 规格来配置重新发送尝试。

创建 Channel 对象后,根据默认频道实现,一个经过更改的准入 Webhook 会为 Channel 对象添加一组 spec.channelTemplate 属性。例如,对于 InMemoryChannel 默认实现,Channel 对象如下所示:

apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
  name: example-channel
  namespace: default
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel

然后,频道控制器将根据这个 spec.channelTemplate 配置创建后备频道实例。

注意

创建后,spec.channelTemplate 属性将无法更改,因为它们由默认频道机制设置,而不是由用户设置。

当此机制与上例搭配使用时,会创建两个对象:一个通用的后备频道和一个 InMemoryChannel 频道。如果您使用不同的默认频道实现,使用特定于您的实现的频道替换 InMemoryChannel。例如,在 Apache Kafka 的 Knative 代理中,会创建 KafkaChannel 频道。

后备频道充当将订阅复制到用户创建的频道对象的代理,并设置用户创建的频道对象状态来反映后备频道的状态。

5.6.1.1. 频道实现类型

InMemoryChannelKafkaChannel 频道实现可用于 OpenShift Serverless 进行开发。

以下是 InMemoryChannel 类型频道的限制:

  • 事件没有持久性。如果 Pod 停机,则 Pod 上的事件将会丢失。
  • InMemoryChannel 频道没有实现事件排序,因此同时接收到的两个事件可能会以任何顺序传送给订阅者。
  • 如果订阅者拒绝某个事件,则不会默认重新发送尝试。您可以通过修改 Subscription 对象中的 delivery 规格来配置重新发送尝试。

5.6.2. 创建频道

频道是定义单一事件转发和持久层的自定义资源。事件源或生成程序在将事件发送到频道后,可使用订阅将这些事件发送到多个 Knative 服务或其他 sink。

频道工作流概述

您可以通过实例化受支持的 Channel 对象来创建频道,并通过修改 Subscription 对象中的 delivery 规格来配置重新发送尝试。

5.6.2.1. 使用 Administrator 视角创建频道

在集群中安装 Knative Eventing 后,您可以使用 Administrator 视角创建频道。

先决条件

  • OpenShift Serverless Operator 和 Knative Eventing 已安装在 OpenShift Container Platform 集群中。
  • 您已登录到 Web 控制台,且处于 Administrator 视角。
  • 具有集群管理员 OpenShift Container Platform 的权限。

流程

  1. 在 OpenShift Container Platform Web 控制台的 Administrator 视角中,导航到 ServerlessEventing
  2. Create 列表中,选择 Channel。您将被定向到 Channel 页。
  3. 选择您要在 Type 列表中创建的 Channel 对象类型。

    注意

    目前默认只支持 InMemoryChannel 频道对象。如果您在 OpenShift Serverless 上安装了 Apache Kafka 的 Knative 代理实现,则 Apache Kafka 的 Knative 频道可用。

  4. Create

5.6.2.2. 使用 Developer 视角创建频道

使用 OpenShift Container Platform Web 控制台提供了一个简化的用户界面来创建频道。在集群中安装 Knative Eventing 后,您可以使用 web 控制台创建频道。

先决条件

  • 已登陆到 OpenShift Container Platform Web 控制台。
  • OpenShift Serverless Operator 和 Knative Eventing 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. Developer 视角中,导航到 +AddChannel
  2. 选择您要在 Type 列表中创建的 Channel 对象类型。
  3. Create

验证

  • 通过导航到 Topology 页面确认频道现在存在。

    在 Topology 视图中查看频道

5.6.2.3. 使用 Knative CLI 创建频道

使用 Knative (kn) 创建频道提供了比直接修改 YAML 文件更精简且直观的用户界面。您可以使用 kn channel create 命令创建频道。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 已安装 Knative (kn) CLI。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  • 创建频道:

    $ kn channel create <channel_name> --type <channel_type>

    频道类型是可选的,但如果指定,则必须使用 Group:Version:Kind 格式。例如,您可以创建一个 InMemoryChannel 对象:

    $ kn channel create mychannel --type messaging.knative.dev:v1:InMemoryChannel

    输出示例

    Channel 'mychannel' created in namespace 'default'.

验证

  • 要确认该频道现在存在,请列出现有频道并检查输出:

    $ kn channel list

    输出示例

    kn channel list
    NAME        TYPE              URL                                                     AGE   READY   REASON
    mychannel   InMemoryChannel   http://mychannel-kn-channel.default.svc.cluster.local   93s   True

删除频道

  • 删除频道:

    $ kn channel delete <channel_name>

5.6.2.4. 使用 YAML 创建默认实现频道

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述频道,并以可重复的方式描述频道。要使用 YAML 创建无服务器频道,您必须创建一个 YAML 文件来定义 Channel 对象,然后使用 oc apply 命令应用它。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 安装 OpenShift CLI (oc) 。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 创建一个 Channel 对象作为一个 YAML 文件:

    apiVersion: messaging.knative.dev/v1
    kind: Channel
    metadata:
      name: example-channel
      namespace: default
  2. 应用 YAML 文件:

    $ oc apply -f <filename>

5.6.2.5. 使用 YAML 为 Apache Kafka 创建频道

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述频道,并以可重复的方式描述频道。您可以通过创建一个 Kafka 频道,创建由 Kafka 主题支持的 Knative Eventing 频道。要使用 YAML 创建 Kafka 频道,您必须创建一个 YAML 文件来定义 KafkaChannel 对象,然后使用 oc apply 命令应用它。

先决条件

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在 OpenShift Container Platform 集群中。
  • 安装 OpenShift CLI (oc) 。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 创建一个 KafkaChannel 对象作为一个 YAML 文件:

    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    metadata:
      name: example-channel
      namespace: default
    spec:
      numPartitions: 3
      replicationFactor: 1
    重要

    仅支持 OpenShift Serverless 上的 KafkaChannel 对象的 v1beta1 API 版本。不要使用这个 API 的 v1alpha1 版本,因为这个版本现已弃用。

  2. 应用 KafkaChannel YAML 文件:

    $ oc apply -f <filename>

5.6.2.6. 后续步骤

5.6.3. 将频道连接到 sink

来自事件源或制作者发送到频道的事件可使用 订阅 转发到一个或多个 sink。您可以通过配置 Subscription 对象来创建订阅,它指定消耗发送到该频道的事件的频道和接收器(也称为 订阅者)。

5.6.3.1. 使用 Developer 视角创建订阅

创建频道和事件 sink 后,您可以创建一个订阅来启用事件交付。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建订阅。

先决条件

  • OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已在 OpenShift Container Platform 集群中安装。
  • 已登陆到 web 控制台。
  • 您已创建了事件 sink,如 Knative 服务以及频道。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. Developer 视角中,进入 Topology 页。
  2. 使用以下方法之一创建订阅:

    1. 将鼠标悬停在您要为其创建订阅的频道上,并拖动箭头。此时会显示 Add Subscription 选项。

      为频道创建订阅
      1. Subscriber 列表中选择您的接收器。
      2. Add
    2. 如果服务在与频道相同的命名空间或项目下的 Topology 视图中可用,点击您要为该频道创建订阅的频道,并将箭头直接拖到服务以立即从频道创建订阅到该服务。

验证

  • 创建订阅后,您可以在 Topology 视图中将频道连接到该服务的行显示为:

    Topology 视图中的订阅

5.6.3.2. 使用 YAML 创建订阅

创建频道和事件 sink 后,您可以创建一个订阅来启用事件交付。使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述订阅,并以可重复的方式描述订阅。要使用 YAML 创建订阅,您必须创建一个 YAML 文件来定义 Subscription 对象,然后使用 oc apply 命令应用它。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 安装 OpenShift CLI (oc) 。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  • 创建 Subscription 对象:

    • 创建 YAML 文件并将以下示例代码复制到其中:

      apiVersion: messaging.knative.dev/v1beta1
      kind: Subscription
      metadata:
        name: my-subscription 1
        namespace: default
      spec:
        channel: 2
          apiVersion: messaging.knative.dev/v1beta1
          kind: Channel
          name: example-channel
        delivery: 3
          deadLetterSink:
            ref:
              apiVersion: serving.knative.dev/v1
              kind: Service
              name: error-handler
        subscriber: 4
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display
      1
      订阅的名称。
      2
      订阅连接的频道的配置设置。
      3
      事件交付的配置设置。这会告诉订阅无法发送给订阅者的事件。配置后,消耗的事件会发送到 deadLetterSink。事件将被丢弃,不会尝试重新发送该事件,并在系统中记录错误。deadLetterSink 的值需要是一个 Destination
      4
      订阅用户的配置设置。这是事件从频道发送的事件 sink。
    • 应用 YAML 文件:

      $ oc apply -f <filename>

5.6.3.3. 使用 Knative CLI 创建订阅

创建频道和事件 sink 后,您可以创建一个订阅来启用事件交付。使用 Knative (kn) CLI 创建订阅提供了比直接修改 YAML 文件更精简且直观的用户界面。您可以使用带有适当标志的 kn subscription create 命令创建订阅。

先决条件

  • OpenShift Serverless Operator 和 Knative Eventing 已安装在 OpenShift Container Platform 集群中。
  • 已安装 Knative (kn) CLI。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  • 创建订阅以将接收器连接到频道:

    $ kn subscription create <subscription_name> \
      --channel <group:version:kind>:<channel_name> \ 1
      --sink <sink_prefix>:<sink_name> \ 2
      --sink-dead-letter <sink_prefix>:<sink_name> 3
    1
    --channel 指定应处理的云事件的来源。您必须提供频道名称。如果您没有使用由 Channel 自定义资源支持的默认 InMemoryChannel 频道,您必须为指定频道类型添加 <group:version:kind> 前缀。例如,这是 Apache Kafka 支持的频道的 messaging.knative.dev:v1beta1:KafkaChannel
    2
    --sink 指定事件要传送到的目标目的地。默认情况下,<sink_name> 解释为此名称的 Knative 服务,与订阅位于同一个命名空间中。您可以使用以下前缀之一指定接收器类型:
    ksvc
    Knative 服务。
    channel
    作为目的地的频道。这里只能引用默认频道类型。
    broker
    Eventing 代理。
    3
    可选: --sink-dead-letter 是一个可选标志,可用于指定在无法发送事件时哪些事件应发送到的接收器。如需更多信息,请参阅 OpenShift Serverless 事件交付文档。

    示例命令

    $ kn subscription create mysubscription --channel mychannel --sink ksvc:event-display

    输出示例

    Subscription 'mysubscription' created in namespace 'default'.

验证

  • 要确认频道已连接到事件接收器或 subscriber,使用一个订阅列出现有订阅并检查输出:

    $ kn subscription list

    输出示例

    NAME            CHANNEL             SUBSCRIBER           REPLY   DEAD LETTER SINK   READY   REASON
    mysubscription   Channel:mychannel   ksvc:event-display                              True

删除订阅

  • 删除订阅:

    $ kn subscription delete <subscription_name>

5.6.3.4. 使用 Administrator 视角创建订阅

创建频道和事件 sink(也称为 订阅者 )后,您可以创建一个订阅来启用事件交付。订阅是通过配置 Subscription 对象创建的,它指定了要向其发送事件的频道和订阅者。您还可以指定一些特定于订阅者的选项,比如如何处理失败。

先决条件

  • OpenShift Serverless Operator 和 Knative Eventing 已安装在 OpenShift Container Platform 集群中。
  • 您已登录到 Web 控制台,且处于 Administrator 视角。
  • 具有集群管理员 OpenShift Container Platform 的权限。
  • 您已创建了 Knative 频道。
  • 您已创建了 Knative 服务以用作订阅者。

流程

  1. 在 OpenShift Container Platform Web 控制台的 Administrator 视角中,导航到 ServerlessEventing
  2. Channel 选项卡中,选择您要在其中添加订阅的频道的 Options 菜单 kebab
  3. 点击列表中的 Add Subscription
  4. Add Subscription 对话框中,为订阅选择 Subscriber。订阅者是可以从频道接收事件的 Knative 服务。
  5. Add

5.6.3.5. 后续步骤

5.6.4. 默认频道实现

default-ch-webhook 配置映射可以用来指定 Knative Eventing 的默认频道实现。您可以为整个集群或一个或多个命名空间指定默认频道实现。目前支持 InMemoryChannelKafkaChannel 频道类型。

5.6.4.1. 配置默认频道实施

先决条件

  • 在 OpenShift Container Platform 上具有管理员权限。
  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 如果要将 Apache Kafka 的 Knative 频道用作默认频道实现,还必须在集群中安装 KnativeKafka CR。

流程

  • 修改 KnativeEventing 自定义资源,以添加 default-ch-webhook 配置映射的配置详情:

    apiVersion: operator.knative.dev/v1beta1
    kind: KnativeEventing
    metadata:
      name: knative-eventing
      namespace: knative-eventing
    spec:
      config: 1
        default-ch-webhook: 2
          default-ch-config: |
            clusterDefault: 3
              apiVersion: messaging.knative.dev/v1
              kind: InMemoryChannel
              spec:
                delivery:
                  backoffDelay: PT0.5S
                  backoffPolicy: exponential
                  retry: 5
            namespaceDefaults: 4
              my-namespace:
                apiVersion: messaging.knative.dev/v1beta1
                kind: KafkaChannel
                spec:
                  numPartitions: 1
                  replicationFactor: 1
    1
    spec.config 中,您可以指定您要为修改的配置添加的配置映射。
    2
    default-ch-webhook 配置映射可以用来指定集群的默认频道实施,也可以用于一个或多个命名空间。
    3
    集群范围的默认频道类型配置。在本例中,集群的默认频道实现是 InMemoryChannel
    4
    命名空间范围的默认频道类型配置。在本例中, my-namespace 命名空间的默认频道实现是 KafkaChannel
    重要

    配置特定于命名空间的默认设置会覆盖任何集群范围的设置。

5.6.5. 频道的安全配置

5.6.5.1. 为 Apache Kafka 的 Knative 频道配置 TLS 身份验证

Apache Kafka 客户端和服务器使用 传输层安全性 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。TLS 是 Apache Kafka 的 Knative 代理实现唯一支持的流量加密方法。

先决条件

  • 在 OpenShift Container Platform 上具有集群或专用管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群 CA 证书存储为一个 .pem 文件。
  • 您有一个 Kafka 集群客户端证书,并存储为 .pem 文件的密钥。
  • 安装 OpenShift CLI (oc) 。

流程

  1. 在所选命名空间中创建证书文件作为 secret:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-file=user.crt=certificate.pem \
      --from-file=user.key=key.pem
    重要

    使用密钥名称 ca.crtuser.crtuser.key。不要更改它们。

  2. 编辑 KnativeKafka 自定义资源:

    $ oc edit knativekafka
  3. 引用您的 secret 和 secret 的命名空间:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: <kafka_auth_secret>
        authSecretNamespace: <kafka_auth_secret_namespace>
        bootstrapServers: <bootstrap_servers>
        enabled: true
      source:
        enabled: true
    注意

    确保指定 bootstrap 服务器中的匹配端口。

    例如:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: tls-user
        authSecretNamespace: kafka
        bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9094
        enabled: true
      source:
        enabled: true

5.6.5.2. 为 Apache Kafka 的 Knative 频道配置 SASL 身份验证

Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。

先决条件

  • 在 OpenShift Container Platform 上具有集群或专用管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群的用户名和密码。
  • 您已选择使用 SASL 机制,例如 PLAINSCRAM-SHA-256SCRAM-SHA-512
  • 如果启用了 TLS,您还需要 Kafka 集群的 ca.crt 证书文件。
  • 安装 OpenShift CLI (oc) 。

流程

  1. 在所选命名空间中创建证书文件作为 secret:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \
      --from-literal=user="my-sasl-user"
    • 使用键名 ca.crt, password, 和 sasl.mechanism.不要更改它们。
    • 如果要将 SASL 与公共 CA 证书搭配使用,您必须在创建 secret 时使用 tls.enabled=true 标志,而不是使用 ca.crt 参数。例如:

      $ oc create secret -n <namespace> generic <kafka_auth_secret> \
        --from-literal=tls.enabled=true \
        --from-literal=password="SecretPassword" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="my-sasl-user"
  2. 编辑 KnativeKafka 自定义资源:

    $ oc edit knativekafka
  3. 引用您的 secret 和 secret 的命名空间:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: <kafka_auth_secret>
        authSecretNamespace: <kafka_auth_secret_namespace>
        bootstrapServers: <bootstrap_servers>
        enabled: true
      source:
        enabled: true
    注意

    确保指定 bootstrap 服务器中的匹配端口。

    例如:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: scram-user
        authSecretNamespace: kafka
        bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9093
        enabled: true
      source:
        enabled: true