8.3. さまざまなクライアントシリアライザー/デシリアライザータイプの設定方法
Kafka クライアントアプリケーションでスキーマを使用する場合は、ユースケースに応じて、使用する特定のスキーマタイプを選択する必要があります。Service Registry は、Apache Avro、JSON スキーマ、および Google Protobuf 用の SerDe Java クラスを提供します。以下のセクションでは、各タイプを使用するように Kafka アプリケーションを設定する方法を説明します。
また、Kafka を使用してカスタムシリアライザーおよびデシリアライザークラスを実装し、Service Registry REST Java クライアントを使用して Service Registry 機能を活用することもできます。
シリアライザー/デシリアライザーの Kafka アプリケーション設定
Kafka アプリケーションで Service Registry によって提供される SerDe クラスを使用するには、正しい設定プロパティーを設定する必要があります。以下の簡単な Avro の例は、Kafka プロデューサーアプリケーションでシリアライザーを設定する方法と、Kafka コンシューマーアプリケーションでデシリアライザーを設定する方法を示しています。
Kafka プロデューサーのシリアライザー設定の例
// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
Properties props = new Properties();
// Configure standard Kafka settings
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
// Use Service Registry-provided Kafka serializer for Avro
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
// Configure the Service Registry location
props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
// Register the schema artifact if not found in the registry.
props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
// Create the Kafka producer
Producer<Object, Object> producer = new KafkaProducer<>(props);
return producer;
}
Kafka コンシューマーのデシリアライザー設定の例
// Create the Kafka consumer
private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
Properties props = new Properties();
// Configure standard Kafka settings
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use Service Registry-provided Kafka deserializer for Avro
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
// Configure the Service Registry location
props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
// No other configuration needed because the schema globalId the deserializer uses is sent
// in the payload. The deserializer extracts the globalId and uses it to look up the schema
// from the registry.
// Create the Kafka consumer
KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
return consumer;
}
関連情報
- アプリケーションの例については Simple Avro example を参照してください。
8.3.1. Service Registry を使用した Avro SerDes の設定
このトピックでは、Apache Avro の Kafka クライアントシリアライザーおよびデシリアライザー (SerDes) クラスを使用する方法について説明します。
Service Registry は、Avro 用に次の Kafka クライアント SerDes クラスを提供します。
-
io.apicurio.registry.serde.avro.AvroKafkaSerializer -
io.apicurio.registry.serde.avro.AvroKafkaDeserializer
Avro シリアライザーの設定
以下のように Avro シリアライザークラスを設定することができます。
- Service Registry の URL
- アーティファクトリーゾルバーストラテジー
- ID の場所
- ID エンコーディング
- Avro datum プロバイダー
- Avro エンコーディング
ID の場所
シリアライザーは、スキーマの一意の ID を Kafka メッセージの一部として渡し、コンシューマーがデシリアライズに正しいスキーマを使用できるようにします。ID は、メッセージのペイロードまたはメッセージヘッダーに存在できます。デフォルトの場所はメッセージペイロードです。メッセージヘッダーで ID を送信するには、以下の設定プロパティーを設定します。
props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")
プロパティー名は apicurio.registry.headers.enabled です。
ID エンコーディング
Kafka メッセージボディーに渡すときにスキーマ ID をエンコードする方法をカスタマイズできます。apicurio.registry.id-handler 設定プロパティーを、io.apicurio.registry.serde.IdHandler インターフェイスを実装するクラスに設定します。Service Registry は以下の実装を提供します。
-
io.apicurio.registry.serde.DefaultIdHandler:ID を 8 バイト長として格納します -
io.apicurio.registry.serde.Legacy4ByteIdHandler:ID を 4 バイト整数として格納します
Service Registry はスキーマ ID を long として表しますが、従来の理由、または他のレジストリーまたは SerDe クラスとの互換性のため、ID の送信時に 4 バイトの使用が推奨される場合があります。
Avro datum プロバイダー
Avro は、データを読み書きするためのさまざまなデータライターとリーダーを提供します。Service Registry は、3 つの異なるタイプをサポートします。
- Generic
- Specific
- Reflect
Service Registry AvroDatumProvider は、使用されるタイプの抽象概念であり、DefaultAvroDatumProvider がデフォルトで使用されます。
以下の設定オプションを設定できます。
-
apicurio.registry.avro-datum-provider:AvroDatumProvider実装の完全修飾 Java クラス名を指定します (例えば、io.apicurio.registry.serde.avro.ReflectAvroDatumProvider)。 -
apicurio.registry.use-specific-avro-reader:DefaultAvroDatumProviderを使用するときに特定のタイプを使用するには、trueに設定します
Avro エンコーディング
Avro を使用してデータをシリアライズする場合は、Avro バイナリーエンコーディング形式を使用して、データを可能な限り効率的な形式でエンコードすることができます。Avro は JSON としてデータのエンコードもサポートします。これにより、ロギングやデバッグなどの各メッセージのペイロードの検証が容易になります。
apicurio.registry.avro.encoding プロパティーを JSON または BINARY の値で設定することにより、Avro エンコーディングを設定できます。デフォルトは BINARY です。
Avro デシリアライザーの設定
Avro デシリアライザーの設定設定を、シリアライザーの設定と一致するように、Avro デシリアライザークラスを設定する必要があります。
- Service Registry の URL
- ID エンコーディング
- Avro datum プロバイダー
- Avro エンコーディング
これらの設定オプションは、シリアライザーセクションを参照してください。プロパティー名と値は同じです。
デシリアライザーの設定時には、以下のオプションは必要ありません。
- アーティファクトリーゾルバーストラテジー
- ID の場所
デシリアライザークラスは、メッセージからこれらのオプションの値を判断できます。シリアライザーはメッセージの一部として ID を送信するため、ストラテジーは必要ありません。
ID の位置は、メッセージペイロードの先頭にあるマジックバイトを確認することで決定されます。そのバイトが見つかると、設定されたハンドラーを使用してメッセージペイロードから ID が読み取られます。マジックバイトが見つからない場合、ID はメッセージヘッダーから読み込まれます。
Avro SerDes とアーティファクトの参照
レコードがネスト化された Avro メッセージとスキーマを使用する場合に、ネストされたレコードごとに新しいアーティファクトが登録されます。たとえば、次の TradeKey スキーマには、ネストされた Exchange スキーマが含まれています。
ネストされた Exchange スキーマを持つ TradeKey スキーマ
{
"namespace": "com.kubetrade.schema.trade",
"type": "record",
"name": "TradeKey",
"fields": [
{
"name": "exchange",
"type": "com.kubetrade.schema.common.Exchange"
},
{
"name": "key",
"type": "string"
}
]
}
交換スキーマ
{
"namespace": "com.kubetrade.schema.common",
"type": "enum",
"name": "Exchange",
"symbols" : ["GEMINI"]
}
これらのスキーマを Avro SerDes で使用すると、Service Registry に、TradeKey スキーマ用と、Exchange スキーマ用の 2 つのアーティファクトが作成されます。TradeKey スキーマを使用するメッセージがシリアライズまたはデシリアライズされるたびに、両方のスキーマが取得されるため、定義を異なるファイルに分割できます。
関連情報
- Avro 設定の詳細は、AvroKafkaSerdeConfig Java クラス を参照してください。
Java のサンプルアプリケーションについては、以下を参照してください。