Menu Close

5.14.2.2. 使用 Knative CLI 创建 Kafka 事件源

这部分论述了如何使用 kn 命令创建 Kafka 事件源。

先决条件

  • OpenShift Serverless Operator、Knative Eventing、Knative Serving 和 KnativeKafka 自定义资源(CR)已安装在集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。

流程

  1. 要验证 Kafka 事件源是否可以工作,请创建一个 Knative 服务,在服务日志中转储传入的事件:

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display
  2. 创建 KafkaSource CR:

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display
    注意

    将此命令中的占位符值替换为源名称、引导服务器和主题的值。

    --servers--topics--consumergroup 选项指定到 Kafka 集群的连接参数。--consumergroup 选项是可选的。

  3. 可选:查看您创建的 KafkaSource CR 的详情:

    $ kn source kafka describe <kafka_source_name>

    输出示例

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h

验证步骤

  1. 触发 Kafka 实例将信息发送到主题:

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    在提示符后输入信息。这个命令假设:

    • Kafka 集群安装在 kafka 命名空间中。
    • KafkaSource 对象已被配置为使用 my-topic 主题。
  2. 通过查看日志来验证消息是否显示:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

5.14.2.2.1. Knative CLI --sink 标志

当使用 Knative(kn)CLI 创建事件生成的自定义资源时,您可以使用 --sink 标志指定事件从该资源发送到的接收器。

以下示例创建了使用服务 http://event-display.svc.cluster.local 的接收器绑定作为接收器:

使用 --sink 标志的命令示例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.local 中的 svc 确定接收器是一个 Knative 服务。其他默认的接收器前缀包括 channelbroker