5.15. 使用 Knative Kafka

Knative Kafka 提供集成选项,供您在 OpenShift Serverless 中使用支持的 Apache Kafka 消息流平台。Kafka 为事件源、频道、代理和事件 sink 功能提供选项。

如果集群管理员安装了 KnativeKafka 自定义资源,则 OpenShift Serverless 安装中就会提供 Knative Kafka 功能。

注意

IBM Z 和 IBM Power Systems 目前不支持 Knative Kafka。

Knative Kafka 提供了额外的选项,例如:

  • Kafka 源
  • Kafka 频道
  • Kafka 代理
  • Kafka 接收器

5.15.1. Kafka 事件交付和重试

在事件驱动的架构中使用 Kafka 组件会提供"至少一次"事件交付。这意味着,会在收到返回代码值前重试操作。这使您的应用对丢失的事件更具弹性,但可能会导致发送重复的事件。

对于 Kafka 事件源,默认会尝试发送事件的固定次数。对于 Kafka 频道,只有在 Kafka 频道 Delivery 规格中配置了它们时才会进行重试。

有关交付保证的更多信息,请参阅 事件交付 文档。

5.15.2. Kafka 源

您可以创建一个 Kafka 源从 Apache Kafka 集群中读取事件,并将这些事件传递给接收器。您可以使用 OpenShift Container Platform web 控制台、Knative (kn) CLI 或直接创建 KafkaSource 对象并使用 OpenShift CLI (oc) 创建 Kafka 源来应用它。

5.15.2.1. 使用 Web 控制台创建 Kafka 事件源

在集群中安装了 Knative Kafka 后,您可以使用 Web 控制台创建 Kafka 源。使用 OpenShift Container Platform Web 控制台提供了一个简化的用户界面来创建 Kafka 源。

先决条件

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在集群中。
  • 已登陆到 web 控制台。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. Developer 视角中,进入到 +Add 页面并选择 Event Source
  2. Event Sources 页面中,在 Type 部分选择 Kafka Source
  3. 配置 Kafka Source 设置:

    1. 添加用逗号分开的 Bootstrap 服务器列表。
    2. 添加以逗号分隔的标题列表。
    3. 添加一个 消费者组
    4. 为您创建的服务帐户选择 Service Account Name
    5. 为事件源选择 SinkSink 可以是一个 资源,如频道、代理或服务,也可以是一个 URI
    6. 输入 Kafka 事件源的 名称
  4. Create

验证

您可以通过查看 Topology 页面来验证 Kafka 事件源是否已创建并连接到接收器。

  1. Developer 视角中,导航到 Topology
  2. 查看 Kafka 事件源和接收器。

    在 Topology 视图中查看 Kafka 源和服务

5.15.2.2. 使用 Knative CLI 创建 Kafka 事件源

您可以使用 kn source kafka create 命令,使用 Knative (kn) CLI 创建 Kafka 源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。

先决条件

  • OpenShift Serverless Operator、Knative Eventing、Knative Serving 和 KnativeKafka 自定义资源(CR)已安装在集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
  • 已安装 Knative (kn) CLI。
  • 可选:如果您想要使用此流程中的验证步骤,已安装 OpenShift CLI (oc)。

流程

  1. 要验证 Kafka 事件源是否可以工作,请创建一个 Knative 服务,在服务日志中转储传入的事件:

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display
  2. 创建 KafkaSource CR:

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display
    注意

    将此命令中的占位符值替换为源名称、引导服务器和主题的值。

    --servers--topics--consumergroup 选项指定到 Kafka 集群的连接参数。--consumergroup 选项是可选的。

  3. 可选:查看您创建的 KafkaSource CR 的详情:

    $ kn source kafka describe <kafka_source_name>

    输出示例

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h

验证步骤

  1. 触发 Kafka 实例将信息发送到主题:

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    在提示符后输入信息。这个命令假设:

    • Kafka 集群安装在 kafka 命名空间中。
    • KafkaSource 对象已被配置为使用 my-topic 主题。
  2. 通过查看日志来验证消息是否显示:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

5.15.2.2.1. Knative CLI sink 标记

当使用 Knative (kn) CLI 创建事件源时,您可以使用 --sink 标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。

以下示例创建使用服务 http://event-display.svc.cluster.local 的接收器绑定作为接收器:

使用 sink 标记的命令示例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.local 中的 svc 确定接收器是一个 Knative 服务。其他默认的接收器前缀包括 channelbroker

5.15.2.3. 使用 YAML 创建 Kafka 事件源

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述应用程序,并以可重复的方式描述应用程序。要使用 YAML 创建 Kafka 源,您必须创建一个 YAML 文件来定义 KafkaSource 对象,然后使用 oc apply 命令应用它。

先决条件

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
  • 安装 OpenShift CLI (oc) 。

流程

  1. 创建 KafkaSource 对象作为 YAML 文件:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> 1
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> 2
      sink:
      - <list_of_sinks> 3
    1
    用户组是一组使用相同组群 ID 的用户,并消耗一个标题中的数据。
    2
    主题提供数据存储的目的地。每个主题都被分成一个或多个分区。
    3
    sink 指定事件从源发送到的位置。
    重要

    仅支持 OpenShift Serverless 上的 KafkaSource 对象的 v1beta1 API 版本。不要使用这个 API 的 v1alpha1 版本,因为这个版本现已弃用。

    KafkaSource 对象示例

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

  2. 应用 KafkaSource YAML 文件:

    $ oc apply -f <filename>

验证

  • 输入以下命令验证 Kafka 事件源是否已创建:

    $ oc get pods

    输出示例

    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m

5.15.3. Kafka 代理

对于生产环境就绪的 Knative Eventing 部署,红帽建议您使用 Knative Kafka 代理实现。Kafka 代理是 Knative 代理的 Apache Kafka 原生实现,它将 CloudEvents 直接发送到 Kafka 实例。

重要

Kafka 代理禁用联邦信息处理标准 (FIPS) 模式。

Kafka 代理具有与 Kafka 的原生集成,用于存储和路由事件。它可以更好地与 Kafka 集成用于代理,并在其他代理类型中触发模型,并减少网络跃点。Kafka 代理实现的其他优点包括:

  • 最少一次的交付保证
  • 根据 CloudEvents 分区扩展排序事件交付
  • 数据平面的高可用性
  • 水平扩展数据平面

Knative Kafka 代理使用二进制内容模式将传入的 CloudEvents 存储为 Kafka 记录。这意味着,所有 CloudEvent 属性和扩展都会在 Kafka 记录上映射,而 CloudEvent 的 data 规格与 Kafka 记录的值对应。

有关使用 Kafka 代理的详情,请参考创建代理

5.15.4. 使用 YAML 创建 Kafka 频道

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述频道,并以可重复的方式描述频道。您可以通过创建一个 Kafka 频道,创建由 Kafka 主题支持的 Knative Eventing 频道。要使用 YAML 创建 Kafka 频道,您必须创建一个 YAML 文件来定义 KafkaChannel 对象,然后使用 oc apply 命令应用它。

先决条件

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在 OpenShift Container Platform 集群中。
  • 安装 OpenShift CLI (oc) 。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 创建一个 KafkaChannel 对象作为一个 YAML 文件:

    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    metadata:
      name: example-channel
      namespace: default
    spec:
      numPartitions: 3
      replicationFactor: 1
    重要

    仅支持 OpenShift Serverless 上的 KafkaChannel 对象的 v1beta1 API 版本。不要使用这个 API 的 v1alpha1 版本,因为这个版本现已弃用。

  2. 应用 KafkaChannel YAML 文件:

    $ oc apply -f <filename>

5.15.5. Kafka 接收器

如果集群管理员在集群中启用了 Kafka,则 Kafka sink 是事件 sink 类型。您可以使用 Kafka sink 将事件直接从事件源发送到 Kafka 主题。

5.15.5.1. 使用 Kafka sink

您可以创建一个称为 Kafka sink 的事件 sink,用于将事件发送到 Kafka 主题。使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述应用程序,并以可重复的方式描述应用程序。默认情况下,Kafka sink 使用二进制内容模式,其效率比结构化模式更高效。要使用 YAML 创建 Kafka sink,您必须创建一个 YAML 文件来定义 KafkaSink 对象,然后使用 oc apply 命令应用它。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源 (CR) 。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
  • 安装 OpenShift CLI (oc) 。

流程

  1. 创建一个 KafkaSink 对象定义作为一个 YAML 文件:

    Kafka sink YAML

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: <sink-name>
      namespace: <namespace>
    spec:
      topic: <topic-name>
      bootstrapServers:
       - <bootstrap-server>

  2. 要创建 Kafka sink,请应用 KafkaSink YAML 文件:

    $ oc apply -f <filename>
  3. 配置事件源,以便在其 spec 中指定 sink:

    连接到 API 服务器源的 Kafka sink 示例

    apiVersion: sources.knative.dev/v1alpha2
    kind: ApiServerSource
    metadata:
      name: <source-name> 1
      namespace: <namespace> 2
    spec:
      serviceAccountName: <service-account-name> 3
      mode: Resource
      resources:
      - apiVersion: v1
        kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1alpha1
          kind: KafkaSink
          name: <sink-name> 4

    1
    事件源的名称。
    2
    事件源的命名空间。
    3
    事件源的服务帐户。
    4
    Kafka sink 名称。

5.15.6. 其他资源