第13章 Debezium カスタムデータ型コンバーターの開発
カスタム開発コンバーターの使用は、テクノロジープレビュー機能のみです。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は実稼働環境でこれらを使用することを推奨していません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行い、フィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
Debezium 変更イベントレコードの各フィールドは、ソーステーブルまたはデータコレクションのフィールドまたは列を表します。コネクターが変更イベントレコードを Kafka に出力すると、ソースの各フィールドのデータ型を Kafka Connect スキーマタイプに変換します。列の値は、宛先フィールドのスキーマ型に一致するように変換されます。コネクターごとに、デフォルトのマッピングはコネクターが各データ型を変換する方法を指定します。これらのデフォルトマッピングは、各コネクターのデータ型のドキュメントで説明されています。
通常、デフォルトのマッピングで十分ですが、一部のアプリケーションでは別のマッピングを適用する場合があります。たとえば、デフォルトのマッピングが UNIX エポック以降にミリ秒単位の形式で列をエクスポートする場合、カスタムマッピングが必要になる場合がありますが、ダウンストリームアプリケーションは列値をフォーマットされた文字列としてのみ使用できます。カスタムコンバーターを開発およびデプロイして、データ型マッピングをカスタマイズします。特定のタイプのすべての列で動作するようにカスタムコンバーターを設定するか、特定のテーブル列にのみ適用されるようにスコープを絞り込むことができます。コンバーター関数は、指定された基準に一致する列のデータ型変換リクエストをインターセプトし、指定の変換を実行します。コンバーターは、指定された基準に一致しない列を無視します。
カスタムコンバーターは、Debezium サービスプロバイダーインターフェイス(SPI)を実装する Java クラスです。コネクター設定で converters プロパティーを設定して、カスタムコンバーターを有効化および設定します。converters プロパティーは、コネクターが使用できるコンバーターを指定し、変換動作をさらに変更するサブプロパティーを含めることができます。
コネクターの起動後、コネクター設定で有効になっているコンバーターはインスタンス化され、レジストリーに追加されます。レジストリーは、各コンバーターを、処理する列またはフィールドに関連付けます。Debezium が新しい変更イベントを処理するたびに、設定されたコンバーターを呼び出して、登録される列またはフィールドを変換します。
13.1. Debezium カスタムデータ型コンバーターの作成
以下の例は、インターフェイス io.debezium.spi.converter.CustomConverter を実装する Java クラスのコンバーター実装を示しています。
public interface CustomConverter<S, F extends ConvertedField> {
@FunctionalInterface
interface Converter { 1
Object convert(Object input);
}
public interface ConverterRegistration<S> { 2
void register(S fieldSchema, Converter converter); 3
}
void configure(Properties props);
void converterFor(F field, ConverterRegistration<S> registration); 4
}カスタムコンバーターメソッド
CustomConverter インターフェイスの実装には、以下のメソッドが含まれている必要があります。
configure()コネクター設定に指定されたプロパティーをコンバーターインスタンスに渡します。
configureメソッドは、コネクターが初期化されると実行されます。複数のコネクターでコンバーターを使用し、コネクターのプロパティー設定に基づいてその動作を変更できます。configureメソッドは以下の引数を受け入れます。props- コンバーターインスタンスに渡すプロパティーが含まれています。各プロパティーは、特定のタイプの列の値を変換するための形式を指定します。
converterFor()コンバーターを登録して、データソースの特定の列またはフィールドを処理します。Debezium
は converterFor ()メソッドを呼び出すと、コンバーターが変換の登録を呼び出すよう求められます。converterForメソッドは、各列に対して 1 回実行されます。
メソッドは以下の引数を受け入れます。field- 処理されるフィールドまたは列に関するメタデータを渡すオブジェクト。列のメタデータには、列またはフィールドの名前、テーブルまたはコレクションの名前、データタイプ、サイズなどを含めることができます。
登録 (registration)-
ターゲットスキーマ定義と列データを変換するためのコードを提供する
io.debezium.spi.converter.CustomConverter.ConverterRegistrationタイプのオブジェクト。コンバーターは、ソース列が、コンバーターが処理するタイプと一致する場合にregistrationパラメーターを呼び出します。registerメソッドを呼び出して、スキーマの各列のコンバーターを定義します。スキーマは、Kafka ConnectSchemaBuilderAPI を使用して表されます。
13.1.1. Debezium カスタムコンバーターの例
以下の例は、以下の操作を実行する単純なコンバーターを実装します。
-
configureメソッドを実行し、コネクター設定に指定されたschema.nameプロパティーの値に基づいてコンバーターを設定します。コンバーター設定は、各インスタンスに固有のものです。 converterForメソッドを実行し、データ型がisbnに設定されているソース列の値を処理するためにコンバーターを登録します。-
schema.nameプロパティーに指定された値に基づいてターゲットSTRINGスキーマを識別します。 -
ソース列の ISBN データを
String値に変換します。
-
例13.1 簡単なカスタムコンバーター
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private SchemaBuilder isbnSchema;
@Override
public void configure(Properties props) {
isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
}
@Override
public void converterFor(RelationalColumn column,
ConverterRegistration<SchemaBuilder> registration) {
if ("isbn".equals(column.typeName())) {
registration.register(isbnSchema, x -> x.toString());
}
}
}13.1.2. Debezium および Kafka Connect API モジュールの依存関係
カスタムコンバーター Java プロジェクトには、Debezium API および Kafka Connect API ライブラリーモジュールへの依存関係がコンパイルされます。以下の例のように、これらのコンパイル依存関係はプロジェクトの pom.xml に含まれている必要があります。
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version> 1
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${version.kafka}</version> 2
</dependency>