6.2. 配置 Knative Kafka

Knative Kafka 提供集成选项,供您在 OpenShift Serverless 中使用支持的 Apache Kafka 消息流平台。Kafka 为事件源、频道、代理和事件 sink 功能提供选项。

除了作为 OpenShift Serverless 核心安装一部分的 Knative Eventing 组件外,集群管理员还可安装 KnativeKafka 自定义资源 (CR) 。

注意

IBM Z 和 IBM Power Systems 目前不支持 Knative Kafka。

KnativeKafka CR 为用户提供其他选项,例如:

  • Kafka 源
  • Kafka 频道
  • Kafka 代理
  • Kafka 接收器

6.2.1. 安装 Knative Kafka

Knative Kafka 提供集成选项,供您在 OpenShift Serverless 中使用支持的 Apache Kafka 消息流平台。如果您已安装 KnativeKafka 自定义资源,则 OpenShift Serverless 安装中提供了 Knative Kafka 功能。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 您可以访问 Red Hat AMQ Streams 集群。
  • 如果要使用验证步骤,请安装 OpenShift CLI (oc) 。
  • 在 OpenShift Container Platform 上具有集群管理员权限。
  • 已登陆到 OpenShift Container Platform Web 控制台。

流程

  1. Administrator 视角中,进入 OperatorsInstalled Operators
  2. 检查页面顶部的 Project 下拉菜单是否已设置为 Project: knative-eventing
  3. 在 OpenShift Serverless Operator 的 Provided APIs 列表中,找到 Knative Kafka 复选框并点 Create Instance
  4. Create Knative Kafka 页面中配置 KnativeKafka 对象。

    重要

    要在集群中使用 Kafka 频道、源、代理或 sink,您需要将要使用的属性的 enabled 选项设置为 true。这些交换机默认设置为 false。另外,要使用 Kafka 频道、代理或接收器,您必须指定 bootstrap 服务器。

    KnativeKafka 自定义资源示例

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
        name: knative-kafka
        namespace: knative-eventing
    spec:
        channel:
            enabled: true 1
            bootstrapServers: <bootstrap_servers> 2
        source:
            enabled: true 3
        broker:
            enabled: true 4
            defaultConfig:
                bootstrapServers: <bootstrap_servers> 5
                numPartitions: <num_partitions> 6
                replicationFactor: <replication_factor> 7
        sink:
            enabled: true 8

    1
    让开发人员在集群中使用 KafkaChannel 频道类型。
    2
    以逗号分隔的 AMQ Streams 集群中的 bootstrap 服务器列表。
    3
    让开发人员在集群中使用 KafkaSource 事件源类型。
    4
    让开发人员在集群中使用 Knative Kafka 代理实现。
    5
    来自 Red Hat AMQ Streams 集群的 bootstrap 服务器的逗号分隔列表。
    6
    定义 Kafka 主题分区的数量,由 Broker 对象支持。默认值为 10
    7
    定义 Kafka 主题的复制因素,由 Broker 对象支持。默认值为 3
    8
    让开发人员在集群中使用 Kafka sink。
    注意

    replicationFactor 值必须小于或等于 Red Hat AMQ Streams 集群的节点数量。

    1. 建议您在不需要完全控制 KnativeKafka 对象创建的简单配置中使用该表单。
    2. 对于更复杂的配置,建议编辑 YAML,这可以完全控制 KnativeKafka 对象的创建。您可以通过点 Create Knative Kafka 页面右上角的 Edit YAML 链接来访问 YAML。
  5. 完成 Kafka 的任何可选配置后,点 Create。您会自动定向到 Knative Kafka 标签页,其中 knative-kafka 在资源列表中。

验证

  1. Knative Kafka 选项卡中的 knative-kafka 资源。您会自动定向到 Knative Kafka Overview 页面。
  2. 查看资源的 Conditions 列表,并确认其状态为 True

    Kafka Knative Overview 页面显示 Conditions

    如果条件的状态为 UnknownFalse,请等待几分钟刷新页面。

  3. 检查是否已创建 Knative Kafka 资源:

    $ oc get pods -n knative-eventing

    输出示例

    NAME                                        READY   STATUS    RESTARTS   AGE
    kafka-broker-dispatcher-7769fbbcbb-xgffn    2/2     Running   0          44s
    kafka-broker-receiver-5fb56f7656-fhq8d      2/2     Running   0          44s
    kafka-channel-dispatcher-84fd6cb7f9-k2tjv   2/2     Running   0          44s
    kafka-channel-receiver-9b7f795d5-c76xr      2/2     Running   0          44s
    kafka-controller-6f95659bf6-trd6r           2/2     Running   0          44s
    kafka-source-dispatcher-6bf98bdfff-8bcsn    2/2     Running   0          44s
    kafka-webhook-eventing-68dc95d54b-825xs     2/2     Running   0          44s

6.2.2. Knative Kafka 的安全配置

Kafka 集群通常使用 TLS 或 SASL 身份验证方法进行保护。您可以使用 TLS 或 SASL 将 Kafka 代理或频道配置为针对受保护的 Red Hat AMQ Streams 集群进行操作。

注意

红帽建议您同时启用 SASL 和 TLS。

6.2.2.1. 为 Kafka 代理配置 TLS 身份验证

Apache Kafka 客户端和服务器使用 传输层安全性 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。TLS 是 Knative Kafka 唯一支持的流量加密方法。

先决条件

  • 在 OpenShift Container Platform 上具有集群管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群 CA 证书存储为一个 .pem 文件。
  • 您有一个 Kafka 集群客户端证书,并存储为 .pem 文件的密钥。
  • 安装 OpenShift CLI (oc) 。

流程

  1. knative-eventing 命名空间中创建证书文件作为 secret:

    $ oc create secret -n knative-eventing generic <secret_name> \
      --from-literal=protocol=SSL \
      --from-file=ca.crt=caroot.pem \
      --from-file=user.crt=certificate.pem \
      --from-file=user.key=key.pem
    重要

    使用密钥名称 ca.crtuser.crtuser.key。不要更改它们。

  2. 编辑 KnativeKafka CR,并在 broker spec 中添加对 secret 的引用:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      broker:
        enabled: true
        defaultConfig:
          authSecretName: <secret_name>
    ...

6.2.2.2. 为 Kafka 代理配置 SASL 身份验证

Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。

先决条件

  • 在 OpenShift Container Platform 上具有集群管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群的用户名和密码。
  • 您已选择使用 SASL 机制,例如 PLAINSCRAM-SHA-256SCRAM-SHA-512
  • 如果启用了 TLS,您还需要 Kafka 集群的 ca.crt 证书文件。
  • 安装 OpenShift CLI (oc) 。

流程

  1. knative-eventing 命名空间中创建证书文件作为 secret:

    $ oc create secret -n knative-eventing generic <secret_name> \
      --from-literal=protocol=SASL_SSL \
      --from-literal=sasl.mechanism=<sasl_mechanism> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=user="my-sasl-user"
    • 使用键名 ca.crt, password, 和 sasl.mechanism.不要更改它们。
    • 如果要将 SASL 与公共 CA 证书搭配使用,您必须在创建 secret 时使用 tls.enabled=true 标志,而不是使用 ca.crt 参数。例如:

      $ oc create secret -n <namespace> generic <kafka_auth_secret> \
        --from-literal=tls.enabled=true \
        --from-literal=password="SecretPassword" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="my-sasl-user"
  2. 编辑 KnativeKafka CR,并在 broker spec 中添加对 secret 的引用:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      broker:
        enabled: true
        defaultConfig:
          authSecretName: <secret_name>
    ...

6.2.2.3. 为 Kafka 频道配置 TLS 验证

Apache Kafka 客户端和服务器使用 传输层安全性 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。TLS 是 Knative Kafka 唯一支持的流量加密方法。

先决条件

  • 在 OpenShift Container Platform 上具有集群管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群 CA 证书存储为一个 .pem 文件。
  • 您有一个 Kafka 集群客户端证书,并存储为 .pem 文件的密钥。
  • 安装 OpenShift CLI (oc) 。

流程

  1. 在所选命名空间中创建证书文件作为 secret:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-file=user.crt=certificate.pem \
      --from-file=user.key=key.pem
    重要

    使用密钥名称 ca.crtuser.crtuser.key。不要更改它们。

  2. 编辑 KnativeKafka 自定义资源:

    $ oc edit knativekafka
  3. 引用您的 secret 和 secret 的命名空间:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: <kafka_auth_secret>
        authSecretNamespace: <kafka_auth_secret_namespace>
        bootstrapServers: <bootstrap_servers>
        enabled: true
      source:
        enabled: true
    注意

    确保指定 bootstrap 服务器中的匹配端口。

    例如:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: tls-user
        authSecretNamespace: kafka
        bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9094
        enabled: true
      source:
        enabled: true

6.2.2.4. 为 Kafka 频道配置 SASL 验证

Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。

先决条件

  • 在 OpenShift Container Platform 上具有集群管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群的用户名和密码。
  • 您已选择使用 SASL 机制,例如 PLAINSCRAM-SHA-256SCRAM-SHA-512
  • 如果启用了 TLS,您还需要 Kafka 集群的 ca.crt 证书文件。
  • 安装 OpenShift CLI (oc) 。

流程

  1. 在所选命名空间中创建证书文件作为 secret:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \
      --from-literal=user="my-sasl-user"
    • 使用键名 ca.crt, password, 和 sasl.mechanism.不要更改它们。
    • 如果要将 SASL 与公共 CA 证书搭配使用,您必须在创建 secret 时使用 tls.enabled=true 标志,而不是使用 ca.crt 参数。例如:

      $ oc create secret -n <namespace> generic <kafka_auth_secret> \
        --from-literal=tls.enabled=true \
        --from-literal=password="SecretPassword" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="my-sasl-user"
  2. 编辑 KnativeKafka 自定义资源:

    $ oc edit knativekafka
  3. 引用您的 secret 和 secret 的命名空间:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: <kafka_auth_secret>
        authSecretNamespace: <kafka_auth_secret_namespace>
        bootstrapServers: <bootstrap_servers>
        enabled: true
      source:
        enabled: true
    注意

    确保指定 bootstrap 服务器中的匹配端口。

    例如:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: scram-user
        authSecretNamespace: kafka
        bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9093
        enabled: true
      source:
        enabled: true

6.2.2.5. 为 Kafka 源配置 SASL 身份验证

Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。

先决条件

  • 在 OpenShift Container Platform 上具有集群或专用管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群的用户名和密码。
  • 您已选择使用 SASL 机制,例如 PLAINSCRAM-SHA-256SCRAM-SHA-512
  • 如果启用了 TLS,您还需要 Kafka 集群的 ca.crt 证书文件。
  • 已安装 OpenShift (oc) CLI。

流程

  1. 在所选命名空间中创建证书文件作为 secret:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \ 1
      --from-literal=user="my-sasl-user"
    1
    SASL 类型可以是 PLAINSCRAM-SHA-256SCRAM-SHA-512
  2. 创建或修改 Kafka 源,使其包含以下 spec 配置:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: user
          password:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: password
          type:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: saslType
        tls:
          enable: true
          caCert: 1
            secretKeyRef:
              name: <kafka_auth_secret>
              key: ca.crt
    ...
    1
    如果您使用公有云 Kafka 服务,则不需要 caCert spec,如 Red Hat OpenShift Streams for Apache Kafka。

6.2.2.6. 为 Kafka sink 配置安全性

Apache Kafka 客户端和服务器使用 传输层安全性 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。TLS 是 Knative Kafka 唯一支持的流量加密方法。

Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。

先决条件

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源(CR)已安装在 OpenShift Container Platform 集群中。
  • KnativeKafka CR 中启用了 Kafka sink。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群 CA 证书存储为一个 .pem 文件。
  • 您有一个 Kafka 集群客户端证书,并存储为 .pem 文件的密钥。
  • 已安装 OpenShift (oc) CLI。
  • 您已选择使用 SASL 机制,例如 PLAINSCRAM-SHA-256SCRAM-SHA-512

流程

  1. 在与 KafkaSink 对象相同的命名空间中创建一个 secret:

    重要

    证书和密钥必须采用 PEM 格式。

    • 对于使用 SASL 时没有加密的身份验证:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_PLAINTEXT \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-literal=user=<username> \
        --from-literal=password=<password>
    • 对于使用 TLS 的 SASL 和加密进行身份验证:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_SSL \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ 1
        --from-literal=user=<username> \
        --from-literal=password=<password>
      1
      如果您使用公有云管理 Kafka 服务,可以省略 ca.crt 来使用系统的根 CA,如用于 Apache Kafka 的 Red Hat OpenShift Streams。
    • 使用 TLS 进行身份验证和加密:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SSL \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ 1
        --from-file=user.crt=<my_cert.pem_file_path> \
        --from-file=user.key=<my_key.pem_file_path>
      1
      如果您使用公有云管理 Kafka 服务,可以省略 ca.crt 来使用系统的根 CA,如用于 Apache Kafka 的 Red Hat OpenShift Streams。
  2. 创建或修改 KafkaSink 对象,并在 auth spec 中添加对 secret 的引用:

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
       name: <sink_name>
       namespace: <namespace>
    spec:
    ...
       auth:
         secret:
           ref:
             name: <secret_name>
    ...
  3. 应用 KafkaSink 对象:

    $ oc apply -f <filename>

6.2.3. 配置 Kafka 代理设置

您可以通过创建配置映射并在 Kafka Broker 对象中引用此配置映射,配置复制因素、bootstrap 服务器和 Kafka 代理的主题分区数量。

先决条件

  • 在 OpenShift Container Platform 上具有集群或专用管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源(CR)已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 已安装 OpenShift CLI(oc)。

流程

  1. 修改 kafka-broker-config 配置映射,或创建自己的配置映射来包含以下配置:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: <config_map_name> 1
      namespace: <namespace> 2
    data:
      default.topic.partitions: <integer> 3
      default.topic.replication.factor: <integer> 4
      bootstrap.servers: <list_of_servers> 5
    1
    配置映射名称。
    2
    配置映射所在的命名空间。
    3
    Kafka 代理的主题分区数量。这控制如何将事件发送到代理的速度。更多分区需要更多计算资源。
    4
    主题消息的复制因素。这可防止数据丢失。更高的复制因素需要更大的计算资源和更多存储。
    5
    以逗号分隔的 bootstrap 服务器列表。这可以位于 OpenShift Container Platform 集群内部或外部,是代理从发送事件发送到的 Kafka 集群列表。
    重要

    default.topic.replication.factor 值必须小于或等于集群中的 Kafka 代理实例数量。例如,如果您只有一个 Kafka 代理,则 default.topic.replication.factor 值应该不超过 "1"

    Kafka 代理配置映射示例

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-broker-config
      namespace: knative-eventing
    data:
      default.topic.partitions: "10"
      default.topic.replication.factor: "3"
      bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

  2. 应用配置映射:

    $ oc apply -f <config_map_filename>
  3. 指定 Kafka Broker 对象的配置映射:

    Broker 对象示例

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      name: <broker_name> 1
      namespace: <namespace> 2
      annotations:
        eventing.knative.dev/broker.class: Kafka 3
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: <config_map_name> 4
        namespace: <namespace> 5
    ...

    1
    代理名称。
    2
    代理存在的命名空间。
    3
    代理类注解。在本例中,代理是使用类值 Kafka 的 Kafka 代理。
    4
    配置映射名称。
    5
    配置映射所在的命名空间。
  4. 应用代理:

    $ oc apply -f <broker_filename>

其他资源

6.2.4. 其他资源