8.3. 如何配置不同的客户端序列化r/deserializer 类型
当在 Kafka 客户端应用程序中使用 schema 时,必须根据您的用例选择要使用的特定模式类型。Service Registry 为 Apache Avro、JSON 架构和 Google Protobuf 提供 SerDe Java 类。以下小节解释了如何配置 Kafka 应用程序以使用每种类型。
您还可以使用 Kafka 来实现自定义序列化和反序列化器类,并使用 Service Registry REST Java 客户端利用 Service Registry REST Java 客户端利用 Service Registry 功能。
用于 serializers/deserializers 的 Kafka 应用程序配置
使用 Kafka 应用程序中的 Service Registry 提供的 SerDe 类涉及设置正确的配置属性。以下简单的 Avro 示例演示了如何在 Kafka producer 应用程序中配置序列化器,以及如何在 Kafka 消费者应用程序中配置反序列化器。
Kafka producer 中的序列化器配置示例
// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
Properties props = new Properties();
// Configure standard Kafka settings
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
// Use Service Registry-provided Kafka serializer for Avro
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
// Configure the Service Registry location
props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
// Register the schema artifact if not found in the registry.
props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
// Create the Kafka producer
Producer<Object, Object> producer = new KafkaProducer<>(props);
return producer;
}
Kafka 消费者中的 deserializer 配置示例
// Create the Kafka consumer
private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
Properties props = new Properties();
// Configure standard Kafka settings
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use Service Registry-provided Kafka deserializer for Avro
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
// Configure the Service Registry location
props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
// No other configuration needed because the schema globalId the deserializer uses is sent
// in the payload. The deserializer extracts the globalId and uses it to look up the schema
// from the registry.
// Create the Kafka consumer
KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
return consumer;
}
其他资源
- 如需示例应用程序,请参阅 Simple Avro 示例
8.3.1. 使用 Service Registry 配置 Avro SerDes
本主题解释了如何使用 Apache Avro 的 Kafka 客户端序列化器(SerDes)类。
Service Registry 为 Avro 提供以下 Kafka 客户端 SerDes 类:
-
io.apicurio.registry.serde.avro.AvroKafkaSerializer -
io.apicurio.registry.serde.avro.AvroKafkaDeserializer
配置 Avro serializer
您可以使用以下方法配置 Avro serializer 类:
- Service Registry URL
- 工件解析器策略
- ID 位置
- ID 编码
- Avro datum 供应商
- Avro 编码
ID 位置
作为 Kafka 消息的一部分,序列化器会传递 schema 的唯一 ID,以便消费者使用正确的模式进行反序列化。ID 可以位于消息有效负载中,也可以在消息标头中进行。默认位置是消息有效负载。要在消息标头中发送 ID,请设置以下配置属性:
props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
属性名称为 apicurio.registry.headers.enabled。
ID 编码
您可以在 Kafka 消息正文中传递时,自定义 schema ID 的编码方式。将 apicurio.registry.id-handler 配置属性设置为实施 io.apicurio.registry.serde.IdHandler 接口的类。Service Registry 提供以下实现:
-
io.apicurio.registry.serde.DefaultIdHandler: 将 ID 存储为 8 字节长 -
io.apicurio.registry.serde.Legacy4ByteIdHandler: 将 ID 保存为 4 字节整数
Service Registry 代表 schema ID,但出于旧原因,或者出于与其它 registry 或 SerDe 类的兼容性,您可能想在发送 ID 时使用 4 字节。
Avro datum 供应商
Avro 提供不同的 datum 写入和读取数据。Service Registry 支持三种不同的类型:
- 通用
- 特定
- reflect
Service Registry AvroDatumProvider 是使用其类型的抽象,其中默认使用 DefaultAvroDatumProvider。
您可以设置以下配置选项:
-
apicurio.registry.avro-datum-provider: 指定AvroDatumProvider实施的完全限定域名,如io.apicurio.registry.serde.avro.ReflectAvroDatumProvider -
apicurio.registry.use- specific-avro-reader: 设置为true,在使用DefaultAvroDatumProvider时使用特定类型。
Avro 编码
当使用 Avro 序列化数据时,您可以使用 Avro 二进制编码格式来确保数据以高效格式编码。Avro 还支持将数据编码为 JSON,这有助于检查每条消息的有效负载,例如日志记录或调试。
您可以通过配置值为 JSON 或 BINARY 的 apicurio.registry.avro.encoding 属性来设置 Avro 编码。默认值为 BINARY。
配置 Avro deserializer
您必须将 Avro deserializer 类配置为与 serializer 的以下配置设置匹配:
- Service Registry URL
- ID 编码
- Avro datum 供应商
- Avro 编码
有关这些配置选项,请参阅 serializer 部分。属性名称和值相同。
配置 deserializer 时不需要以下选项:
- 工件解析器策略
- ID 位置
deserializer 类可以从消息中决定这些选项的值。策略并不是必需的,因为序列化器负责发送 ID 作为消息的一部分。
ID 位置是通过检查消息有效负载开始时的 magic 字节来确定的。如果找到该字节,则使用配置的处理程序从消息有效负载中读取 ID。如果没有找到音量字节,则 ID 会从邮件标头读取。
Avro SerDes 和 artifact 引用
在使用 Avro 消息和带有嵌套记录的模式时,会为每个嵌套记录注册一个新的工件。例如,以下 TradeKey 模式包括嵌套的 Exchange 模式:
带有嵌套交换模式的 TradeKey 模式
{
"namespace": "com.kubetrade.schema.trade",
"type": "record",
"name": "TradeKey",
"fields": [
{
"name": "exchange",
"type": "com.kubetrade.schema.common.Exchange"
},
{
"name": "key",
"type": "string"
}
]
}
Exchange schema
{
"namespace": "com.kubetrade.schema.common",
"type": "enum",
"name": "Exchange",
"symbols" : ["GEMINI"]
}
当将这些方案与 Avro SerDes 搭配使用时,在 Service Registry 中创建两个工件,一个用于 TradeKey 模式,另一个用于 Exchange 模式。每当使用 TradeKey 模式的消息被序列化或反序列化时,都会检索这两个模式,以便您将定义分成不同的文件中。
其他资源
- 有关 Avro 配置的详情,请查看 AvroKafkaSerdeConfig Java 类
对于 Java 示例应用程序,请参阅: