5.15.2.2. Knative CLI를 사용하여 Kafka 이벤트 소스 생성

kn source kafka create 명령을 사용하여 Knative(kn) CLI를 사용하여 Kafka 소스를 생성할 수 있습니다. Knative CLI를 사용하여 이벤트 소스를 생성하면 YAML 파일을 직접 수정하는 것보다 더 간소화되고 직관적인 사용자 인터페이스를 제공합니다.

사전 요구 사항

  • OpenShift Serverless Operator, Knative Eventing, Knative Serving, KnativeKafka 사용자 정의 리소스(CR가 클러스터에 설치되어 있습니다.
  • 프로젝트를 생성했거나 OpenShift Container Platform에서 애플리케이션 및 기타 워크로드를 생성하는 데 적절한 역할 및 권한이 있는 프로젝트에 액세스할 수 있습니다.
  • 가져오려는 Kafka 메시지를 생성하는 Red Hat AMQ Streams(Kafka) 클러스터에 액세스할 수 있습니다.
  • Knative(kn) CLI가 설치되어 있습니다.
  • 선택 사항: 이 절차의 확인 단계를 사용하려면 OpenShift CLI(oc)를 설치했습니다.

프로세스

  1. Kafka 이벤트 소스가 작동하는지 확인하려면 수신 이벤트를 서비스 로그에 덤프하는 Knative 서비스를 생성합니다.

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display
  2. KafkaSource CR을 생성합니다.

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display
    참고

    이 명령의 자리 표시자 값을 소스 이름, 부트스트랩 서버 및 주제의 값으로 바꿉니다.

    --servers, --topics, --consumergroup 옵션은 Kafka 클러스터에 대한 연결 매개 변수를 지정합니다. --consumergroup 옵션은 선택 사항입니다.

  3. 선택 사항: 생성한 KafkaSource CR에 대한 세부 정보를 확인합니다.

    $ kn source kafka describe <kafka_source_name>

    출력 예

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h

검증 단계

  1. Kafka 인스턴스를 트리거하여 메시지를 항목에 보냅니다.

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    프롬프트에 메시지를 입력합니다. 이 명령은 다음을 가정합니다.

    • Kafka 클러스터는 kafka 네임스페이스에 설치됩니다.
    • my-topic 주제를 사용하도록 KafkaSource 오브젝트가 구성되어 있습니다.
  2. 로그를 보고 메시지가 도착했는지 확인합니다.

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    출력 예

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

5.15.2.2.1. Knative CLI 싱크 플래그

Knative(kn) CLI를 사용하여 이벤트 소스를 생성할 때 --sink 플래그를 사용하여 해당 리소스에서 이벤트가 전송되는 싱크를 지정할 수 있습니다. 싱크는 다른 리소스에서 들어오는 이벤트를 수신할 수 있는 주소 지정 가능 또는 호출 가능한 리소스일 수 있습니다.

다음 예제에서는 싱크로 서비스 http://event-display.svc.cluster.local 를 사용하는 싱크 바인딩을 생성합니다.

싱크 플래그를 사용하는 명령의 예

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.localsvc 는 싱크가 Knative 서비스인지 확인합니다. 기타 기본 싱크 접두사에는 channel, 및 broker가 포함됩니다.