5.4.3. コネクタープロパティー

以下の設定プロパティーは、デフォルト値が利用可能でない限り 必要 になります。

プロパティー

デフォルト

説明

name

 

コネクターの一意の名前。同じ名前の再登録を試みると失敗します。(このプロパティーはすべての Kafka Connect コネクターで必要です。)

connector.class

 

コネクターの Java クラスの名前。SQL Server コネクターに常に io.debezium.connector.sqlserver.SqlServerConnector の値を使用してください。

tasks.max

1

このコネクターに作成する必要のあるタスクの最大数。SQL Server コネクターは常に単一のタスクを使用するため、この値は使用されないため、デフォルト値は常に受け入れ可能です。

database.hostname

 

SQL Server データベースサーバーの IP アドレスまたはホスト名。

database.port

1433

SQL Server データベースサーバーの整数ポート番号。

database.user

 

SQL Server データベースサーバーに接続するときに使用するユーザー名。

database.password

 

SQL Server データベースサーバーに接続するときに使用するパスワード。

database.dbname

 

変更をストリーミングする SQL Server データベースの名前

database.server.name

 

監視する特定の SQL Server データベースサーバーの名前空間を特定して提供する論理名。このコネクターから成るすべての Kafka トピック名の接頭辞として使用されるため、論理名は他のすべてのコネクター全体で一意にする必要があります。英数字とアンダースコアのみを使用してください。

database.history.kafka.topic

 

コネクターがデータベーススキーマ履歴を保存する Kafka トピックのフルネーム。

database.history​.kafka.bootstrap.servers

 

コネクターが Kafka クラスターへの最初の接続を確立するのに使用するホスト/ポートペアの一覧。このコネクションは、コネクターによって以前に保存されていたデータベーススキーマ履歴を取得する場合や、ソースデータベースから読み取った各 DDL ステートメントを書き込むために使用されます。これは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを参照する必要があります。

table.whitelist

 

監視するテーブルの完全修飾テーブル識別子と一致する正規表現のオプションのコンマ区切りリスト。ホワイトリストに含まれていないテーブルはすべて監視から除外されます。各識別子は schemaName.tableName形式 です。デフォルトでは、コネクターは監視される各スキーマのすべてのシステムテーブルを監視します。とは使用されない場合があり table.blacklistます。

table.blacklist

 

監視から除外されるテーブルの完全修飾テーブル識別子と一致する正規表現のオプションのコンマ区切りリスト。ブラックリストに含まれていないテーブルはすべて監視されます。各識別子は schemaName.tableName形式 です。とは使用されない場合があり table.whitelistます。

column.blacklist

空の文字列

変更イベントメッセージ値から除外される必要のある列の完全修飾名に一致する正規表現のオプションのコンマ区切りリスト。カラムの完全修飾名は schemaName.tableName . columnName 形式です。プライマリーキーの列は、値からブラックリストに指定した場合でも、イベントのキーに常に含まれます。

time.precision.mode

adaptive

時間、日付、およびタイムスタンプは、異なるタイプの精度で表すことができます。たとえば、adaptive (デフォルト)は、データベース列のタイプに応じてミリ秒、マイクロ秒、またはナノ秒のいずれかの精度を使用して、データベース内で時間とタイムスタンプの値を正確にキャプチャーします。または、Kafka Connect の組み込み表現を時間、日付、およびタイムスタンプの値に connect 常に表します。「 一時値 」を参照してください。

tombstones.on.delete

true

削除イベントの後に tombstone イベントを生成するかどうかを制御します。
削除操作 true が削除イベントと後続の tombstone イベントによって表される場合。削除イベント false のみを送信する場合。
tombstone イベント(デフォルトの動作)を生成すると、Kafka はソースレコードが削除されると、指定のキーに関連するすべてのイベントを完全に削除できます。

column.truncate.to.length.chars

該当なし

フィールドの値が指定された文字数よりも長い場合に、変更イベントメッセージの値で切り捨てられる必要のある文字ベースの列の完全修飾名に一致する正規表現のオプションのコンマ区切りリスト。異なる長さを持つ複数のプロパティーを 1 つの設定で使用できますが、それぞれの長さは正の整数である必要があります。カラムの完全修飾名は schemaName.tableName . columnName 形式です。

column.mask.with.length.chars

該当なし

文字ベースの列の完全修飾名に一致する正規表現のオプションのコンマ区切りリスト。このリストは、変更イベントメッセージの値を、指定された数のアスタリスク(*)文字で構成されるフィールド値に置き換える必要があります。異なる長さを持つ複数のプロパティーを 1 つの設定で使用できますが、各長さは正の整数またはゼロである必要があります。カラムの完全修飾名は schemaName.tableName . columnName 形式です。

column.propagate.source.type

該当なし

元の型と長さをパラメーターとして追加する必要がある列の完全修飾名に一致する正規表現のオプションのコンマ区切りリスト。出力された変更メッセージの対応するフィールドスキーマにパラメーターとして追加する必要があります。スキーマパラメーター __debezium.source.column.length および __debezium.source.column.scale__debezium.source.column.type、元の型名と長さ(可変長のタイプ用)をそれぞれ伝播するために使用されます。シンクデータベースの対応する列のサイズを適切に調整するために便利です。カラムの完全修飾名は schemaName.tableName . columnName 形式です。

datatype.propagate.source.type

該当なし

元の型と長さをパラメーターとして追加する必要がある、出力された変更メッセージの対応するフィールドスキーマに対して、データベース固有のデータタイプ名に一致する正規表現のオプションのコンマ区切りリスト。スキーマパラメーター __debezium.source.column.length およびは __debezium.source.column.type、元の型名と長さ(可変長のタイプ用)をそれぞれ伝搬するために __debezium.source.column.scale 使用されます。シンクデータベースの対応する列のサイズを適切に調整するために便利です。完全修飾データタイプ名は schemaName.tableName . typeName 形式ですSQL Server 固有のデータタイプ名の リストは、「 SQL Server データタイプ」を参照してください。

message.key.columns

空の文字列

プライマリーキーをマップする完全修飾テーブルおよび列と一致する正規表現のセミコロンリスト。
各項目(通常の式)は、カスタムキー <fully-qualified table>:<a comma-separated list of columns> を表す完全修飾と一致する必要があります。
完全修飾テーブルは schemaName.tableName として 定義 できます。

以下の 高度な 設定プロパティーには、ほとんどの場合で動作するため、コネクターの設定で指定する必要はほとんどありません。

プロパティー

デフォルト

説明

snapshot.mode

initial

構造の最初のスナップショットと、任意でキャプチャーされたテーブルのデータを取得するためのモード。スナップショットが完了すると、コネクターはデータベースの変更ログから変更イベントを引き続き読み取ります。

サポートされる値は:
initial: で、キャプチャーされたテーブルの構造およびデータのスナップショットを作成します。トピックに、キャプチャーされたテーブルからのデータの完全な表現を組み込む必要がある場合に便利です。キャプチャーされたテーブルのスナップショットのみを作成し
schema_onlyます。今後の変更のみがトピックに伝播される必要がある場合に役に立ちます。

snapshot.isolation.mode

repeatable_read

使用するトランザクション分離レベルと、監視対象のテーブルをコネクターがロックする時間を制御するモード。、read_uncommitted、、read_committed repeatable_read snapshotおよびの 5 つの値があります( exclusive 実際には、exclusive モードは反復可能な読み取り分離レベルを使用しますが、読み取りにはすべてのテーブルに対して排他的ロックが行われます)。

これは、read_committed read_uncommitted モードにより snapshot、初期スナップショット時に他のトランザクションがテーブル行の更新を妨げることはありませんが、exclusive およびの作業は行われませ repeatable_read ん。

もう 1 つの側面として、データの一貫性があります。初期スナップショット exclusive snapshot およびストリーミングログがリニア履歴を構成するのは、およびモードのみになります。repeatable_read および read_committed モードの場合は、たとえば、追加されたレコードが、初期スナップショットでは、1 回、ストリーミングフェーズでは 2 回、という可能性があります。それにもかかわらず、整合性レベルはデータのミラーリングに対して行う必要があります。データの一貫性保証は read_uncommitted ありません(一部のデータは損失または破損する可能性があります)。

event.processing​.failure.handling.mode

fail

イベントの処理中にコネクターが例外に応答する方法を指定します fail (問題のあるイベントのオフセットを示唆)、コネクターが停止します
warn。は、問題のあるイベントが省略され、問題のあるイベントのオフセットがログに記録される原因と
skip なります。は、問題のあるイベントをスキップします。

poll.interval.ms

1000

新しい変更イベントが表示されるまでコネクターが待機する時間(ミリ秒単位)を指定する正の整数値。デフォルトは 1000 ミリ秒または 1 秒です。

max.queue.size

8192

データベースログから読み取られるイベントの変更が Kafka に書き込まれる前に配置されるブロックキューの最大サイズを指定する正の整数値。たとえば、Kafka への書き込みが遅い場合や、Kafka が利用できない場合などに、このキューはテーブルのリーダーにバックマークを提供することができます。キューに表示されるイベントは、このコネクターによって定期的に記録されるオフセットには含まれません。デフォルトは 8192 で、max.batch.size プロパティーに指定された最大バッチサイズよりも大きくする必要があります。

max.batch.size

2048

このコネクターの各反復中に処理されるイベントの各バッチの最大サイズを指定する正の整数値。デフォルトは 2048 です。

heartbeat.interval.ms

0

ハートビートメッセージを送信する頻度を制御します。
このプロパティーには、コネクターがハートビートトピックにメッセージを送信する頻度を定義する間隔(ミリ秒単位)が含まれます。これは、コネクターがデータベースから変更イベントを受信しているかどうかを監視するために使用できます。また、キャッシュされていないテーブルのレコードのみが長くなる場合には、ハートビートメッセージも利用する必要があります。このような状況では、コネクターは引き続きデータベースからログを読み取るが、Kafka に変更メッセージを出力せず、オフセットの更新が Kafka にコミットされないことを意味します。これにより、コネクターの再起動後にイベントが再送信されることがあります。ハートビートメッセージを送信しないよう 0 に、このパラメーターをに設定します。
デフォルトでは無効です。

heartbeat.topics.prefix

__debezium-heartbeat

ハートビートメッセージを送信するトピックの命名を制御します。
トピックには、パターンに従って名前が付けられ <heartbeat.topics.prefix>.<server.name>ます。

snapshot.delay.ms

 

起動後のスナップショットの取得前にコネクターが待機する間隔(ミリ秒単位)
。クラスターで複数のコネクターを起動する際にスナップショットの中断を回避するために使用できます。コネクターのリバランスが発生する可能性があります。

snapshot.fetch.size

2000

スナップショットの作成中に、各テーブルから 1 回読み込むべき行の最大数を指定します。コネクターはこのサイズの複数のバッチでテーブルコンテンツを読み取ります。デフォルトは 2000 です。

snapshot.lock.timeout.ms

10000

スナップショット実行時にテーブルロックの取得を待つ最大時間(ミリ秒単位)を指定する整数値。テーブルロックをこの時間帯に取得できない場合、スナップショットは失敗します( スナップショットも参照してください)。
に設定すると 0、コネクターがロックを取得できない場合にすぐに失敗します。値 -1 は無限に待機中であることを示します。

snapshot.select.statement.overrides

 

スナップショットに含まれるテーブルからの行を制御します。
このプロパティーには、完全修飾テーブルのコンマ区切りリスト( SCHEMA_NAME.TABLE_NAME) が含まれます。個々のテーブルの select ステートメントは、追加の設定プロパティーで指定され、各テーブルの 1 つずつ、ID によって識別されます snapshot.select.statement.overrides.[SCHEMA_NAME].[TABLE_NAME]。これらのプロパティーの値は、スナップショット中に特定のテーブルからデータを取得する際に使用する SELECT ステートメントになります。大規模な追加のみのテーブルのユースケースとして、以前のスナップショットが中断された場合に、スナップショットを開始(再開)する特定のポイントを設定することが挙げられます。
注記: この設定は、スナップショットにのみ影響します。ログの読み取り中にキャプチャーされるイベントは影響を受けません。

sanitize.field.names

true コネクター設定が Avro を使用するよう key.converter または value.converter パラメーターを明示的に指定する場合、それ以外の場合はデフォルトでに設定され falseます。

フィールド名が Avro 命名要件に準拠するようにサニタイズされているかどうか。

database.server.timezone

 

サーバーのタイムゾーン。

これは、サーバーから取得したトランザクションタイムスタンプ(ts_ms)のタイムゾーンを定義するために使用されます(実際にはゾーンされていません)。デフォルト値は unset です。SQL Server 2014 以前のバージョンで実行され、データベースサーバーに異なるタイムゾーンと Debezium コネクターを実行する JVM を使用する場合のみ指定する必要があります。
設定を解除すると、デフォルトの動作では、Debezium コネクターを実行する仮想マシンのタイムゾーンが使用されます。この場合、SQL Server 2014 以前で実行し、サーバーとコネクターで異なるタイムゾーンを使用する場合は、ts_ms の値が正しくない可能性があります。
使用できる値は、「Z」、「UTC」、オフセット値(+02:00、CET のような短縮ゾーン ID、"Europe/Paris" など)などです。

provide.transaction.metadata

false

true Debezium に設定すると、トランザクション境界でイベントが生成され、トランザクションメタデータでデータイベントが強化されます。

詳細は、「 トランザクションメタデータ 」を参照してください。

コネクターは、Kafka プロデューサーおよびコンシューマーの作成時に使用される パススルー 設定プロパティーもサポートします。具体的には、データベース履歴に書き込む Kafka プロデューサーの作成時に、接頭辞で始まるコネクター設定プロパティーがすべて database.history.producer. 使用され、コネクターの起動時にデータベース履歴を読み取る Kafka コンシューマーの作成時に、接頭辞で始まるすべてのコネクター設定が database.history.consumer. 使用されます(接頭辞なし)。

たとえば、以下のコネクター設定プロパティーを使用して Kafka ブローカーへの接続をセキュア化 できます。

Kafka プロデューサーおよびコンシューマーの パススルー に加え、で始まるプロパティー( database.例:) database.applicationName=debezium は JDBC URL に渡されます。

database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234
database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234

Kafka プロデューサーおよびコンシューマーのすべての設定プロパティーについては、Kafka のドキュメント を参照してください。(SQL Server コネクターは 新しいコンシューマー を使用します。)