第 7 章 在 Java 客户端中使用 serializers/deserializers 验证 Kafka 信息

Service Registry 为 Kafka producer 和使用 Java 编写的消费者应用程序提供客户端序列化程序/反序列化器(SerDes)。Kafka producer 应用程序使用序列化器对符合特定事件 schema 的消息进行编码。Kafka 用户应用程序使用 deserializers 验证消息已使用正确的模式序列化,具体取决于特定的模式 ID。这样可确保使用一致的模式,并帮助防止运行时的数据错误。

本章论述了如何在制作者和消费者应用程序中使用 Kafka 客户端 SerDes:

先决条件

7.1. Kafka 客户端应用程序和 Service Registry

Service Registry 将架构管理与客户端应用程序配置分离。您可以通过在客户端代码中指定其 URL,启用 Java 客户端应用程序从 Service Registry 中使用 schema。

您可以将模式存储在 registry 中,以序列化和反序列化消息,这些消息可以从客户端应用程序引用,以确保它们发送和接收的消息与这些架构兼容。Kafka 客户端应用程序可以在运行时从 Service Registry 中推送或拉取其模式。

模式可以演进,因此您可以在 Service Registry 中定义规则,例如确保模式更改有效且不会影响应用程序使用的旧版本。Service Registry 通过比较修改的 schema 与之前的 schema 版本进行比较,以检查兼容性。

Service Registry 架构技术

Service Registry 为 schema 技术提供架构 registry 支持,例如:

  • Avro
  • protobuf
  • JSON 架构

客户端应用程序可通过服务 Registry 提供的 SerDes (SerDes)服务使用这些模式技术。Service Registry 提供的 SerDes 类的成熟度和使用可能有所不同。后面的部分提供了有关每种 schema 类型的更多详情。

制作者模式配置

制作者客户端应用使用序列化器将消息放入特定的代理主题到正确的数据格式。

启用制作者以使用 Service Registry 序列化:

注册模式后,当您启动 Kafka 和 Service Registry 时,您可以通过制作者访问发送到 Kafka 代理的消息的 schema。另外,根据配置,制作者可以在第一次使用时注册该架构。

如果 schema 已存在,您可以根据 Service Registry 中定义的兼容性规则,使用 registry REST API 创建一个新版本。随着架构演进,版本用于兼容性检查。组 ID、工件 ID 和版本代表标识 schema 的唯一元组。

消费者架构配置

消费者客户端应用程序使用反序列化程序将信息从特定代理主题获取正确的数据格式。

启用消费者使用 Service Registry 进行反序列化:

使用全局 ID 检索 schema

默认情况下,通过 deserializer 使用一个全局 ID 从 Service Registry 中检索 schema,该 ID 在被使用的消息中指定。模式全局 ID 可以位于消息标头中,或消息有效负载中,具体取决于制作者应用的配置。

在消息有效负载中查找全局 ID 时,数据的格式以magic 字节开头,用作消费者信号,后跟全局 ID,并以正常方式的消息数据。例如:

# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]

然后,当您启动 Kafka 和 Service Registry 时,您可以访问 schema 来格式化从 Kafka 代理主题接收的消息。

使用内容 ID 检索 schema

另外,您还可以将 配置为基于内容 ID 从 Service Registry 检索 schema,这是工件内容的唯一 ID。虽然全局 ID 是工件版本的唯一 ID。

内容 ID 不唯一标识一个版本,只是唯一标识版本内容。如果多个版本共享相同的内容,则它们具有不同的全局 ID,但其内容 ID 相同。Confluent Schema Registry 默认使用内容 ID。