10.9. コンシューマークライアントからの Service Registry スキーマの使用

この手順では、Service Registry からのスキーマを使用するように Java コンシューマークライアントを設定する方法について説明します。

手順

  1. Service Registry の URL でクライアントを設定します。

    以下に例を示します。

    String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
        "https://my-cluster-service-registry-myproject.example.com/api");
    RegistryService service = RegistryClient.cached(registryUrl);
  2. Service Registry デシリアライザーサービスでクライアントを設定します。

    以下に例を示します。

    Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <> ( 1
        service,
        new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true)
    );
    Serde<LogInput> logSerde = Serdes.serdeFrom( 2
        new AvroKafkaSerializer<>(service),
        deserializer
    );
    KStream<String, LogInput> input = builder.stream( 3
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), logSerde)
    );
    1
    Service Registry によって提供されるデシリアライザーサービス。
    2
    デシリアライズは Apache Avro JSON 形式です。
    3
    デシリアライズの入力データ。クライアントが使用するトピック値から派生します。