第10章 Service Registry を使用したスキーマの管理

本章では、AMQ Streams をデプロイし Red Hat Service Registry と統合する方法について解説します。Service Registry は、データストリーミングのサービススキーマの集中型ストアとして使用できます。

Service Registry は、多くの標準的なアーティファクトタイプのストレージおよび管理をサポートします。たとえば、Kafka では、AVRO または JSON を基にしてスキーマ定義を使用できます。

Service Registry は、REST API および Java REST クライアントを提供し、サーバー側のエンドポイントを介してクライアントアプリケーションからスキーマを登録およびクエリーします。Service Registry Web コンソールを使用して、スキーマを直接閲覧および更新することもできます。プロデューサークライアントおよびコンシューマークライアントが Service Registry を使用するように設定できます。

Maven プラグインも提供されるので、ビルドの一部としてスキーマをアップロードおよびダウンロードできます。スキーマの更新がクライアントアプリケーションと互換性があることを確認する場合、Maven プラグインはテストおよび検証に役立ちます。

その他のリソース

10.1. Service Registry を使用する理由

Service Registry を使用すると、クライアントアプリケーションの設定からスキーマ管理のプロセスが分離されます。クライアントコードに URL を指定して、アプリケーションがレジストリーからスキーマを使用できるようにします。

たとえば、メッセージをシリアライズおよびデシリアライズするスキーマをレジストリーに保存できます。保存後、スキーマを使用するアプリケーションから参照され、アプリケーションが送受信するメッセージがこれらのスキーマと互換性を維持するようにします。

Kafka クライアントアプリケーションは実行時にスキーマを Service Registry からプッシュまたはプルできます。

スキーマは進化するので、Service Registry でルールを定義できます。たとえば、スキーマへの変更が有効で、アプリケーションによって使用される以前のバージョンとの互換性を維持するようにします。Service Registry は、変更済みのスキーマと前バージョンのスキーマを比較することで、互換性をチェックします。

Service Registry は Avro スキーマのスキーマレジストリーを完全にサポートします。Avro スキーマは、Service Registry で提供される Kafka クライアントのシリアライザー/デシリアライザー (SerDe) サービスを通じてクライアントアプリケーションによって使用されます。

10.2. プロデューサースキーマの設定

プロデューサークライアントアプリケーションは、シリアライザーを使用して、特定のブローカートピックに送信するメッセージを正しいデータ形式にします。

プロデューサーが Service Registry を使用してシリアライズできるようにするには、以下を行います。

スキーマを登録したら、Kafka および Service Registry を開始するときに、スキーマにアクセスして、プロデューサーにより Kafka ブローカートピックに送信されるメッセージをフォーマットできます。

スキーマがすでに存在する場合、Service Registry に定義される互換性ルールに基づいて、REST API により新バージョンのスキーマを作成できます。バージョンは、スキーマの進化にともなう互換性チェックに使用します。アーティファクト ID およびスキーマバージョンは、スキーマを識別する一意のタプルを表します。

10.3. コンシューマースキーマの設定

コンシューマークライアントアプリケーションは、デシリアライザーを使用することで、そのアプリケーションが消費するメッセージを特定のブローカートピックから正しいデータ形式にします。

コンシューマーがデシリアライズに Service Registry を使用できるようにするには、以下を実行します。

次に、消費されるメッセージに書き込まれたグローバル ID を使用して、デシリアライザーによってスキーマが取得されます。このため、受信されるメッセージにはグローバル ID およびメッセージデータが含まれる必要があります。

以下に例を示します。

# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]

これで、Kafka および Service Registry を開始するとき、スキーマにアクセスして、Kafka ブローカートピックから受信するメッセージをフォーマットできます。

10.4. スキーマ検索のストラテジー

Service Registry ストラテジー は、Service Registry でメッセージスキーマが登録されるアーティファクト ID またはグローバル ID を判断するために、Kafka クライアントシリアライザー/デシリアライザーによって使用されます。

特定のトピックおよびメッセージで、以下の Java クラスの実装を使用できます。

  • ArtifactIdStrategy、アーティファクト ID を返します。
  • GlobalIdStrategy、グローバル ID を返します。

返されるアーティファクト ID は、メッセージの キー または のどちらがシリアライズされるかによって異なります。

ストラテジー のクラスは、io.apicurio.registry.utils.serde.strategy パッケージにまとめられています。

デフォルトのストラテジー、TopicIdStrategy は、メッセージを受信する Kafka トピックと同じ名前の Service Registry アーティファクトを検索します。

以下に例を示します。

public String artifactId(String topic, boolean isKey, T schema) {
    return String.format("%s-%s", topic, isKey ? "key" : "value");
}
  • topic パラメーターは、メッセージを受信する Kafka トピックの名前です。
  • isKey パラメーター は、メッセージキーがシリアライズされる場合は true、メッセージ値がシリアライズされる場合は false です。
  • schema パラメーターは、シリアライズ/デシリアライズされるメッセージのスキーマです。
  • 返される artifactID は、スキーマが Service Registry に登録される ID です。

使用する検索アップストラテジーは、スキーマを保存する方法と場所によって異なります。たとえば、同じ Avro メッセージタイプを持つ Kafka トピックが複数ある場合、レコード ID を使用するストラテジーを使用することがあります。

アーティファクト ID を返すストラテジー

これらのストラテジーは、ArtifactIdStrategy の実装に基づいてアーティファクト ID を返します。

RecordIdStrategy
スキーマのフルネームを使用する Avro 固有のストラテジー。
TopicRecordIdStrategy
トピック名およびスキーマのフルネームを使用する Avro 固有のストラテジー。
TopicIdStrategy
(デフォルト) トピック名と、key または value 接尾辞を使用するストラテジー。
SimpleTopicIdStrategy
トピック名のみを使用する単純なストラテジー。

グローバル ID を返すストラテジー

これらのストラテジーは、GlobalIdStrategy の実装に基づいてグローバル ID を返します。

FindLatestIdStrategy
アーティファクト ID に基づいて最新のスキーマバージョンのグローバル ID を返すストラテジー。
FindBySchemaIdStrategy
アーティファクト ID に基づいてスキーマコンテンツと一致する、グローバル ID を返すストラテジー。
GetOrCreateIdStrategy
アーティファクト ID に基づいて最新スキーマの取得を試み、スキーマが存在しなければ新規スキーマを作成するストラテジー。
AutoRegisterIdStrategy
スキーマを更新し、更新されたスキーマのグローバル ID を使用するストラテジー。

10.5. Service Registry の定数

このセクションで概説する定数を使用して、特定のクライアントの SerDe サービスおよびスキーマ検索ストラテジーを直接クライアントに設定できます。

または、プロパティーファイルまたはプロパティーインスタンスで定数を指定することもできます。

シリアライザー/デシリアライザー (SerDe) サービスの定数

public abstract class AbstractKafkaSerDe<T extends AbstractKafkaSerDe<T>> implements AutoCloseable {
      protected final Logger log = LoggerFactory.getLogger(getClass());

      public static final String REGISTRY_URL_CONFIG_PARAM = "apicurio.registry.url"; 1
      public static final String REGISTRY_CACHED_CONFIG_PARAM = "apicurio.registry.cached"; 2
      public static final String REGISTRY_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.id-handler"; 3
      public static final String REGISTRY_CONFLUENT_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.as-confluent"; 4
1
(必須) Service Registry の URL。
2
クライアントがリクエストを実行し、以前の結果のキャッシュから情報を検索して処理時間を短縮できるようにします。キャッシュが空の場合、検索は Service Registry から実行されます。
3
ID 処理を拡張することで、他の ID 形式をサポートし、その形式に Service Registry SerDe サービスとの互換性を持たせます。たとえば、ID 形式を Long から Integer に変更すると Confluent ID 形式がサポートされます。
4
Confluent ID の処理を簡素化するフラグ。true に設定すると、Integer がグローバル ID の検索に使用されます。

検索ストラテジーの定数

public abstract class AbstractKafkaStrategyAwareSerDe<T, S extends AbstractKafkaStrategyAwareSerDe<T, S>> extends AbstractKafkaSerDe<S> {
      public static final String REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.artifact-id"; 1
      public static final String REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.global-id"; 2

コンバーターの定数

public class SchemalessConverter<T> extends AbstractKafkaSerDe<SchemalessConverter<T>> implements Converter {
      public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer"; 1
      public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer"; 2
1
(必須) コンバーターと使用するシリアライザー。
2
(必須) コンバーターと使用するデシリアライザー。

Avro データプロバイダーの定数

public interface AvroDatumProvider<T> {
      String REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM = "apicurio.registry.avro-datum-provider"; 1
      String REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM = "apicurio.registry.use-specific-avro-reader"; 2
1
スキーマにデータを書き込む Avro データプロバイダー。リフレクションを使用する場合としない場合があります。
2
Avro 固有のデータリーダーの使用を設定するフラグ。
DefaultAvroDatumProvider (io.apicurio.registry.utils.serde.avro) 1
ReflectAvroDatumProvider (io.apicurio.registry.utils.serde.avro) 2
1
デフォルトのデータリーダー。
2
リフレクションを使用するデータリーダー。

10.6. Service Registry のインストール

AMQ Streams ストレージで Service Registry をインストールする手順は、Service Registry のドキュメントを参照してください。

クラスターの設定に応じて、複数の Service Registry インスタンスをインストールできます。インスタンス数は、使用するストレージタイプと、処理する必要のあるスキーマの数によって異なります。

10.7. スキーマの Service Registry への登録

スキーマを Apache Avro などの適切な形式で定義したら、スキーマを Service Registry に追加できます。

スキーマは以下を使用して追加できます。

  • Service Registry Web コンソール
  • Service Registry API を使用する curl コマンド
  • Service Registry に付属する Maven プラグイン
  • クライアントコードに加えられたスキーマ設定

スキーマを登録するまでは、クライアントアプリケーションは Service Registry を使用できません。

Service Registry Web コンソール

Service Registry をインストールしたら、ui エンドポイントから Web コンソールに接続します。

http://MY-REGISTRY-URL/ui

コンソールから、スキーマを追加、表示、および設定できます。また、レジストリーに無効なコンテンツが追加されないようにするルールを作成することもできます。

Service Registry Web コンソールの使用に関する詳細は、Service Registry のドキュメント を参照してください。

curl の例

curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
  -H "X-Registry-ArtifactId: prices-value" \
  --data '{ 1
      "type":"record",
      "name":"price",
      "namespace":"com.redhat",
      "fields":[{"name":"symbol","type":"string"},
      {"name":"price","type":"string"}]
    }'
  https://my-cluster-service-registry-myproject.example.com/api/artifacts -s 2
1
Avro スキーマ
2
Service Registry を公開する OpenShift ルート名

プラグインの例

<plugin>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-maven-plugin</artifactId>
<version>${registry.version}</version>
<executions>
  <execution>
    <phase>generate-sources</phase>
    <goals>
      <goal>register</goal>
    </goals>
    <configuration>
      <registryUrl>https://my-cluster-service-registry-myproject.example.com/api</registryUrl>
      <artifactType>AVRO</artifactType>
      <artifacts>
        <schema1>${project.basedir}/schemas/schema1.avsc</schema1>
      </artifacts>
    </configuration>
  </execution>
</executions>
</plugin>

(プロデューサー) クライアントによる設定例

String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1", 1
    "https://my-cluster-service-registry-myproject.example.com/api");
try (RegistryService service = RegistryClient.create(registryUrl_node1)) {
    String artifactId = ApplicationImpl.INPUT_TOPIC + "-value";
    try {
        service.getArtifactMetaData(artifactId); 2
    } catch (WebApplicationException e) {
        CompletionStage <ArtifactMetaData> csa = service.createArtifact(
            ArtifactType.AVRO,
            artifactId,
            new ByteArrayInputStream(LogInput.SCHEMA$.toString().getBytes())
        );
        csa.toCompletableFuture().get();
    }
}
1
プロパティーが登録されています。複数のノードに対してプロパティーを登録できます。
2
アーティファクト ID に基づいてスキーマがすでに存在しているかを確認します。

10.8. プロデューサークライアントからの Service Registry スキーマの使用

この手順では、Service Registry からのスキーマを使用するように Java プロデューサークライアントを設定する方法について説明します。

手順

  1. Service Registry の URL でクライアントを設定します。

    以下に例を示します。

    String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
        "https://my-cluster-service-registry-myproject.example.com/api");
    RegistryService service = RegistryClient.cached(registryUrl);
  2. クライアントをシリアライザーサービスで設定し、Service Registry でスキーマを検索するようにストラテジーを設定します。

    以下に例を示します。

    String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
        "https://my-cluster-service-registry-myproject.example.com/api");
    
        clientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, property(clientProperties, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"));
        clientProperties.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, registryUrl_node1); 1
        clientProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 2
        clientProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); 3
        clientProperties.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, FindLatestIdStrategy.class.getName()); 4
    1
    Service Registry の URL。
    2
    Service Registry により提供されるメッセージ キー のシリアライザーサービス。
    3
    Service Registry により提供されるメッセージ のシリアライザーサービス。
    4
    スキーマのグローバル ID を検索する検索ストラテジー。Service Registry のグローバル ID (アーティファクト ID およびスキーマバージョン) に対してメッセージのスキーマを照合します。

10.9. コンシューマークライアントからの Service Registry スキーマの使用

この手順では、Service Registry からのスキーマを使用するように Java コンシューマークライアントを設定する方法について説明します。

手順

  1. Service Registry の URL でクライアントを設定します。

    以下に例を示します。

    String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
        "https://my-cluster-service-registry-myproject.example.com/api");
    RegistryService service = RegistryClient.cached(registryUrl);
  2. Service Registry デシリアライザーサービスでクライアントを設定します。

    以下に例を示します。

    Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <> ( 1
        service,
        new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true)
    );
    Serde<LogInput> logSerde = Serdes.serdeFrom( 2
        new AvroKafkaSerializer<>(service),
        deserializer
    );
    KStream<String, LogInput> input = builder.stream( 3
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), logSerde)
    );
    1
    Service Registry によって提供されるデシリアライザーサービス。
    2
    デシリアライズは Apache Avro JSON 形式です。
    3
    デシリアライズの入力データ。クライアントが使用するトピック値から派生します。