7.6. 使用 Kafka Streams 应用程序的 schema

此流程描述了如何配置以 Java 编写的 Kafka Streams 客户端,以使用 Service Registry 的 Apache Avro 模式。

先决条件

  • 已安装 Service Registry
  • 该 schema 使用 Service Registry 注册

流程

  1. 使用 Service Registry URL 创建并配置 Java 客户端:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    
    RegistryService client = RegistryClient.cached(registryUrl);
  2. 配置 serializer 和 deserializer:

    Serializer<LogInput> serializer = new AvroKafkaSerializer<LogInput>(); 1
    
    Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <LogInput>(); 2
    
    Serde<LogInput> logSerde = Serdes.serdeFrom(
        serializer,
        deserializer
    );
    
    Map<String, Object> config = new HashMap<>();
    config.put(SerdeConfig.REGISTRY_URL, registryUrl);
    config.put(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true);
    logSerde.configure(config, false); 3
    • 1. Service Registry 提供的 Avro serializer。
    • 2.Service Registry 提供的 Avro deserializer。
    • 3.以 Avro 格式配置 Service Registry URL 和 Avro 读取器进行反序列化。
  3. 创建 Kafka Streams 客户端:

    KStream<String, LogInput> input = builder.stream(
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), logSerde)
    );