5.6. Channels

5.6.1. 频道和订阅

频道是定义单一事件转发和持久层的自定义资源。事件源或生成程序在将事件发送到频道后,可使用订阅将这些事件发送到多个 Knative 服务或其他 sink。

频道工作流概述

您可以通过实例化受支持的 Channel 对象来创建频道,并通过修改 Subscription 对象中的 delivery 规格来配置重新发送尝试。

创建 Channel 对象后,根据默认频道实现,一个经过更改的准入 Webhook 会为 Channel 对象添加一组 spec.channelTemplate 属性。例如,对于 InMemoryChannel 默认实现,Channel 对象如下所示:

apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
  name: example-channel
  namespace: default
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel

然后,频道控制器将根据这个 spec.channelTemplate 配置创建后备频道实例。

注意

创建后,spec.channelTemplate 属性将无法更改,因为它们由默认频道机制设置,而不是由用户设置。

当此机制与上例搭配使用时,会创建两个对象:一个通用的后备频道和一个 InMemoryChannel 频道。如果您使用不同的默认频道实现,使用特定于您的实现的频道替换 InMemoryChannel。例如,在 Knative Kafka 中,创建 KafkaChannel 频道。

后备频道充当将订阅复制到用户创建的频道对象的代理,并设置用户创建的频道对象状态来反映后备频道的状态。

5.6.1.1. 频道实现类型

InMemoryChannelKafkaChannel 频道实现可用于 OpenShift Serverless 进行开发。

以下是 InMemoryChannel 类型频道的限制:

  • 事件没有持久性。如果 Pod 停机,则 Pod 上的事件将会丢失。
  • InMemoryChannel 频道没有实现事件排序,因此同时接收到的两个事件可能会以任何顺序传送给订阅者。
  • 如果订阅者拒绝某个事件,则不会默认重新发送尝试。您可以通过修改 Subscription 对象中的 delivery 规格来配置重新发送尝试。

5.6.2. 创建频道

频道是定义单一事件转发和持久层的自定义资源。事件源或生成程序在将事件发送到频道后,可使用订阅将这些事件发送到多个 Knative 服务或其他 sink。

频道工作流概述

您可以通过实例化受支持的 Channel 对象来创建频道,并通过修改 Subscription 对象中的 delivery 规格来配置重新发送尝试。

5.6.2.1. 使用 Administrator 视角创建频道

在集群中安装 Knative Eventing 后,您可以使用 Administrator 视角创建频道。

先决条件

  • OpenShift Serverless Operator 和 Knative Eventing 已安装在 OpenShift Container Platform 集群中。
  • 您已登录到 Web 控制台,且处于 Administrator 视角。
  • 具有集群管理员 OpenShift Container Platform 的权限。

流程

  1. 在 OpenShift Container Platform Web 控制台的 Administrator 视角中,导航到 ServerlessEventing
  2. Create 列表中,选择 Channel。您将被定向到 Channel 页。
  3. 选择您要在 Type 列表中创建的 Channel 对象类型。

    注意

    目前默认只支持 InMemoryChannel 频道对象。如果您在 OpenShift Serverless 上安装了 Knative Kafka,则 Kafka 频道可用。

  4. Create

5.6.2.2. 使用 Developer 视角创建频道

使用 OpenShift Container Platform Web 控制台提供了一个简化的用户界面来创建频道。在集群中安装 Knative Eventing 后,您可以使用 web 控制台创建频道。

先决条件

  • 已登陆到 OpenShift Container Platform Web 控制台。
  • OpenShift Serverless Operator 和 Knative Eventing 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. Developer 视角中,导航到 +AddChannel
  2. 选择您要在 Type 列表中创建的 Channel 对象类型。
  3. Create

验证

  • 通过导航到 Topology 页面确认频道现在存在。

    在 Topology 视图中查看频道

5.6.2.3. 使用 Knative CLI 创建频道

使用 Knative (kn) 创建频道提供了比直接修改 YAML 文件更精简且直观的用户界面。您可以使用 kn channel create 命令创建频道。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 已安装 Knative (kn) CLI。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  • 创建频道:

    $ kn channel create <channel_name> --type <channel_type>

    频道类型是可选的,但如果指定,则必须使用 Group:Version:Kind 格式。例如,您可以创建一个 InMemoryChannel 对象:

    $ kn channel create mychannel --type messaging.knative.dev:v1:InMemoryChannel

    输出示例

    Channel 'mychannel' created in namespace 'default'.

验证

  • 要确认该频道现在存在,请列出现有频道并检查输出:

    $ kn channel list

    输出示例

    kn channel list
    NAME        TYPE              URL                                                     AGE   READY   REASON
    mychannel   InMemoryChannel   http://mychannel-kn-channel.default.svc.cluster.local   93s   True

删除频道

  • 删除频道:

    $ kn channel delete <channel_name>

5.6.2.4. 使用 YAML 创建默认实现频道

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述频道,并以可重复的方式描述频道。要使用 YAML 创建无服务器频道,您必须创建一个 YAML 文件来定义 Channel 对象,然后使用 oc apply 命令应用它。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 安装 OpenShift CLI (oc) 。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 创建一个 Channel 对象作为一个 YAML 文件:

    apiVersion: messaging.knative.dev/v1
    kind: Channel
    metadata:
      name: example-channel
      namespace: default
  2. 应用 YAML 文件:

    $ oc apply -f <filename>

5.6.2.5. 使用 YAML 创建 Kafka 频道

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述频道,并以可重复的方式描述频道。您可以通过创建一个 Kafka 频道,创建由 Kafka 主题支持的 Knative Eventing 频道。要使用 YAML 创建 Kafka 频道,您必须创建一个 YAML 文件来定义 KafkaChannel 对象,然后使用 oc apply 命令应用它。

先决条件

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在 OpenShift Container Platform 集群中。
  • 安装 OpenShift CLI (oc) 。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 创建一个 KafkaChannel 对象作为一个 YAML 文件:

    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    metadata:
      name: example-channel
      namespace: default
    spec:
      numPartitions: 3
      replicationFactor: 1
    重要

    仅支持 OpenShift Serverless 上的 KafkaChannel 对象的 v1beta1 API 版本。不要使用这个 API 的 v1alpha1 版本,因为这个版本现已弃用。

  2. 应用 KafkaChannel YAML 文件:

    $ oc apply -f <filename>

5.6.2.6. 后续步骤

5.6.3. 默认频道实现

default-ch-webhook 配置映射可以用来指定 Knative Eventing 的默认频道实现。您可以为整个集群或一个或多个命名空间指定默认频道实现。目前支持 InMemoryChannelKafkaChannel 频道类型。

5.6.3.1. 配置默认频道实施

先决条件

  • 在 OpenShift Container Platform 上具有管理员权限。
  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 如果要使用 Kafka 频道作为默认频道实现,还必须在集群中安装 KnativeKafka CR。

流程

  • 修改 KnativeEventing 自定义资源,以添加 default-ch-webhook 配置映射的配置详情:

    apiVersion: operator.knative.dev/v1beta1
    kind: KnativeEventing
    metadata:
      name: knative-eventing
      namespace: knative-eventing
    spec:
      config: 1
        default-ch-webhook: 2
          default-ch-config: |
            clusterDefault: 3
              apiVersion: messaging.knative.dev/v1
              kind: InMemoryChannel
              spec:
                delivery:
                  backoffDelay: PT0.5S
                  backoffPolicy: exponential
                  retry: 5
            namespaceDefaults: 4
              my-namespace:
                apiVersion: messaging.knative.dev/v1beta1
                kind: KafkaChannel
                spec:
                  numPartitions: 1
                  replicationFactor: 1
    1
    spec.config 中,您可以指定您要为修改的配置添加的配置映射。
    2
    default-ch-webhook 配置映射可以用来指定集群的默认频道实施,也可以用于一个或多个命名空间。
    3
    集群范围的默认频道类型配置。在本例中,集群的默认频道实现是 InMemoryChannel
    4
    命名空间范围的默认频道类型配置。在本例中, my-namespace 命名空间的默认频道实现是 KafkaChannel
    重要

    配置特定于命名空间的默认设置会覆盖任何集群范围的设置。

5.6.4. Knative Kafka 频道的安全配置

5.6.4.1. 为 Kafka 频道配置 TLS 验证

Apache Kafka 客户端和服务器使用 传输层安全性 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。TLS 是 Knative Kafka 唯一支持的流量加密方法。

先决条件

  • 在 OpenShift Container Platform 上具有集群或专用管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群 CA 证书存储为一个 .pem 文件。
  • 您有一个 Kafka 集群客户端证书,并存储为 .pem 文件的密钥。
  • 安装 OpenShift CLI (oc) 。

流程

  1. 在所选命名空间中创建证书文件作为 secret:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-file=user.crt=certificate.pem \
      --from-file=user.key=key.pem
    重要

    使用密钥名称 ca.crtuser.crtuser.key。不要更改它们。

  2. 编辑 KnativeKafka 自定义资源:

    $ oc edit knativekafka
  3. 引用您的 secret 和 secret 的命名空间:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: <kafka_auth_secret>
        authSecretNamespace: <kafka_auth_secret_namespace>
        bootstrapServers: <bootstrap_servers>
        enabled: true
      source:
        enabled: true
    注意

    确保指定 bootstrap 服务器中的匹配端口。

    例如:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: tls-user
        authSecretNamespace: kafka
        bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9094
        enabled: true
      source:
        enabled: true

5.6.4.2. 为 Kafka 频道配置 SASL 验证

Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。

先决条件

  • 在 OpenShift Container Platform 上具有集群或专用管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群的用户名和密码。
  • 您已选择使用 SASL 机制,例如 PLAINSCRAM-SHA-256SCRAM-SHA-512
  • 如果启用了 TLS,您还需要 Kafka 集群的 ca.crt 证书文件。
  • 安装 OpenShift CLI (oc) 。

流程

  1. 在所选命名空间中创建证书文件作为 secret:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \
      --from-literal=user="my-sasl-user"
    • 使用键名 ca.crt, password, 和 sasl.mechanism.不要更改它们。
    • 如果要将 SASL 与公共 CA 证书搭配使用,您必须在创建 secret 时使用 tls.enabled=true 标志,而不是使用 ca.crt 参数。例如:

      $ oc create secret -n <namespace> generic <kafka_auth_secret> \
        --from-literal=tls.enabled=true \
        --from-literal=password="SecretPassword" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="my-sasl-user"
  2. 编辑 KnativeKafka 自定义资源:

    $ oc edit knativekafka
  3. 引用您的 secret 和 secret 的命名空间:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: <kafka_auth_secret>
        authSecretNamespace: <kafka_auth_secret_namespace>
        bootstrapServers: <bootstrap_servers>
        enabled: true
      source:
        enabled: true
    注意

    确保指定 bootstrap 服务器中的匹配端口。

    例如:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: scram-user
        authSecretNamespace: kafka
        bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9093
        enabled: true
      source:
        enabled: true