3.6.6. Debezium Db2 コネクター設定プロパティーの説明

Debezium Db2 コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。

必要な Debezium Db2 コネクター設定プロパティー

以下の設定プロパティーは、デフォルト値がない場合は必須です。

プロパティーデフォルト説明

name

デフォルトなし

コネクターの一意名。同じ名前で再登録を試みると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。

connector.class

デフォルトなし

コネクターの Java クラスの名前。Db2 コネクターには、常に io.debezium.connector.db2.Db2Connector の値を使用します。

tasks.max

1

このコネクターのために作成する必要のあるタスクの最大数。Db2 コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。

database.hostname

デフォルトなし

Db2 データベースサーバーの IP アドレスまたはホスト名。

database.port

50000

Db2 データベースサーバーの整数のポート番号。

database.user

デフォルトなし

Db2 データベースサーバーに接続するための Db2 データベースユーザーの名前。

database.password

デフォルトなし

Db2 データベースサーバーへの接続時に使用するパスワード。

database.dbname

デフォルトなし

変更をストリーミングする Db2 データベースの名前

database.server.name

デフォルトなし

Debezium が変更をキャプチャーするデータベースをホストする特定の Db2 データベースサーバーの namespace を特定および提供する論理名。データベースサーバーの論理名には英数字とハイフン、ドット、アンダースコアのみを使用する必要があります。論理名は、他のコネクター全体で一意となる必要があります。これは、このコネクターからレコードを受信するすべての Kafka トピックのトピック名接頭辞として使用されるためです。

table.include.list

デフォルトなし

コネクターで変更をキャプチャーするテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。include リストに含まれていないテーブルの変更はキャプチャーされません。各識別子の形式は schemaName.tableName です。デフォルトでは、コネクターはシステム以外のテーブルすべての変更をキャプチャーします。また、table.exclude.list プロパティーを設定しないでください。

table.exclude.list

デフォルトなし

コネクターで変更をキャプチャーしないテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。コネクターは exclude リストに含まれていないシステム以外のテーブルごとに変更をキャプチャーします。各識別子の形式は schemaName.tableName です。また、table.include.list プロパティーを設定しないでください。

column.exclude.list

空の文字列

変更イベント値から除外する列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。プライマリーキー列は、値から除外された場合でも、イベントのキーに常に含まれます。

column.mask.hash.hashAlgorithm.with.salt.salt

該当なし

文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。作成された変更イベントレコードでは、指定された列の値は仮名に置き換えられます。

仮名は、指定された hashAlgorithmsalt を適用すると得られるハッシュ化された値で設定されます。使用されるハッシュ関数に基づいて、参照整合性は維持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest section に説明されています。

以下の例では、CzQMA0cB5K が無作為に選択された salt になります。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。

使用される hashAlgorithm、選択された salt、および実際のデータセットによっては、結果として得られるデータセットが完全にマスクされないことがあります。

time.precision.mode

adaptive

時間、日付、およびタイムスタンプは、異なる精度の種類で表すことができます。

adaptive は、データベース列の型を基にして、ミリ秒、マイクロ秒、またはナノ秒の精度値のいずれかを使用して、データベースの値と全く同じように時間とタイムスタンプをキャプチャーします。

connect は、Kafka Connect の TimeDate、および Timestamp の組み込み表現を使用して、常に時間とタイムスタンプ値を表し、データベース列の精度に関わらず、ミリ秒の精度を使用します。時間的価値 を参照します。

tombstones.on.delete

true

廃棄 (tombstone) イベントの後に削除イベントが続くかどうかを制御します。

true - 削除操作は、削除 イベントと後続の破棄 (tombstone) イベントで表されます。

false - delete イベントのみが出力されます。

log compaction がトピックで有効になっている場合には、ソースレコードの削除後に廃棄 (tombstone) イベントを出力すると (デフォルト動作)、Kafka は削除された行のキーに関連するすべてのイベントを完全に削除できます。

include.schema.changes

true

コネクターがデータベーススキーマの変更を、データベースサーバー ID と同じ名前の Kafka トピックに公開するかどうかを指定するブール値。各スキーマの変更は、データベース名が含まれるキーと、スキーマ更新を記述する JSON 構造である値で記録されます。これは、コネクターがデータベース履歴を内部で記録する方法には依存しません。

column.truncate.to._length_.chars

該当なし

文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。変更イベントレコードでは、これらの列の値がプロパティー名の 長さ によって指定される文字数よりも長い場合は切り捨てられます。単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。長さは正の整数である必要があります (例:column.truncate.to.20.chars )。

column.mask.with._length_.chars

該当なし

文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。変更イベント値では、指定のテーブルコラムの値はアスタリスク (*) の 長さ (数) に置き換えられます。単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。長さは正の整数またはゼロでなければなりません。ゼロを指定すると、コネクターは値を空の文字列に置き換えます。

column.propagate.source.type

該当なし

列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は、databaseName.tableName.columnName または databaseName.schemaName.tableName.columnName です。

コネクターは指定された各列に対して、列の元の型と元の長さをパラメーターとして、出力された変更レコードの対応するフィールドスキーマに追加します。以下の追加されたスキーマパラメーターは、元の型名と可変幅型の元の長さを伝播します。

__debezium.source.column.type
__debezium.source.column.length
__debezium.source.column.scale

このプロパティーは、シンクデータベースの対応するコラムのサイズを適切に調整する場合に便利です。

datatype.propagate.source.type

該当なし

一部の列のデータベース固有のデータ型名と一致する正規表現のコンマ区切りリスト (任意)。完全修飾データ型名の形式は、databaseName.tableName.typeName または databaseName.schemaName.tableName.typeName です。

これらのデータタイプでは、コネクターは出力された変更レコードの対応するフィールドスキーマにパラメーターを追加します。追加されたパラメーターは、列の元の型と長さを指定します。

__debezium.source.column.type
__debezium.source.column.length
__debezium.source.column.scale

これらのパラメーターは、それぞれ可変幅型の列の元の型名と長さを伝播します。このプロパティーは、シンクデータベースの対応する列のサイズを適切に調整するのに便利です。

Db2 固有のデータ型名の一覧は、Db2 data types を参照してください。

message.key.columns

空の文字列

指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。

デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。

テーブルのカスタムメッセージキーを作成するには、テーブルとメッセージキーとして使用する列をリストします。各リストエントリーは以下の形式を取ります。

<fully-qualified_tableName>:_<keyColumn>_,<keyColumn>

複数の列名をベースにテーブルキーを作成するには、列名の間にコンマを挿入します。
各完全修飾テーブル名は、以下の形式の正規表現です。

<schemaName>.<tableName>

プロパティーは複数のテーブルのエントリーをリストできます。リスト内の異なるテーブルのエントリーは、セミコロンを使用して、区切ります。

以下の例では、テーブル inventory.customers および buy orders:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

のメッセージキーを設定します。上記の例では、pk1pk2 列はテーブル inventory.customer のメッセージキーとして指定されます。スキーマで purchaseorders を解決する場合には、列 pk3pk4 はメッセージキーとして機能します。

高度なコネクター設定プロパティー

以下の 高度な 設定プロパティーには、ほとんどの状況で機能するデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。

プロパティーデフォルト説明

snapshot.mode

Initial

コネクターの起動時にスナップショットを実行する基準を指定します。

initial - キャプチャーモードのテーブルの場合、コネクターはテーブルとそのテーブルのデータのスナップショットを作成します。これは、データの完全な表現で Kafka トピックに入力する場合に便利です。

schema_only - キャプチャーモードのテーブルの場合、コネクターはテーブルのスキーマのみのスナップショットを作成します。これは、現時点以降に発生する変更のみを Kafka トピックに出力する必要がある場合に便利です。スナップショットの完了後、コネクターはデータベースのやり直し (redo) ログから変更イベントの読み取りを続行します。

snapshot.isolation.mode

repeatable_read

スナップショットの実行中に、トランザクション分離レベルとキャプチャーモードのテーブルをロックする期間を制御します。使用できる値は次のとおりです。

read_uncommitted - 最初のスナップショットの実行中に、他のトランザクションによるテーブル行の更新を防ぎません。このモードでは、データの整合性は保証されず、一部のデータが損失または破損する可能性があります。

read_committed - 最初のスナップショットの実行中に、他のトランザクションによるテーブル行の更新を防ぎません。新しいレコードが初回のスナップショットで 1 回、ストリーミングフェーズで 1 回の計 2 回発生する可能性があります。しかし、この整合性レベルはデータのミラーリングに適しています。

repeatable_read - 最初のスナップショットの実行中に、他のトランザクションがテーブル行を更新しないようにします。新しいレコードが初回のスナップショットで 1 回、ストリーミングフェーズで 1 回の計 2 回発生する可能性があります。しかし、この整合性レベルはデータのミラーリングに適しています。

exclusive - 繰り返し可能な読み取り分離レベルを使用しますが、すべてのテーブルを読み取るために排他的ロックを使用します。このモードは、最初のスナップショットの実行中に他のトランザクションがテーブル行を更新しないようにします。exclusive モードのみが完全な整合性を保証し、最初のスナップショットとログのストリーミングが履歴の線形を設定します。

event.processing.failure.handling.mode

fail

イベントの処理中にコネクターが例外を処理する方法を指定します。使用できる値は次のとおりです。

fail - コネクターは問題のあるイベントのオフセットをログに記録し、処理を停止します。

warn - コネクターは問題のあるイベントのオフセットをログに記録し、次のイベントの処理を続行します。

skip - コネクターは問題のあるイベントをスキップし、次のイベントの処理を続行します。

poll.interval.ms

1000

コネクターがイベントのバッチの処理を開始する前に、新しい変更イベントの発生を待つ期間をミリ秒単位で指定する正の整数値。デフォルトは 1000 ミリ秒 (1 秒) です。

max.queue.size

8192

ブロッキングキューの最大サイズの正の整数値。コネクターは、データベースログから読み取る変更イベントをブロッキングキューに配置してから Kafka に書き込みます。このキューは、たとえば Kafka へのレコードの書き込みが遅い場合や Kafka が利用できない場合などに、変更データテーブルを読み取るためのバックプレシャーを提供できます。キューに表示されるイベントは、コネクターによって定期的に記録されるオフセットには含まれません。max.queue.size の値は常に max.batch.size コネクター設定プロパティーの値よりも大きくする必要があります。

max.batch.size

2048

コネクターが処理するイベントの各バッチの最大サイズを指定する正の整数値。

max.queue.size.in.bytes

0

ブロッキングキューの最大サイズ (バイト単位) の long 値。この機能はデフォルトで無効になっています。正の long 値が設定されると有効になります。

heartbeat.interval.ms

0

コネクターがハートビートメッセージを Kafka トピックに送信する頻度を制御します。デフォルトの動作では、コネクターはハートビートメッセージを送信しません。

ハートビートメッセージは、コネクターがデータベースから変更イベントを受信しているかどうかを監視するのに便利です。ハートビートメッセージは、コネクターの再起動時に再送信する必要がある変更イベントの数を減らすのに役立つ可能性があります。ハートビートメッセージを送信するには、このプロパティーを、ハートビートメッセージの間隔をミリ秒単位で示す正の整数に設定します。

ハートビートメッセージは、追跡されているデータベースには多くの更新があるにも関わらず、キャプチャーモードのテーブルにある更新はわずかである場合に便利です。この場合、コネクターは通常どおりにデータベーストランザクションログから読み取りしますが、変更レコードを Kafka に出力することはほとんどありません。そのため、コネクターが最新のオフセットを Kafka に送信することはほとんどありません。ハートビートメッセージを送信すると、コネクターは最新のオフセットを Kafka に送信できます。

heartbeat.topics.prefix

__debezium-heartbeat

コネクターがハートビートメッセージを送信するトピック名の接頭辞を指定します。このトピック名の形式は <heartbeat.topics.prefix>.<server.name> です。

snapshot.delay.ms

デフォルトなし

コネクターの起動時にスナップショットを実行するまでコネクターが待つ必要がある間隔 (ミリ秒単位)。クラスターで複数のコネクターを起動する場合、このプロパティーは、コネクターのリバランスが行われる原因となるスナップショットの中断を防ぐのに役立ちます。

snapshot.fetch.size

2000

スナップショットの実行中、コネクターは行のバッチでテーブルの内容を読み取ります。このプロパティーは、バッチの行の最大数を指定します。

snapshot.lock.timeout.ms

10000

スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する正の整数値。コネクターがこの間隔でテーブルロックを取得できないと、スナップショットは失敗します。詳細は、コネクターによるスナップショットの実行方法 を参照してください。他の設定可能な値は以下のとおりです。

0 - ロックを取得できないとすぐに失敗します。

-1 - コネクターは永久に待機します。

snapshot.select.statement.overrides

デフォルトなし

スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。

プロパティーには、<schemaName>.<tableName> の形式で完全修飾テーブル名のコンマ区切りリストが含まれます。たとえば、

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

をリスト内の各テーブルに対して、スナップショットを作成する場合には、その他の設定プロパティーを追加して、コネクターがテーブルで実行するように SELECT ステートメントを指定します。指定した SELECT ステートメントは、スナップショットに追加するテーブル行のサブセットを決定します。以下の形式を使用して、この SELECT ステートメントプロパティーの名前 (

snapshot.select.statement.overrides.<schemaName>.<tableName>) を指定します。例: snapshot.select.statement.overrides.customers.orders.

例:

スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 (delete_flag ) を含む customers.orders テーブルから、以下のプロパティーを追加します。

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"

作成されるスナップショットでは、コネクターには delete_flag = 0 のレコードのみが含まれます。

sanitize.field.names

コネクターが key.converter または value.converter プロパティーを Avro コンバーターに設定する場合は true に設定します。

そうでない場合は false に設定します。

Avro の命名要件 に準拠するためにフィールド名がサニタイズされるかどうかを示します。

provide.transaction.metadata

false

コネクターがトランザクション境界でイベントを生成し、トランザクションメタデータで変更イベントエンベロープを強化するかどうかを決定します。コネクターにこれを実行させる場合は true を指定します。詳細は、Transaction metadata を参照してください。

skipped.operations

デフォルトなし

ストリーミング中にスキップされる操作タイプのコンマ区切りリスト。操作には、c (挿入/作成)、u (更新)、および d (削除) が含まれます。デフォルトでは、操作はスキップされません。

signal.data.collection

デフォルトなし

シグナルをコネクターへの送信に使用されるデータコレクションの完全修飾名
<schemaName>.<tableName> の形式を使用してコレクション名を指定します。

シグナル機能はテクノロジープレビュー機能です。

incremental.snapshot.chunk.size

1024

増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。

増分スナップショットはテクノロジープレビュー機能です。

Debezium コネクターデータベース履歴設定プロパティー

Debezium には、コネクターがスキーマ履歴トピックと対話する方法を制御する database.history.* プロパティーのセットが含まれています。

以下の表は、Debezium コネクターを設定するための database.history プロパティーについて説明しています。

表3.16 コネクターデータベース履歴設定プロパティー

プロパティーデフォルト説明

database.history.kafka.topic

 

コネクターがデータベーススキーマの履歴を保存する Kafka トピックの完全名。

database.history.kafka.bootstrap.servers

 

Kafka クラスターへの最初の接続を確立するために コネクターが使用するホストとポートのペアの一覧。このコネクションは、コネクターによって以前に保存されたデータベーススキーマ履歴の取得や、ソースデータベースから読み取られる各 DDL ステートメントの書き込みに使用されます。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。

database.history.kafka.recovery.poll.interval.ms

100

永続化されたデータのポーリングが行われている間にコネクターが起動/回復を待つ最大時間 (ミリ秒単位) を指定する整数値。デフォルトは 100 ミリ秒です。

database.history.kafka.recovery.attempts

4

エラーでコネクターのリカバリーが失敗する前に、コネクターが永続化された履歴データの読み取りを試行する最大回数。データが受信されなかった場合に最大待機する時間は、recovery.attempts x recovery.poll.interval.ms です。

database.history.skip.unparseable.ddl

false

コネクターが不正または不明なデータベースのステートメントを無視するかどうか、または人が問題を修正するために処理を停止するかどうかを指定するブール値。安全なデフォルトは false です。スキップは、binlog の処理中にデータの損失や分割を引き起こす可能性があるため、必ず注意して使用する必要があります。

database.history.store.only.monitored.tables.ddl

今後のリリースで非推奨になり、削除される予定です。代わりに database.history.store.only.captured.tables.ddl を使用してください。

false

コネクターがすべての DDL ステートメントを記録するかどうかを指定するブール値

true は、変更が Debezium によってキャプチャーされるテーブルに関連する DDL ステートメントのみを記録します。変更がキャプチャーされるテーブルを変更すると、不足しているデータが必要になる可能性があるため、は、不足しているデータが必要になるため、注意して true に設定してください。

安全なデフォルトは false です。

database.history.store.only.captured.tables.ddl

false

コネクターがすべての DDL ステートメントを記録するかどうかを指定するブール値

true は、変更が Debezium によってキャプチャーされるテーブルに関連する DDL ステートメントのみを記録します。変更がキャプチャーされるテーブルを変更すると、不足しているデータが必要になる可能性があるため、は、不足しているデータが必要になるため、注意して true に設定してください。

安全なデフォルトは false です。

プロデューサーおよびコンシューマークライアントを設定するためのパススルーデータベース履歴プロパティー


Debezium は、Kafka プロデューサーを使用して、データベース履歴トピックにスキーマの変更を書き込みます。同様に、コネクターが起動すると、データベース履歴トピックから読み取る Kafka コンシューマーに依存します。database.history.producer.* および database.history.consumer.* 接頭辞で始まるパススルー設定プロパティーのセットに値を割り当てて、Kafka プロデューサーおよびコンシューマークライアントの設定を定義します。パススループロデューサーおよびコンシューマーデータベース履歴プロパティーは、以下の例のように Kafka ブローカーとのこれらのクライアントの接続をセキュアにする方法など、さまざまな動作を制御します。

database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234

database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234

Debezium は、プロパティーを Kafka クライアントに渡す前に、プロパティー名から接頭辞を削除します。

Kafka プロデューサー設定プロパティー および Kafka コンシューマー設定プロパティーの詳細は、Kafka のドキュメントを参照してください。

Debezium コネクターのパススルーデータベースドライバー設定プロパティー

Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは、接頭辞 database.* で始まります。たとえば、コネクターは database.foobar=false などのプロパティーを JDBC URL に渡します。

データベース履歴クライアントのパススループロパティー の場合のように、Debezium はプロパティーから接頭辞を削除してからデータベースドライバーに渡します。