8.2. 配置 Knative Kafka
Knative Kafka 提供集成选项,供您在 OpenShift Serverless 中使用支持的 Apache Kafka 消息流平台。Kafka 为事件源、频道、代理和事件 sink 功能提供选项。
除了作为 OpenShift Serverless 核心安装一部分的 Knative Eventing 组件外,集群管理员还可安装 KnativeKafka
自定义资源 (CR) 。
IBM Z 和 IBM Power 目前不支持 Knative Kafka。
KnativeKafka
CR 为用户提供其他选项,例如:
- Kafka 源
- Kafka 频道
- Kafka 代理
- Kafka 接收器
8.2.1. 为 Kafka 源配置 SASL 身份验证
Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。
先决条件
- 在 OpenShift Container Platform 上具有集群或专用管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
CR 已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您有一个 Kafka 集群的用户名和密码。
-
您已选择使用 SASL 机制,例如
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。 -
如果启用了 TLS,您还需要 Kafka 集群的
ca.crt
证书文件。 -
已安装 OpenShift (
oc
) CLI。
流程
在所选命名空间中创建证书文件作为 secret:
$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ 1 --from-literal=user="my-sasl-user"
- 1
- SASL 类型可以是
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。
创建或修改 Kafka 源,使其包含以下
spec
配置:apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: example-source spec: ... net: sasl: enable: true user: secretKeyRef: name: <kafka_auth_secret> key: user password: secretKeyRef: name: <kafka_auth_secret> key: password type: secretKeyRef: name: <kafka_auth_secret> key: saslType tls: enable: true caCert: 1 secretKeyRef: name: <kafka_auth_secret> key: ca.crt ...
- 1
- 如果您使用公有云 Kafka 服务,则不需要
caCert
spec,如 Red Hat OpenShift Streams for Apache Kafka。
8.2.2. 为 Kafka sink 配置安全性
Apache Kafka 客户端和服务器使用 传输层安全性 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。TLS 是 Knative Kafka 唯一支持的流量加密方法。
Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源(CR)已安装在 OpenShift Container Platform 集群中。 -
在
KnativeKafka
CR 中启用了 Kafka sink。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
您有一个 Kafka 集群 CA 证书存储为一个
.pem
文件。 -
您有一个 Kafka 集群客户端证书,并存储为
.pem
文件的密钥。 -
已安装 OpenShift (
oc
) CLI。 -
您已选择使用 SASL 机制,例如
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。
流程
在与
KafkaSink
对象相同的命名空间中创建一个 secret:重要证书和密钥必须采用 PEM 格式。
对于使用 SASL 时没有加密的身份验证:
$ oc create secret -n <namespace> generic <secret_name> \ --from-literal=protocol=SASL_PLAINTEXT \ --from-literal=sasl.mechanism=<sasl_mechanism> \ --from-literal=user=<username> \ --from-literal=password=<password>
对于使用 TLS 的 SASL 和加密进行身份验证:
$ oc create secret -n <namespace> generic <secret_name> \ --from-literal=protocol=SASL_SSL \ --from-literal=sasl.mechanism=<sasl_mechanism> \ --from-file=ca.crt=<my_caroot.pem_file_path> \ 1 --from-literal=user=<username> \ --from-literal=password=<password>
- 1
- 如果您使用公有云管理 Kafka 服务,可以省略
ca.crt
来使用系统的根 CA,如用于 Apache Kafka 的 Red Hat OpenShift Streams。
使用 TLS 进行身份验证和加密:
$ oc create secret -n <namespace> generic <secret_name> \ --from-literal=protocol=SSL \ --from-file=ca.crt=<my_caroot.pem_file_path> \ 1 --from-file=user.crt=<my_cert.pem_file_path> \ --from-file=user.key=<my_key.pem_file_path>
- 1
- 如果您使用公有云管理 Kafka 服务,可以省略
ca.crt
来使用系统的根 CA,如用于 Apache Kafka 的 Red Hat OpenShift Streams。
创建或修改
KafkaSink
对象,并在auth
spec 中添加对 secret 的引用:apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink metadata: name: <sink_name> namespace: <namespace> spec: ... auth: secret: ref: name: <secret_name> ...
应用
KafkaSink
对象:$ oc apply -f <filename>
8.2.3. 配置 Kafka 代理设置
您可以通过创建配置映射并在 Kafka Broker
对象中引用此配置映射,配置复制因素、bootstrap 服务器和 Kafka 代理的主题分区数量。
先决条件
- 在 OpenShift Container Platform 上具有集群或专用管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源(CR)已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
已安装 OpenShift CLI(
oc
)。
流程
修改
kafka-broker-config
配置映射,或创建自己的配置映射来包含以下配置:apiVersion: v1 kind: ConfigMap metadata: name: <config_map_name> 1 namespace: <namespace> 2 data: default.topic.partitions: <integer> 3 default.topic.replication.factor: <integer> 4 bootstrap.servers: <list_of_servers> 5
重要default.topic.replication.factor
值必须小于或等于集群中的 Kafka 代理实例数量。例如,如果您只有一个 Kafka 代理,则default.topic.replication.factor
值应该不超过"1"
。Kafka 代理配置映射示例
apiVersion: v1 kind: ConfigMap metadata: name: kafka-broker-config namespace: knative-eventing data: default.topic.partitions: "10" default.topic.replication.factor: "3" bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
应用配置映射:
$ oc apply -f <config_map_filename>
指定 Kafka
Broker
对象的配置映射:Broker 对象示例
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: <broker_name> 1 namespace: <namespace> 2 annotations: eventing.knative.dev/broker.class: Kafka 3 spec: config: apiVersion: v1 kind: ConfigMap name: <config_map_name> 4 namespace: <namespace> 5 ...
应用代理:
$ oc apply -f <broker_filename>
其他资源