第5章 トピックからのメッセージの送受信
OpenShift にインストールされている Kafka クラスターとの間でメッセージを送受信します。
この手順では、Kafka クライアントを使用してメッセージを生成および消費する方法について説明します。クライアントを OpenShift にデプロイするか、ローカル Kafka クライアントを OpenShift クラスターに接続できます。いずれかまたは両方のオプションを使用して、Kafka クラスターのインストールをテストできます。ローカルクライアントの場合は、OpenShift ルート接続を使用して Kafka クラスターにアクセスします。
oc コマンドラインツールを使用して、Kafka クライアントをデプロイして実行します。
前提条件
- OpenShift で Kafka クラスターを作成している。
- OpenShift で実行している Kafka クラスターへの外部アクセス用のルートを作成 している。
- AMQ Streams software downloads page ページから、最新バージョンの Red Hat AMQStreams アーカイブにアクセスできる。
OpenShift クラスターにデプロイされた Kafka クライアントからのメッセージの送受信
プロデューサーおよびコンシューマーのクライアントを OpenShift クラスターにデプロイします。その後、クライアントを使用して、同じ namespace 内の Kafka クラスターとの間でメッセージを送受信できます。デプロイメントでは、Kafka を実行するために AMQ Streams コンテナーイメージを使用します。
ocコマンドラインインターフェイスを使用して、Kafka プロデューサーをデプロイします。この例では、Kafka クラスター
my-clusterに接続する Kafka プロデューサーをデプロイしますmy-topicという名前のトピックが作成されます。Kafka プロデューサーの OpenShift へのデプロイ
oc run kafka-producer -ti \ --image=registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic
- コマンドプロンプトから、いくつかのメッセージを入力します。
-
OpenShift Web コンソールで Home > Projects ページに移動し、作成した
amq-streams-kafkaプロジェクトを選択します。 -
Pod のリストから、
kafka-producerをクリックして、プロデューサー Pod の詳細を表示します。 - Logs ページを選択して、入力したメッセージが存在することを確認します。
ocコマンドラインインターフェイスを使用して、Kafka コンシューマーをデプロイします。Kafka コンシューマーの OpenShift へのデプロイ
oc run kafka-consumer -ti \ --image=registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic \ --from-beginning
コンシューマーは
my-topicに生成されたメッセージを消費しました。- コマンドプロンプトから、コンシューマーコンソールに着信メッセージが表示されていることを確認します。
-
OpenShift Web コンソールで Home > Projects ページに移動し、作成した
amq-streams-kafkaプロジェクトを選択します。 -
Pod のリストから、
kafka-consumerをクリックして、コンシューマー Pod の詳細を表示します。 - Logs ページを選択して、消費したメッセージが存在することを確認します。
ローカルで実行されている Kafka クライアントからのメッセージの送受信
コマンドラインインターフェイスを使用して、ローカルマシンで Kafka プロデューサーとコンシューマーを実行します。
AMQ Streams のソフトウェアダウンロードページ から AMQ Streams <version> インストールおよびサンプルファイル アーカイブをダウンロードして展開します。
ファイルを任意の場所で展開します。
コマンドラインインターフェイスを開き、トピック
my-topicと TLS の認証プロパティーを使用して Kafka コンソールプロデューサーを起動します。OpenShift ルートを使用して Kafka ブローカーにアクセスする のに必要なプロパティーを追加します。
- 使用している OpenShift ルートのホスト名およびポート 443 を使用します。
パスワードと、ブローカー証明書用に作成したトラストストアへの参照を使用します。
ローカル Kafka プロデューサーの起動
kafka-console-producer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --producer-property security.protocol=SSL \ --producer-property ssl.truststore.password=password \ --producer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic
- プロデューサーが実行しているコマンドラインインターフェイスにメッセージを入力します。
- Enter を押してメッセージを送信します。
新しいコマンドラインインターフェイスタブまたはウィンドウを開き、Kafka コンソールコンシューマーを起動してメッセージを受信します。
プロデューサーと同じ接続の詳細を使用します。
ローカル Kafka コンシューマーの起動
kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --consumer-property security.protocol=SSL \ --consumer-property ssl.truststore.password=password \ --consumer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic --from-beginning
- コンシューマーコンソールに受信メッセージが表示されることを確認します。
- Crtl+C を押して、Kafka コンソールプロデューサーとコンシューマーを終了します。