第13章 Debezium カスタムデータ型コンバーターの開発

重要

カスタム開発コンバーターの使用は、テクノロジープレビュー機能のみです。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は実稼働環境でこれらを使用することを推奨していません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行い、フィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

Debezium 変更イベントレコードの各フィールドは、ソーステーブルまたはデータコレクションのフィールドまたは列を表します。コネクターが変更イベントレコードを Kafka に出力すると、ソースの各フィールドのデータ型を Kafka Connect スキーマタイプに変換します。列の値は、宛先フィールドのスキーマ型に一致するように変換されます。コネクターごとに、デフォルトのマッピングはコネクターが各データ型を変換する方法を指定します。これらのデフォルトマッピングは、各コネクターのデータ型のドキュメントで説明されています。

通常、デフォルトのマッピングで十分ですが、一部のアプリケーションでは別のマッピングを適用する場合があります。たとえば、デフォルトのマッピングが UNIX エポック以降にミリ秒単位の形式で列をエクスポートする場合、カスタムマッピングが必要になる場合がありますが、ダウンストリームアプリケーションは列値をフォーマットされた文字列としてのみ使用できます。カスタムコンバーターを開発およびデプロイして、データ型マッピングをカスタマイズします。特定のタイプのすべての列で動作するようにカスタムコンバーターを設定するか、特定のテーブル列にのみ適用されるようにスコープを絞り込むことができます。コンバーター関数は、指定された基準に一致する列のデータ型変換リクエストをインターセプトし、指定の変換を実行します。コンバーターは、指定された基準に一致しない列を無視します。

カスタムコンバーターは、Debezium サービスプロバイダーインターフェイス(SPI)を実装する Java クラスです。コネクター設定で converters プロパティーを設定して、カスタムコンバーターを有効化および設定します。converters プロパティーは、コネクターが使用できるコンバーターを指定し、変換動作をさらに変更するサブプロパティーを含めることができます。

コネクターの起動後、コネクター設定で有効になっているコンバーターはインスタンス化され、レジストリーに追加されます。レジストリーは、各コンバーターを、処理する列またはフィールドに関連付けます。Debezium が新しい変更イベントを処理するたびに、設定されたコンバーターを呼び出して、登録される列またはフィールドを変換します。

13.1. Debezium カスタムデータ型コンバーターの作成

以下の例は、インターフェイス io.debezium.spi.converter.CustomConverter を実装する Java クラスのコンバーター実装を示しています。

public interface CustomConverter<S, F extends ConvertedField> {

    @FunctionalInterface
    interface Converter {  1
        Object convert(Object input);
    }

    public interface ConverterRegistration<S> { 2
        void register(S fieldSchema, Converter converter); 3
    }

    void configure(Properties props);

    void converterFor(F field, ConverterRegistration<S> registration); 4
}
1
あるタイプから別のタイプにデータを変換するための関数。
2
コンバーターを登録するためのコールバック。
3
現在のフィールドの指定のスキーマとコンバーターを登録します。同じフィールドに対して複数回呼び出すことはできません。
4
特定のフィールドで使用するカスタマイズされた値とスキーマコンバーターを登録します。

カスタムコンバーターメソッド

CustomConverter インターフェイスの実装には、以下のメソッドが含まれている必要があります。

configure()

コネクター設定に指定されたプロパティーをコンバーターインスタンスに渡します。configure メソッドは、コネクターが初期化されると実行されます。複数のコネクターでコンバーターを使用し、コネクターのプロパティー設定に基づいてその動作を変更できます。
configure メソッドは以下の引数を受け入れます。

props
コンバーターインスタンスに渡すプロパティーが含まれています。各プロパティーは、特定のタイプの列の値を変換するための形式を指定します。
converterFor()

コンバーターを登録して、データソースの特定の列またはフィールドを処理します。Debezium は converterFor () メソッドを呼び出すと、コンバーターが変換の 登録 を呼び出すよう求められます。converterFor メソッドは、各列に対して 1 回実行されます。
メソッドは以下の引数を受け入れます。

field
処理されるフィールドまたは列に関するメタデータを渡すオブジェクト。列のメタデータには、列またはフィールドの名前、テーブルまたはコレクションの名前、データタイプ、サイズなどを含めることができます。
登録 (registration)
ターゲットスキーマ定義と列データを変換するためのコードを提供する io.debezium.spi.converter.CustomConverter.ConverterRegistration タイプのオブジェクト。コンバーターは、ソース列が、コンバーターが処理するタイプと一致する場合に registration パラメーターを呼び出します。register メソッドを呼び出して、スキーマの各列のコンバーターを定義します。スキーマは、Kafka Connect SchemaBuilder API を使用して表されます。

13.1.1. Debezium カスタムコンバーターの例

以下の例は、以下の操作を実行する単純なコンバーターを実装します。

  • configure メソッドを実行し、コネクター設定に指定された schema.name プロパティーの値に基づいてコンバーターを設定します。コンバーター設定は、各インスタンスに固有のものです。
  • converterFor メソッドを実行し、データ型が isbn に設定されているソース列の値を処理するためにコンバーターを登録します。

    • schema.name プロパティーに指定された値に基づいてターゲット STRING スキーマを識別します。
    • ソース列の ISBN データを String 値に変換します。

例13.1 簡単なカスタムコンバーター

public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private SchemaBuilder isbnSchema;

    @Override
    public void configure(Properties props) {
        isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {

        if ("isbn".equals(column.typeName())) {
            registration.register(isbnSchema, x -> x.toString());
        }
    }
}

13.1.2. Debezium および Kafka Connect API モジュールの依存関係

カスタムコンバーター Java プロジェクトには、Debezium API および Kafka Connect API ライブラリーモジュールへの依存関係がコンパイルされます。以下の例のように、これらのコンパイル依存関係はプロジェクトの pom.xml に含まれている必要があります。

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version> 1
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version> 2
</dependency>
1
${version.debezium} は Debezium コネクターのバージョンを表します。
2
${version.kafka} は、環境内の Apache Kafka のバージョンを表します。