第5章 トピックからのメッセージの送受信

OpenShift にインストールされている Kafka クラスターとの間でメッセージを送受信します。

この手順では、Kafka クライアントを使用してメッセージを生成および消費する方法について説明します。クライアントを OpenShift にデプロイするか、ローカル Kafka クライアントを OpenShift クラスターに接続できます。いずれかまたは両方のオプションを使用して、Kafka クラスターのインストールをテストできます。ローカルクライアントの場合は、OpenShift ルート接続を使用して Kafka クラスターにアクセスします。

oc コマンドラインツールを使用して、Kafka クライアントをデプロイして実行します。

前提条件

OpenShift クラスターにデプロイされた Kafka クライアントからのメッセージの送受信

プロデューサーおよびコンシューマーのクライアントを OpenShift クラスターにデプロイします。その後、クライアントを使用して、同じ namespace 内の Kafka クラスターとの間でメッセージを送受信できます。デプロイメントでは、Kafka を実行するために AMQ Streams コンテナーイメージを使用します。

  1. oc コマンドラインインターフェイスを使用して、Kafka プロデューサーをデプロイします。

    この例では、Kafka クラスター my-cluster に接続する Kafka プロデューサーをデプロイします

    my-topic という名前のトピックが作成されます。

    Kafka プロデューサーの OpenShift へのデプロイ

    oc run kafka-producer -ti \
    --image=registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0 \
    --rm=true \
    --restart=Never \
    -- bin/kafka-console-producer.sh
    --bootstrap-server my-cluster-kafka-bootstrap:9092 \
    --topic my-topic

  2. コマンドプロンプトから、いくつかのメッセージを入力します。
  3. OpenShift Web コンソールで Home > Projects ページに移動し、作成した amq-streams-kafka プロジェクトを選択します。
  4. Pod のリストから、kafka-producer をクリックして、プロデューサー Pod の詳細を表示します。
  5. Logs ページを選択して、入力したメッセージが存在することを確認します。
  6. oc コマンドラインインターフェイスを使用して、Kafka コンシューマーをデプロイします。

    Kafka コンシューマーの OpenShift へのデプロイ

    oc run kafka-consumer -ti \
    --image=registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0 \
    --rm=true \
    --restart=Never \
    -- bin/kafka-console-consumer.sh \
    --bootstrap-server my-cluster-kafka-bootstrap:9092 \
    --topic my-topic \
    --from-beginning

    コンシューマーは my-topic に生成されたメッセージを消費しました。

  7. コマンドプロンプトから、コンシューマーコンソールに着信メッセージが表示されていることを確認します。
  8. OpenShift Web コンソールで Home > Projects ページに移動し、作成した amq-streams-kafka プロジェクトを選択します。
  9. Pod のリストから、kafka-consumer をクリックして、コンシューマー Pod の詳細を表示します。
  10. Logs ページを選択して、消費したメッセージが存在することを確認します。

ローカルで実行されている Kafka クライアントからのメッセージの送受信

コマンドラインインターフェイスを使用して、ローカルマシンで Kafka プロデューサーとコンシューマーを実行します。

  1. AMQ Streams のソフトウェアダウンロードページ から AMQ Streams <version> インストールおよびサンプルファイル アーカイブをダウンロードして展開します。

    ファイルを任意の場所で展開します。

  2. コマンドラインインターフェイスを開き、トピック my-topic と TLS の認証プロパティーを使用して Kafka コンソールプロデューサーを起動します。

    OpenShift ルートを使用して Kafka ブローカーにアクセスする のに必要なプロパティーを追加します。

    • 使用している OpenShift ルートのホスト名およびポート 443 を使用します。
    • パスワードと、ブローカー証明書用に作成したトラストストアへの参照を使用します。

      ローカル Kafka プロデューサーの起動

      kafka-console-producer.sh \
      --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \
      --producer-property security.protocol=SSL \
      --producer-property ssl.truststore.password=password \
      --producer-property ssl.truststore.location=client.truststore.jks \
      --topic my-topic

  3. プロデューサーが実行しているコマンドラインインターフェイスにメッセージを入力します。
  4. Enter を押してメッセージを送信します。
  5. 新しいコマンドラインインターフェイスタブまたはウィンドウを開き、Kafka コンソールコンシューマーを起動してメッセージを受信します。

    プロデューサーと同じ接続の詳細を使用します。

    ローカル Kafka コンシューマーの起動

    kafka-console-consumer.sh \
    --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \
    --consumer-property security.protocol=SSL \
    --consumer-property ssl.truststore.password=password \
    --consumer-property ssl.truststore.location=client.truststore.jks \
    --topic my-topic --from-beginning

  6. コンシューマーコンソールに受信メッセージが表示されることを確認します。
  7. Crtl+C を押して、Kafka コンソールプロデューサーとコンシューマーを終了します。