第 5 章 从主题发送和接收信息
向在 OpenShift 上安装的 Kafka 集群发送或接收信息。
此流程描述了如何使用 Kafka 客户端生成和使用信息。您可以将客户端部署到 OpenShift,或将本地 Kafka 客户端连接到 OpenShift 集群。您可以使用其中一个或这两个选项来测试 Kafka 集群安装。对于本地客户端,您可以使用 OpenShift 路由连接访问 Kafka 集群。
您将使用 oc 命令行工具来部署并运行 Kafka 客户端。
先决条件
- 您已在 OpenShift 上创建 Kafka 集群。
- 您已创建了路由,供外部访问在 OpenShift 中运行的 Kafka 集群。
- 您可以从 AMQ Streams 软件下载页面访问 Red Hat AMQ Streams 归档的最新版本。
从部署到 OpenShift 集群的 Kafka 客户端发送和接收信息
将生成者和消费者客户端部署到 OpenShift 集群。然后,您可以使用客户端从同一命名空间中的 Kafka 集群发送和接收信息。部署使用 AMQ Streams 容器镜像来运行 Kafka。
使用
oc命令行界面部署 Kafka producer。本例部署一个 Kafka 生成者,它连接到 Kafka 集群
my-cluster名为
my-topic的主题被创建。将 Kafka producer 部署到 OpenShift
oc run kafka-producer -ti \ --image=registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic
- 从命令提示符输入多个信息。
-
在 OpenShift web 控制台中进入到 Home > Projects 页面,然后选择
amq-streams-kafka项目。 -
从 pod 列表中,点
kafka-producer以查看 producer pod 详情。 - 选择 Logs 页面,以检查您输入的消息是否存在。
使用
oc命令行界面部署 Kafka 用户。将 Kafka 使用者部署到 OpenShift
oc run kafka-consumer -ti \ --image=registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic \ --from-beginning
消费者使用生成给
my-topic的消息。- 从命令提示符,确认您在使用者控制台中看到传入的信息。
-
在 OpenShift web 控制台中进入到 Home > Projects 页面,然后选择
amq-streams-kafka项目。 -
从 pod 列表中,点
kafka-consumer查看使用者 pod 详情。 - 选择 Logs 页面,以检查您消耗的消息是否存在。
从本地运行的 Kafka 客户端发送和接收信息
使用命令行界面在本地机器上运行 Kafka 制作者和使用者。
从 AMQ Streams 软件下载页下载并提取 AMQ Streams < version > 安装和示例文件 存档。
将文件解压缩到任何目的地。
打开命令行界面,并使用主题
my-topic和 TLS 的身份验证属性启动 Kafka 控制台制作者。添加使用 OpenShift 路由访问 Kafka 代理所需的属性。
- 将主机名和端口 443 用于您正在使用的 OpenShift 路由。
使用密码并引用您为代理证书创建的信任存储。
启动本地 Kafka producer
kafka-console-producer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --producer-property security.protocol=SSL \ --producer-property ssl.truststore.password=password \ --producer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic
- 在运行制作者的命令行界面中键入您的消息。
- 按 enter 来发送邮件。
打开新的命令行界面标签页或窗口,并启动 Kafka 控制台使用者来接收信息。
使用与生成者相同的连接详情。
启动本地 Kafka 消费者
kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --consumer-property security.protocol=SSL \ --consumer-property ssl.truststore.password=password \ --consumer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic --from-beginning
- 确认您在使用者控制台中看到传入的信息。
- 按 Crtl+C 退出 Kafka 控制台生成者和消费者。