8.4.4. 升级 Kafka 代理和客户端应用程序
这个步骤描述了如何将 AMQ Streams Kafka 集群升级到最新支持的 Kafka 版本。
与您当前的 Kafka 版本相比,新版本可能支持更高的 日志消息格式版本 或代理协议版本,或两者。如果需要,请按照步骤升级这些版本。如需更多信息,请参阅 第 8.4.1 节 “Kafka 版本”。
您还应该选择 升级客户端的策略。Kafka 客户端在此流程的第 6 步中升级。
先决条件
要升级 Kafka
资源,请检查:
- 支持两个 Kafka 版本的 Cluster Operator 已启动且正在运行。
-
Kafka.spec.kafka.config
不包含 新 Kafka 版本中不支持的选项。
流程
更新 Kafka 集群配置:
oc edit kafka my-cluster
如果配置了,请确保
Kafka.spec.kafka.config
的 log.message.format
设置为 当前 Kafka 版本的默认值。.version
和 inter.broker.protocol.version例如,如果从 Kafka 版本 2.7.0 升级到 2.8.0:
kind: Kafka spec: # ... kafka: version: 2.7.0 config: log.message.format.version: "2.7" inter.broker.protocol.version: "2.7" # ...
如果没有配置
log.message.format
,AMQ Streams 会在下一步中将这些版本更新至 Kafka 版本后自动将这些版本更新至当前的默认值。.version
和 inter.broker.protocol.version注意log.message.format.version
和inter.broker.protocol.version
的值必须是字符串,以防止它们被解释为浮动点数。更改
Kafka.spec.kafka.version
以指定新的 Kafka 版本;保留log.message.format.version
和inter.broker.protocol.version
,默认为 当前 Kafka 版本。注意更改
kafka.version
可确保集群中的所有代理都将升级为使用新的代理二进制文件。在此过程中,一些代理使用旧二进制文件,另一些则已升级至新二进制文件。不要更改inter.broker.protocol.version
,确保代理可以在升级过程中继续相互通信。例如,如果从 Kafka 2.7.0 升级到 2.8.0:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 2.8.0 1 config: log.message.format.version: "2.7" 2 inter.broker.protocol.version: "2.7" 3 # ...
警告如果新 Kafka 版本的
inter.broker.protocol.version
发生变化,则无法降级 Kafka。broker 协议版本决定了代理存储的持久元数据的模式,包括写入到__consumer_offset
的消息。降级集群将不会理解消息。如果在 Kafka 自定义资源(在 Kafka
.spec.kafka.image 中)中定义了 Kafka
集群的镜像,请将镜像
更新为指向具有新 Kafka 版本的容器镜像。请参阅 Kafka 版本和镜像映射
保存并退出编辑器,然后等待滚动更新完成。
通过观察 pod 状态转换来检查滚动更新的进度:
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
滚动更新可确保每个 pod 在新版本的 Kafka 中使用代理二进制文件。
根据您选择的 升级客户端策略,升级所有客户端应用以使用新版本的客户端二进制文件。
如果需要,将 Kafka Connect 和 MirrorMaker 的
version
属性设置为 Kafka 的新版本:-
对于 Kafka Connect,更新
KafkaConnect.spec.version
。 -
对于 MirrorMaker,更新
KafkaMirrorMaker.spec.version
。 -
对于 MirrorMaker 2.0,更新
KafkaMirrorMaker2.spec.version
。
-
对于 Kafka Connect,更新
如果配置,更新 Kafka 资源,以使用新的
inter.broker.protocol.version
版本。否则,请转至第 9 步。例如,如果升级到 Kafka 2.8.0:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 2.8.0 config: log.message.format.version: "2.7" inter.broker.protocol.version: "2.8" # ...
- 等待 Cluster Operator 更新集群。
如果配置,更新 Kafka 资源,以使用新的
log.message.format.version
版本。否则,请转至第 10 步。例如,如果升级到 Kafka 2.8.0:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 2.8.0 config: log.message.format.version: "2.8" inter.broker.protocol.version: "2.8" # ...
等待 Cluster Operator 更新集群。
- Kafka 集群和客户端现在使用新的 Kafka 版本。
- 代理配置为使用 Kafka 的新版本的 Inter-broker 协议版本和消息格式发送消息。
在 Kafka 升级后,如果需要,您可以: