2.3.3. コネクタープロパティー

以下の設定プロパティーは、デフォルト値がない場合は必須です。

プロパティーデフォルト説明

name

 

コネクターの一意名。同じ名前で再登録を試みると失敗します。(このプロパティーはすべての Kafka Connect コネクターに必要です)

connector.class

 

コネクターの Java クラスの名前。Postgre SQL コネクターには、常に io.debezium.connector.postgresql.PostgresConnector の値を使用してください。

tasks.max

1

このコネクターのために作成する必要のあるタスクの最大数。PostgreSQL コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。

plugin.name

decoderbufs

サーバーにインストールされている Postgres 論理デコードプラグイン の名前。サポートされている値は pgoutput のみです。

処理されたトランザクションが非常に大きい場合は、トランザクションにすべての変更が含まれる JSON バッチイベントが、サイズが 1 GB のハードコーディングされたメモリーバッファーに収まらないことがあります。このような場合は、トランザクションの変更がすべて PostgreSQL から Debezium とは別のメッセージとして送信されると、so-called streaming モードに切り替えることができます。

slot.name

debezium

プラグインおよびデータベースインスタンスから変更をストリーミングするために作成される Postgres 論理デコードスロットの名前。値は Postgres レプリケーションスロットの命名ルール に準拠する必要があります。各レプリケーションスロットには名前があり、これには小文字、数字、およびアンダースコア文字が含まれます。"

slot.drop.on.stop

false

コネクターが順番に終了したときに論理レプリケーションスロットをドロップするかどうか。テストまたは開発環境でのみ true に設定する必要があります。スロットを削除すると、WAL セグメントをデータベースで破棄できるため、再起動後にコネクターが停止した WAL の位置から再開できないことがあります。

publication.name

dbz_publication

pgoutput の使用時に変更をストリーミングするために作成される PostgreSQL パブリケーションの名前。

すべてのテーブル を含めるためにこのパブリケーションがまだ存在しない場合は、起動時にこのパブリケーションが作成されます。その後、Debezium は独自の white-/blacklist フィルターリング機能を使用して、変更イベントを該当するテーブル(設定されている場合)に制限します。コネクターユーザーはこのパブリケーションを作成するにはスーパーユーザー権限が必要であるため、通常はパブリケーションを事前に作成することが推奨されます。

パブリケーションがすでに存在する場合(すべてのテーブルまたはテーブルのサブセットのいずれかに対して)、Debezium は代わりに定義された通りにパブリケーションを使用します。

database.hostname

 

PostgreSQL データベースサーバーの IP アドレスまたはホスト名。

database.port

5432

PostgreSQL データベースサーバーのポート番号 (整数)。

database.user

 

PostgreSQL データベースサーバーへの接続時に使用する PostgreSQL データベースの名前。

database.password

 

PostgreSQL データベースサーバーへの接続時に使用するパスワード。

database.dbname

 

変更をストリーミングする PostgreSQL データベースの名前

database.server.name

 

監視対象の特定の PostgreSQL データベースサーバー/クラスターの namespace を識別および提供する論理名。論理名は、他のコネクター全体で一意となる必要があります。これは、このコネクターからのすべての Kafka トピック名の接頭辞として使用されるためです。英数字とアンダースコアのみを使用する必要があります。

schema.whitelist

 

監視するスキーマ名と一致する正規表現のコンマ区切りリスト(任意)。ホワイトリストに含まれていないスキーマ名は監視から除外されます。デフォルトでは、システム以外のスキーマがすべて監視されます。schema.blacklist と併用できません。

schema.blacklist

 

監視から除外されるスキーマ名と一致する正規表現のコンマ区切りリスト(任意)。ブラックリストに含まれていないスキーマ名は、システムスキーマを除き監視されます。schema.whitelist と併用できません。

table.whitelist

 

監視するテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト(任意)。ホワイトリストに含まれていないテーブルはすべて監視から除外されます。各識別子の形式は schemaName.tableName です。デフォルトでは、コネクターは監視される各スキーマのシステム以外のテーブルをすべて監視します。table.blacklist と併用できません。

table.blacklist

 

監視から除外されるテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト(任意)。ブラックリストに含まれていないテーブルはすべて監視されます。各識別子の形式は schemaName.tableName です。table.whitelist と併用できません。

column.blacklist

 

変更イベントメッセージの値から除外される必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName です。tableName.columnName

time.precision.mode

adaptive

時間、日付、およびタイムスタンプは、以下を含む異なる精度の種類で表すことができます。 adaptive (デフォルト)は、データベース列のタイプに基づいて、ミリ秒、マイクロ秒、またはナノ秒の精度値のいずれかを使用して、データベースの値と全く同じように時間とタイムスタンプ値をキャプチャーします。adaptive_time_microseconds は、ミリ秒のいずれかを使用してデータベースの値と全く同じように日付、日時、およびタイムスタンプ値をキャプチャーします。 データベース列の型に基づいたマイクロ秒、またはナノ秒の精度値。ただし、常にマイクロ秒としてキャプチャーされる TIME 型フィールドを除き、connect は Kafka Connect の Time、Date、および Timestamp の組み込み表現を使用して、常に時間とタイムスタンプ値を表します。これは、データベース列の精度に関わらず、ミリ秒の精度を使用します。時間値 を参照してください。

decimal.handling.mode

precise

コネクターによる DECIMAL 列および NUMERIC 列の値の処理方法を指定します。precise (デフォルト)はバイナリー形式で変更イベントで表される java.math.BigDecimal 値を使用して正確に表します。または doubledouble 値を使用して表します。精度が失われる可能性はありますが、非常に簡単に使用できます。string オプションは値をフォーマットされた文字列としてエンコードします。これは簡単に使用できますが、実際の型に関するセマンティック情報は失われます。「10 進数値」を参照してください。

hstore.handling.mode

map

コネクターによる hstore 列の値の処理方法を指定します。map (デフォルト)は MAP を使用して表します。または json は json 文字 列を使用して値を表します。json オプションは、値を {"key" : "val"} などのフォーマットされた文字列としてエンコードします。「hstore の値」を参照してください。

interval.handling.mode

numeric

コネクターによる 間隔 列の値の処理方法を指定します。数値 (デフォルト)はマイクロ 秒の概算数を使用して間隔を表します。文字 列は、文字列パターン表現 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S、例: P1Y2M3DT4H5M6.78S を使用してそれらを正確に表します。「データ型」を参照してください。

database.sslmode

disable

PostgreSQL サーバーへの暗号化された接続を使用するかどうか。オプションには以下が含まれます。暗号化されていない接続を使用するには を 無効 にします。安全な(暗号化された)接続を使用し、接続を確立できない場合は失敗します。 require のような verify-ca は、設定された認証局(CA)証明書に対してサーバー TLS 証明書を追加で検証するか、有効な一致する CA 証明書が見つからない場合は失敗します。verify-fullverify-ca としますが、サーバー証明書が接続を試行するホストと一致することを確認します。詳細は PostgreSQL のドキュメント を参照してください。

database.sslcert

 

クライアントの SSL 証明書を含むファイルへのパス。詳細は PostgreSQL のドキュメント を参照してください。

database.sslkey

 

クライアントの SSL 秘密鍵が含まれるファイルへのパス。詳細は PostgreSQL のドキュメント を参照してください。

database.sslpassword

 

database.sslkey で指定されたファイルからクライアントの秘密鍵にアクセスするためのパスワード。詳細は PostgreSQL のドキュメント を参照してください。

database.sslrootcert

 

サーバーが検証されるルート証明書が含まれるファイルへのパス。詳細は PostgreSQL のドキュメント を参照してください。

database.tcpKeepAlive

 

TCP keep-alive プローブを有効にして、データベース接続がまだ有効であることを確認します(デフォルトでは有効)。詳細は PostgreSQL のドキュメント を参照してください。

tombstones.on.delete

true

削除イベント後に廃棄 (tombstone) イベントを生成するかどうかを制御します。
true の場合、削除操作は 削除 イベントと後続の廃棄(tombstone)イベントで表されます。false の場合、削除 イベントのみが送信されます。
廃棄 (tombstone) イベントを生成すると (デフォルトの動作)、Kafka はソースレコードが削除されると、指定のキーに関連するすべてのイベントを完全に削除できます。

column.truncate.to.length.chars

該当なし

フィールド値が指定された文字数より長い場合に、変更イベントメッセージ値で値を省略する必要がある文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。長さが異なる複数のプロパティーを単一の設定で使用できますが、それぞれの長さは正の整数である必要があります。列の完全修飾名の形式は databaseName です。tableName.columnName または databaseName.schemaName.tableName.columnName.

column.mask.with.length.chars

該当なし

文字ベースの列の完全修飾名にマッチする正規表現のコンマ区切りリスト (オプション) で、変更イベントメッセージの値を、指定された数のアスタリスク (*) 文字で設定されるフィールド値に置き換える必要があります。長さが異なる複数のプロパティーを単一の設定で使用できますが、それぞれの長さは正の整数またはゼロである必要があります。列の完全修飾名の形式は databaseName です。tableName.columnName または databaseName.schemaName.tableName.columnName.

column.propagate.source.type

該当なし

出力された変更メッセージの該当するフィールドスキーマに元の型および長さをパラメーターとして追加する必要がある列の完全修飾名と一致する、正規表現のコンマ区切りリスト (任意)。スキーマパラメーター (__debezium.source.column.type__debezium.source.column.length、および __debezium.source.column.scale) は、それぞれ元の型名と長さ (可変幅型) を伝播するために使用されます。シンクデータベースの対応する列を適切にサイズ調整するのに便利です。列の完全修飾名の形式は databaseName です。tableName.columnName または databaseName.schemaName.tableName.columnName.

message.key.columns

空の文字列

プライマリーキーをマップする完全修飾テーブルおよび列と一致する正規表現のセミコロン区切りリスト。
各項目(正規表現)は、カスタムキーを表す完全修飾 < fully-qualified table>:<a comma-separated list of columns > と一致する必要があります。
特定のコネクターに応じて、完全修飾テーブルは DB_NAME.TABLE_NAME または SCHEMA_NAME.TABLE_NAME として定義できます。

以下の 高度な 設定プロパティーには、ほとんどの状況で機能する適切なデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。

プロパティーデフォルト説明

snapshot.mode

Initial

コネクターの起動時にスナップショットを実行する基準を指定します。デフォルトは initial で、論理サーバー名に対してオフセットが記録されていない場合にのみコネクターがスナップショットを実行できます。always オプションは、起動時にコネクターが常にスナップショットを実行するように指定します。never オプションは、接続でスナップショットを使用しないことを指定し、論理サーバー名を使用して初回起動時に、コネクターは最後に停止した場所(最後の LSN の位置)から読み取るか、論理レプリケーションスロットのビューから最初から開始するように指定します。initial_only オプションは、後続の変更を処理せずにコネクターが最初のスナップショットのみを取得し、停止することを指定します。exported オプションは、データベーススナップショットがレプリケーションスロットが作成された時点に基づいていることを指定します。これは、ロックのない方法でスナップショットを実行するための優れた方法です。

snapshot.lock.timeout.ms

10000

スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する正の整数値。この時間間隔でテーブルロックを取得できない場合、スナップショットは失敗します。スナップショットを参照してください。

snapshot.select.statement.overrides

 

スナップショットに含まれるテーブルの行を制御します。
このプロパティーには、完全修飾テーブル (DB_NAME.TABLE_NAME) のコンマ区切りリストが含まれます。各テーブルの select ステートメントは、ID snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME] によって識別される、テーブルごとに 1 つずつ追加の設定プロパティーに指定されます。これらのプロパティーの値は、スナップショットの実行中に特定のテーブルからデータを取得するときに使用する SELECT ステートメントです。大規模な追加専用テーブルで可能なユースケースとしては、前のスナップショットが中断された場合にスナップショットの開始 (再開) 点を設定することが挙げられます。
注記:この設定はスナップショットにのみ影響します。論理デコーダによって生成されるイベントは、それによる影響を受けません。

event.processing​.failure.handling.mode

fail

イベントの処理中にコネクターが例外に対応する方法を指定します。fail は例外 (問題のあるイベントのオフセットを示す) を伝達するため、コネクターが停止します。
warn を指定すると問題のあるイベントがスキップされ、問題のあるイベントのオフセットがログに記録されます。
skip を指定すると、問題のあるイベントがスキップされます。

max.queue.size

20240

ストリーミングレプリケーションを介して受信される変更イベントが Kafka に書き込まれる前に配置されるブロッキングキューの最大サイズを指定する正の整数値。このキューは、Kafka への書き込みが遅い場合や Kafka が利用できない場合などにバックプレシャーを提供できます。

max.batch.size

10240

このコネクターの反復処理中に処理される必要があるイベントの各バッチの最大サイズを指定する正の整数値。

poll.interval.ms

1000

各反復処理の実行中に新しい変更イベントが表示されるまでコネクターが待機する時間 (ミリ秒単位) を指定する正の整数値。デフォルトは 1000 ミリ秒 (1 秒) です。

include.unknown.datatypes

false

Debezium がデータタイプが不明なフィールドを満たす場合、デフォルトでは、フィールドは変更イベントから省略され、警告がログに記録されます。フィールドを組み込み、不透明なバイナリー表現のクライアントにダウンストリームを送信して、クライアントが自分でデコードできるようにする方が望ましい場合があります。イベントから不明なデータをフィルターリングする場合は false に設定し、バイナリー形式で保持するには true に設定します。

注記

クライアントが後方互換性の問題を危険にさらす。リリース間でデータベース固有のバイナリー表現の変更があるだけでなく、最終的に Debezium によってデータタイプがサポートされる場合でも、論理型でダウンストリームに送信され、コンシューマーによる調整が必要になります。一般的に、サポートされていないデータ型に遭遇する場合は、機能リクエストを提出して、サポートを追加できるようにします。

database.initial.statements

 

データベースへの JDBC 接続(トランザクションログの読み取り接続ではない)が確立されたときに実行される SQL ステートメントのセミコロン区切りリスト。セミコロン(';;')を使用してセミコロンを区切り文字としてではなく、文字として使用します。

注記

コネクターは独自の判断で JDBC 接続を確立するため、通常これはセッションパラメーターの設定にのみ使用してくださいが、DML ステートメントの実行にはは使用しないでください。

heartbeat.interval.ms

0

ハートビートメッセージが送信される頻度を制御します。
このプロパティーには、コネクターがメッセージをハートビートトピックに送信する頻度を定義するミリ秒単位の間隔が含まれます。これは、コネクターがデータベースから変更イベントを受信しているかどうかを監視するために使用できます。また、長期に渡り変更されるのはキャプチャーされていないテーブルのレコードのみである場合は、ハートビートメッセージを利用する必要があります。このような場合、コネクターはデータベースからのログの読み取りを続行しますが、変更メッセージを Kafka に出力しないため、オフセットの更新は Kafka にコミットされません。これにより、(コネクターが実際に処理されているが、最新の LSN をデータベースにフラッシュする機会がない)WAL ファイルがデータベースによって保持され、コネクターの再起動後により多くの変更イベントが再送信される可能性があります。このプロパティーを 0 に設定して、ハートビートメッセージが全く送信されないようにします。
デフォルトでは無効にされています。

heartbeat.topics.prefix

__debezium-heartbeat

ハートビートメッセージが送信されるトピックの命名を制御します。
トピックは、<heartbeat.topics.prefix>.<server.name> パターンに従って名前が付けられます。

schema.refresh.mode

columns_diff

テーブルのインメモリースキーマの更新をトリガーする条件を指定します。

columns_diff (デフォルト)は安全なモードで、インメモリースキーマが常にデータベーステーブルのスキーマと同期したままになるようにします。

columns_diff_exclude_unchanged_toast は、変更されていない TOASTable データが完全に不一致がない限り、受信メッセージから派生するスキーマとの間に不一致がある場合に備えて、インメモリースキーマキャッシュを更新するようコネクターに指示します。

この設定は、更新の一部がほとんどない TOASTed データを持つ頻繁に更新されるテーブルがある場合、コネクターのパフォーマンスが大幅に向上します。ただし、TOASTable 列がテーブルから削除されると、インメモリースキーマが古い状態になる可能性があります。

snapshot.delay.ms

 

コネクターの起動後、スナップショットを取得するまで待機する間隔 (ミリ秒単位)。
クラスター内で複数のコネクターを開始する際に スナップショットが中断されないようにするために使用でき、コネクターのリバランスが実行される可能性があります。

snapshot.fetch.size

10240

スナップショットの実行中に各テーブルから 1 度に読み取る必要がある行の最大数を指定します。コネクターは、このサイズの複数のバッチでテーブルの内容を読み取ります。デフォルトは 10240 です。

slot.stream.params

 

設定された論理デコードプラグインに渡されるパラメーターのオプションの一覧です。例えば、add-tables=public.table,public.table2;include-lsn=true のようにします。

sanitize.field.names

コネクター設定が、Avro を使用するように key.converter または value.converter パラメーターを明示的に指定する場合は true です。それ以外の場合のデフォルトは false です。

Avro の命名要件 に準拠するためにフィールド名がサニタイズされるかどうか。詳細は Avro の命名 を参照してください。

slot.max.retries

6

試行に失敗した場合にレプリケーションスロットへの接続を再試行する回数。

slot.retry.delay.ms

10000 (10 秒)

コネクターがレプリケーションスロットへの接続に失敗した場合に再試行するまで待機する時間(ミリ秒単位)。

toasted.value.placeholder

__debezium_unavailable_value

元の値がデータベースによって提供されないトランピングされた値であることを示す定数を指定します。が hex: 接頭辞で始まる場合は、残りの文字列が 16 進数でエンコードされたオクテットであることが想定されます。補足情報は、セクション を参照してください。

コネクターは、Kafka プロデューサーおよびコンシューマーの作成時に使用される パススルー 設定プロパティーもサポートします。

Kafka プロデューサーおよびコンシューマーのすべての設定プロパティーについては、必ず Kafka ドキュメント を参照してください。(PostgreSQL コネクターは 新しいコンシューマー を使用します。)