8.3. 如何配置不同的客户端序列化r/deserializer 类型

当在 Kafka 客户端应用程序中使用 schema 时,必须根据您的用例选择要使用的特定模式类型。Service Registry 为 Apache Avro、JSON 架构和 Google Protobuf 提供 SerDe Java 类。以下小节解释了如何配置 Kafka 应用程序以使用每种类型。

您还可以使用 Kafka 来实现自定义序列化和反序列化器类,并使用 Service Registry REST Java 客户端利用 Service Registry REST Java 客户端利用 Service Registry 功能。

用于 serializers/deserializers 的 Kafka 应用程序配置

使用 Kafka 应用程序中的 Service Registry 提供的 SerDe 类涉及设置正确的配置属性。以下简单的 Avro 示例演示了如何在 Kafka producer 应用程序中配置序列化器,以及如何在 Kafka 消费者应用程序中配置反序列化器。

Kafka producer 中的序列化器配置示例

// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use Service Registry-provided Kafka serializer for Avro
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());

    // Configure the Service Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // Register the schema artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}

Kafka 消费者中的 deserializer 配置示例

// Create the Kafka consumer
private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use Service Registry-provided Kafka deserializer for Avro
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

    // Configure the Service Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // No other configuration needed because the schema globalId the deserializer uses is sent
    // in the payload. The deserializer extracts the globalId and uses it to look up the schema
    // from the registry.

    // Create the Kafka consumer
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}

其他资源

8.3.1. 使用 Service Registry 配置 Avro SerDes

本主题解释了如何使用 Apache Avro 的 Kafka 客户端序列化器(SerDes)类。

Service Registry 为 Avro 提供以下 Kafka 客户端 SerDes 类:

  • io.apicurio.registry.serde.avro.AvroKafkaSerializer
  • io.apicurio.registry.serde.avro.AvroKafkaDeserializer

配置 Avro serializer

您可以使用以下方法配置 Avro serializer 类:

  • Service Registry URL
  • 工件解析器策略
  • ID 位置
  • ID 编码
  • Avro datum 供应商
  • Avro 编码

ID 位置

作为 Kafka 消息的一部分,序列化器会传递 schema 的唯一 ID,以便消费者使用正确的模式进行反序列化。ID 可以位于消息有效负载中,也可以在消息标头中进行。默认位置是消息有效负载。要在消息标头中发送 ID,请设置以下配置属性:

props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")

属性名称为 apicurio.registry.headers.enabled

ID 编码

您可以在 Kafka 消息正文中传递时,自定义 schema ID 的编码方式。将 apicurio.registry.id-handler 配置属性设置为实施 io.apicurio.registry.serde.IdHandler 接口的类。Service Registry 提供以下实现:

  • io.apicurio.registry.serde.DefaultIdHandler: 将 ID 存储为 8 字节长
  • io.apicurio.registry.serde.Legacy4ByteIdHandler: 将 ID 保存为 4 字节整数

Service Registry 代表 schema ID,但出于旧原因,或者出于与其它 registry 或 SerDe 类的兼容性,您可能想在发送 ID 时使用 4 字节。

Avro datum 供应商

Avro 提供不同的 datum 写入和读取数据。Service Registry 支持三种不同的类型:

  • 通用
  • 特定
  • reflect

Service Registry AvroDatumProvider 是使用其类型的抽象,其中默认使用 DefaultAvroDatumProvider

您可以设置以下配置选项:

  • apicurio.registry.avro-datum-provider: 指定 AvroDatumProvider 实施的完全限定域名,如 io.apicurio.registry.serde.avro.ReflectAvroDatumProvider
  • apicurio.registry.use- specific-avro-reader: 设置为 true,在使用 DefaultAvroDatumProvider时使用特定类型。

Avro 编码

当使用 Avro 序列化数据时,您可以使用 Avro 二进制编码格式来确保数据以高效格式编码。Avro 还支持将数据编码为 JSON,这有助于检查每条消息的有效负载,例如日志记录或调试。

您可以通过配置值为 JSONBINARYapicurio.registry.avro.encoding 属性来设置 Avro 编码。默认值为 BINARY

配置 Avro deserializer

您必须将 Avro deserializer 类配置为与 serializer 的以下配置设置匹配:

  • Service Registry URL
  • ID 编码
  • Avro datum 供应商
  • Avro 编码

有关这些配置选项,请参阅 serializer 部分。属性名称和值相同。

注意

配置 deserializer 时不需要以下选项:

  • 工件解析器策略
  • ID 位置

deserializer 类可以从消息中决定这些选项的值。策略并不是必需的,因为序列化器负责发送 ID 作为消息的一部分。

ID 位置是通过检查消息有效负载开始时的 magic 字节来确定的。如果找到该字节,则使用配置的处理程序从消息有效负载中读取 ID。如果没有找到音量字节,则 ID 会从邮件标头读取。

Avro SerDes 和 artifact 引用

在使用 Avro 消息和带有嵌套记录的模式时,会为每个嵌套记录注册一个新的工件。例如,以下 TradeKey 模式包括嵌套的 Exchange 模式:

带有嵌套交换模式的 TradeKey 模式

{
  "namespace": "com.kubetrade.schema.trade",
  "type": "record",
  "name": "TradeKey",
  "fields": [
    {
      "name": "exchange",
      "type": "com.kubetrade.schema.common.Exchange"
    },
    {
      "name": "key",
      "type": "string"
    }
  ]
}

Exchange schema

{
  "namespace": "com.kubetrade.schema.common",
  "type": "enum",
  "name": "Exchange",
  "symbols" : ["GEMINI"]
}

当将这些方案与 Avro SerDes 搭配使用时,在 Service Registry 中创建两个工件,一个用于 TradeKey 模式,另一个用于 Exchange 模式。每当使用 TradeKey 模式的消息被序列化或反序列化时,都会检索这两个模式,以便您将定义分成不同的文件中。

其他资源