第8章 アプリケーションの Debezium コネクターの設定

デフォルトの Debezium コネクター動作がアプリケーションに適したではない場合、以下の Debezium 機能を使用して必要な動作を設定できます。

  • ByLogicalTableRouter SMT は、データ変更イベントレコードを指定したトピックに再度ルーティングします。
  • ExtractNewRecordState SMT は、一部の Kafka コンシューマーで必要とされる可能性のある単純な形式で、データ変更イベントレコードの複雑な構造をフラット化します。
  • PostgreSQL、MongoDB、または SQL Server コネクターの Avro シリアライズ を設定すると、イベントレコードコンシューマーの変更がレコードスキーマに対応できるようになります。
  • CloudEventsConverter では、Debezium コネクターは CloudEvents 仕様に準拠する変更イベントレコードを出力できます。

8.1. 指定したトピックへのルーティング変更イベントレコード

データ変更イベントが含まれる各 Kafka レコードには、デフォルトの宛先トピックがあります。必要な場合は、レコードが Kafka Connect コンバーターに到達する前に指定したトピックにレコードを再ルート指定できます。そのため、Debezium は SMT(Single ByLogicalTableRouter Message transformation)を提供します。Debezium コネクターの Kafka Connect 設定でこの変換を設定します。設定オプションを使用すると、以下を指定できます。

  • 再ルートを作成するレコードを識別する式
  • 宛先トピックに解決する式
  • 宛先トピックに再ルーティングされるレコード間の一意の鍵を確保する方法

変換設定が必要な動作を提供するのはユーザー次第です。Debezium は、変換の設定から生じる動作を検証しません。

ByLogicalTableRouter 変換は Kafka Connect SMT です。

詳細は以下のトピックを参照してください。

8.1.1. 指定したトピックへのルーティングレコードのユースケース

デフォルトの動作では、Debezium コネクターは、名前がデータベースの名前から形成されているトピックと、変更が行われたテーブルの名前が Debezium コネクターによって各変更イベントレコードを送信することです。つまり、トピックは 1 つの物理テーブルのレコードを受け取ります。トピックで複数の物理テーブルのレコードを受信する場合、Debezium コネクターがそのトピックにレコードを再ルーティングするように設定する必要があります。

論理テーブル

論理表は、複数の物理テーブルのルーティングレコードの一般的なユースケースです。論理表には、スキーマがすべて同じスキーマを持つ複数の物理テーブルがあります。たとえば、シャード化されたテーブルには同じスキーマがあります。論理テーブルは、db_shard1.my_table およびという 2 つ以上のシャードテーブルで構成される場合があります db_shard2.my_table。テーブルはシャードごとに異なり、物理的には異なりますが、それらが論理テーブルを形成します。シャードのテーブルのテーブルの変更イベントレコードを同じトピックに再ルーティングできます。

パーティション化された PostgreSQL テーブル

Debezium PostgreSQL コネクターがパーティション設定されたテーブルの変更を取得する場合、イベントレコードの変更が各パーティションに対して異なるトピックにルーティングされることがデフォルトの動作になります。すべてのパーティションから 1 つのトピックにレコードを出力するには、ByLogicalTableRouter SMT を設定します。パーティションテーブルの各キーは一意であることが保証されるので、SMT がキーを固有のキー key.enforce.uniqueness=false を確保するようにキーフィールドを追加しないようにを設定します。キーフィールドを追加するのはデフォルトの動作です。

8.1.2. 複数テーブルのルーティングレコードの例

複数の物理テーブルのイベントレコードを同じトピックにルーティングするには、Debezium コネクターの Kafka Connect 設定で ByLogicalTableRouter 変換を設定します。ByLogicalTableRouter SMT の設定では、以下を判断する正規表現を指定する必要があります。

  • レコードをルーティングするテーブル。これらのテーブルはすべて同じスキーマを持つ必要があります。
  • 宛先トピック名。

たとえば、.properties ファイルの設定は以下のようになります。

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex

変換が各変更イベントレコードに適用され、特定のトピックにルーティングするかどうかを判断する正規表現を指定します。

この例では、正規表現は名前に customers_shard 文字列が含まれるテーブルへの変更の記録 (.)customers_shard(.) と一致します。これにより、以下の名前を持つテーブルのレコードを再ルート化します。

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

topic.replacement
宛先トピック名を表す正規表現を指定します。変換は、各一致するレコードをこの式で識別したトピックにルーティングします。この例では、上記の 3 つのシャードテーブルのレコードは myserver.mydb.customers_all_shards トピックにルーティングされます。

8.1.3. 同じトピックにルーティングされるレコード全体で一意の鍵を確保

Debezium Change イベントキーは、テーブルのプライマリーキーを構成するテーブル列を使用します。複数の物理テーブルのレコードを 1 つのトピックにルーティングするには、イベントキーはそれらのすべてのテーブル全体で一意である必要があります。ただし、各物理テーブルには、そのテーブル内でのみ一意のプライマリーキーを設定できます。たとえば、テーブルの行には、myserver.mydb.customers_shard1 テーブルの行と同じキーの値がある場合があり myserver.mydb.customers_shard2 ます。

変更イベントレコードが同じトピックにあるテーブル全体で各イベントキーが一意になるように、ByLogicalTableRouter 変換によってフィールドが変更されイベントキーに挿入されます。デフォルトでは、挿入されたフィールドの名前はです __dbz__physicalTableIdentifier。挿入されたフィールドの値は、デフォルトの宛先トピック名です。

必要に応じて、別のフィールドをキーに挿入するように ByLogicalTableRouter 変換を設定できます。これを実行するには、key.field.name オプションを指定し、既存のプライマリーキーフィールド名と競合しないフィールド名に設定します。以下に例を示します。

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id

この例では、shard_id フィールドをルーティングされたレコードのキー構造に追加します。

キーの新しいフィールドの値を調整するには、以下のいずれかのオプションを設定します。

key.field.regex
変換がデフォルトの宛先トピック名に適用され、1 つ以上の文字グループをキャプチャーする正規表現を指定します。
key.field.replacement
取得したグループの観点から、挿入されたキーフィールドの値を決定する正規表現を指定します。

以下に例を示します。

transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2

この設定では、宛先のデフォルトのトピック名が以下のようになります。

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

変換では、2 番目のキャプチャーグループ(シャード番号)の値をキーの新しいフィールドの値として使用します。この例では、挿入された key フィールドの値は、12、またはです 3

テーブルにグローバルに一意の鍵が含まれ、キー構造を変更する必要がない場合は、key.enforce.uniqueness プロパティーをに設定し falseます。

...
transforms.Reroute.key.enforce.uniqueness=false
...

8.1.4. ByLogicalTableRouter 変換を設定するオプション

プロパティー

デフォルト

説明

topic.regex

 

変換が各変更イベントレコードに適用され、特定のトピックにルーティングするかどうかを判断する正規表現を指定します。

topic.replacement

 

宛先トピック名を表す正規表現を指定します。変換は、各一致するレコードをこの式で識別したトピックにルーティングします。この式は、に指定する正規表現によってキャプチャーされるグループを参照でき topic.regexます。グループを参照するには、、$1 $2、などを指定します。

key.enforce.uniqueness

true

レコードの変更イベントキーにフィールドを追加するかどうかを示します。キーフィールドを追加すると、変更イベントレコードが同じトピックになるテーブル全体で各イベントキーが一意になります。これは、同じキーを持つものの、異なるソーステーブルから生成されたレコードの変更イベントの競合を防ぐのに役立ちます。

変換でキーフィールドを追加したくない false かどうかを指定します。たとえば、パーティション化された PostgreSQL テーブルから 1 つのトピックにルーティングレコードがある場合には、パーティションが設定された PostgreSQL テーブルで一意の鍵 key.enforce.uniqueness=false が保証されているように設定できます。

key.field.name

__dbz__physicalTableIdentifier

変更イベントキーに追加するフィールドの名前。このフィールドの値は、元のテーブル名を識別します。SMT でこのフィールドを追加するには、がデフォルトであるで trueある key.enforce.uniqueness 必要があります。

key.field.regex

 

変換がデフォルトの宛先トピック名に適用され、1 つ以上の文字グループをキャプチャーする正規表現を指定します。SMT がこの式を適用するには、がデフォルトであるで trueある key.enforce.uniqueness 必要があります。

key.field.replacement

 

に指定された式がキャプチャーしたグループの形式で、挿入されたキーフィールドの値を決定する正規表現を指定し key.field.regexます。SMT がこの式を適用するには、がデフォルトであるで trueある key.enforce.uniqueness 必要があります。