6.2. Debezium Oracle コネクターのデータ変更イベントの説明
Oracle コネクターが出力する全データ変更イベントにはキーと値があります。キーと値の構造は、変更イベントの発生元となるテーブルによって異なります。Debezium のトピック名を構築する方法は、トピック名を参照してください。
Debezium Db2 コネクターは、すべての Kafka Connect スキーマ名 が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベット文字またはアンダースコア ([a-z,A-Z,_]) で始まり、論理サーバー名の残りの文字と、スキーマおよびテーブル名の残りの文字は英数字またはアンダースコア ([a-z,A-Z,0-9,\_]) で始まる必要があります。コネクターは無効な文字をアンダースコアに自動的に置き換えます。
複数の論理サーバー名、スキーマ名、またはテーブル名の中で区別ができる文字のみが無効な場合に、アンダースコアに置き換えられると、命名で予期しない競合が発生する可能性があります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、トピックコンシューマーによる処理が困難になることがあります。変更可能なイベント構造の処理を容易にするため、Kafka Connect の各イベントは自己完結型となっています。すべてのメッセージキーと値には、スキーマ と ペイロード の 2 つの部分で設定されます。スキーマはペイロードの構造を記述しますが、ペイロードには実際のデータが含まれます。
SYS、SYSTEM ユーザーアカウントが加える変更は、コネクターではキャプチャーされません。
データ変更イベントの詳細は、以下を参照してください。
6.2.1. Debezium Oracle コネクター変更イベントのキー
変更されたテーブルごとに変更イベントキーは、イベントの作成時にテーブルのプライマリーキー (または一意のキー制約) の各コラムにフィールドが存在するように設定されます。
たとえば、inventory データベーススキーマに定義されている customers テーブルには、以下の変更イベントキーが含まれる場合があります。
CREATE TABLE customers ( id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY, first_name VARCHAR2(255) NOT NULL, last_name VARCHAR2(255) NOT NULL, email VARCHAR2(255) NOT NULL UNIQUE );
< topic.prefix>.transaction 設定プロパティーの値が server1 に設定されている場合、データベースの customers テーブルで発生するすべての変更イベントの JSON 表現には以下のキー構造があります。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "ID"
}
],
"optional": false,
"name": "server1.INVENTORY.CUSTOMERS.Key"
},
"payload": {
"ID": 1004
}
}
キーのスキーマ 部分には、キーの部分の内容を記述する Kafka Connect スキーマが含まれます。上記の例では、payload 値はオプションではなく、構造は server1.DEBEZIUM.CUSTOMERS.Key という名前のスキーマで定義され、タイプ int32 の id という名前の必須フィールドが 1 つあります。キーの payload フィールドの値から、id フィールドが 1 つでその値が 1004 の構造 (JSON では単なるオブジェクト) であることが分かります。
つまり、このキーは、id プライマリーキーの列にある値が 1004 の、inventory.customers テーブルの行 (名前が server1 のコネクターからの出力) を記述していると解釈できます。
6.2.2. Debezium Oracle 変更イベントの値
変更イベントメッセージの値の構造は、メッセージの変更イベントの メッセージキーの構造 をミラーリングし、スキーマ セクションと ペイロード セクションの両方が含まれます。
変更イベント値のペイロード
変更イベント値のペイロードセクションの エンベロープ 構造には、以下のフィールドが含まれます。
op-
操作のタイプを記述する文字列値が含まれる必須フィールド。Oracle コネクターの変更イベント値のペイロードの
opフィールドには、c(作成または挿入)、u(更新)、d(delete)、またはr(スナップショットを示す read) のいずれかの値が含まれます。 before-
任意のフィールド。存在する場合は、イベント発生 前 の行の状態を記述します。この構造は、
server1.INVENTORY.CUSTOMERS.ValueKafka Connect スキーマによって記述され、server1コネクターはinventory.customersテーブルのすべての行に使用します。
after-
オプションのフィールドで、存在する場合は、変更が発生した 後の 行の状態が含まれます。構造は、
beforeフィールドに使用されるserver1.INVENTORY.CUSTOMERS.ValueKafka Connect スキーマによって記述されます。 sourceイベントのソースメタデータを記述する構造体を含む必須フィールド。Oracle コネクターの場合、構造には以下のフィールドが含まれます。
- Debezium のバージョン。
- コネクター名。
- イベントが進行中のスナップショットの一部であるかどうか。
- トランザクション ID(スナップショットの含まない)。
- 変更の SCN。
- ソースデータベースのレコードがいつ変更されたかを示すタイムスタンプ (スナップショットの場合、タイムスタンプはスナップショットの発生時刻を示す)。
変更を加えたユーザー名
ヒントcommit_scnフィールドは任意で、変更イベントが参加するトランザクションコミットの SCN を記述します。
ts_ms- 任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックがベース) が含まれます。
変更イベント値のスキーマ
イベントメッセージの値の スキーマ 部分には、ペイロードのエンベロープ構造と、その中のネストされたフィールドを記述するスキーマが含まれます。
変更イベント値の詳細は、以下を参照してください。
作成 イベント
次の例は、イベントキーの変更 例で説明した customers テーブルの create イベントの値を示しています。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "ID"
},
{
"type": "string",
"optional": false,
"field": "FIRST_NAME"
},
{
"type": "string",
"optional": false,
"field": "LAST_NAME"
},
{
"type": "string",
"optional": false,
"field": "EMAIL"
}
],
"optional": true,
"name": "server1.DEBEZIUM.CUSTOMERS.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "ID"
},
{
"type": "string",
"optional": false,
"field": "FIRST_NAME"
},
{
"type": "string",
"optional": false,
"field": "LAST_NAME"
},
{
"type": "string",
"optional": false,
"field": "EMAIL"
}
],
"optional": true,
"name": "server1.DEBEZIUM.CUSTOMERS.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"field": "txId"
},
{
"type": "string",
"optional": true,
"field": "scn"
},
{
"type": "string",
"optional": true,
"field": "commit_scn"
},
{
"type": "string",
"optional": true,
"field": "rs_id"
},
{
"type": "int32",
"optional": true,
"field": "ssn"
},
{
"type": "int32",
"optional": true,
"field": "redo_thread"
},
{
"type": "string",
"optional": true,
"field": "user_name"
},
{
"type": "boolean",
"optional": true,
"field": "snapshot"
}
],
"optional": false,
"name": "io.debezium.connector.oracle.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
},
"payload": {
"before": null,
"after": {
"ID": 1004,
"FIRST_NAME": "Anne",
"LAST_NAME": "Kretchmar",
"EMAIL": "annek@noanswer.org"
},
"source": {
"version": "2.1.4.Final",
"name": "server1",
"ts_ms": 1520085154000,
"txId": "6.28.807",
"scn": "2122185",
"commit_scn": "2122185",
"rs_id": "001234.00012345.0124",
"ssn": 1,
"redo_thread": 1,
"user_name": "user",
"snapshot": false
},
"op": "c",
"ts_ms": 1532592105975
}
}上記の例では、イベントが以下のスキーマを定義する方法に注目してください。
-
エンベロープ (
server1.DEBEZIUM.CUSTOMERS.Envelope)。 -
ソース構造 (io.debezium.connector.oracle.Source、Oracle コネクターに固有で、すべてのイベントで再利用)。 -
beforeフィールドおよびafterフィールドのテーブル固有のスキーマ。
before フィールドおよび after フィールドのスキーマ名は <logicalName>.<schemaName>.<tableName>.Value の形式であるため、他のすべてのテーブルのスキーマからは完全に独立しています。その結果、Avro コンバーター を使用する場合、各論理ソースのテーブルの Avro スキーマには独自の進化と履歴があります。
このイベントの 値 の payload 部分には、イベントに関する情報が含まれます。行が作成された (op=c) ことを術士、および after フィールドの値に、行の ID、FIRST_NAME、LAST_NAME、および EMAIL 列に挿入された値が含まれていることを表しています。
デフォルトでは、イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。サイズが大きいのは、JSON 表現がメッセージのスキーマ部分とペイロード部分の両方を含んでいるためです。Avro コンバーター を使用して、 コネクターが Kafka トピックに書き込むメッセージのサイズを小さくすることができます。
更新イベント
次の例は、前の create イベントと同じテーブルからコネクターが捕捉した 更新 変更イベントを示しています。
{
"schema": { ... },
"payload": {
"before": {
"ID": 1004,
"FIRST_NAME": "Anne",
"LAST_NAME": "Kretchmar",
"EMAIL": "annek@noanswer.org"
},
"after": {
"ID": 1004,
"FIRST_NAME": "Anne",
"LAST_NAME": "Kretchmar",
"EMAIL": "anne@example.com"
},
"source": {
"version": "2.1.4.Final",
"name": "server1",
"ts_ms": 1520085811000,
"txId": "6.9.809",
"scn": "2125544",
"commit_scn": "2125544",
"rs_id": "001234.00012345.0124",
"ssn": 1,
"redo_thread": 1,
"user_name": "user",
"snapshot": false
},
"op": "u",
"ts_ms": 1532592713485
}
}ペイロードは create (挿入) イベントのペイロードと同じですが、以下の値は異なります。
-
opフィールドの値がuであり、この行が更新により変更されたことを示す。 -
beforeフィールドは、updateデータベースのコミット前に存在する値が含まれる行の以前の状態を表示します。 -
afterフィールドは行の更新状態を示しており、EMAILの値は現在anne@example.comに設定されています。 -
sourceフィールドの構造は以前と同じフィールドを含みますが、コネクターが REDO ログ内の異なる位置からイベントをキャプチャしたため、値は異なります。 -
ts_msフィールドは、Debezium がいつイベントを処理したかを示すタイムスタンプを示す。
payload セクションでは、他に有用な情報を複数示しています。たとえば、before と after の構造を比較して、コミットの結果として行がどのように変更されたかを判断できます。ソース 構造で、この変更の記録に関する情報がわかるので、追跡が可能です。また、このトピックや他のトピックの他のイベントと関連して、このイベントが発生した場合に、見識を得ることができます。これは、別のイベントと同じコミットの前、後、または一部として発生していましたか ?
行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。その結果、Debezium はこのような更新後に 3 つのイベントを出力します。
-
DELETEイベント。 - 行のキーが古い tombstone イベント
-
行に新しいキーを提供する
INSERTイベント。
削除 イベント
次の例は、先の create とupdate の イベント例で示したテーブルの delete イベントを示しています。delete イベントの schema 部分は、これらのイベントの schema 部分と同一です。
{
"schema": { ... },
"payload": {
"before": {
"ID": 1004,
"FIRST_NAME": "Anne",
"LAST_NAME": "Kretchmar",
"EMAIL": "anne@example.com"
},
"after": null,
"source": {
"version": "2.1.4.Final",
"name": "server1",
"ts_ms": 1520085153000,
"txId": "6.28.807",
"scn": "2122184",
"commit_scn": "2122184",
"rs_id": "001234.00012345.0124",
"ssn": 1,
"redo_thread": 1,
"user_name": "user",
"snapshot": false
},
"op": "d",
"ts_ms": 1532592105960
}
}
イベントの payload 部分は、create または update イベントのペイロードと比較して、いくつかの違いを示しています。
-
opフィールドの値はdであり、行が削除されたことを意味します。 -
beforeフィールドは、データベースのコミットで削除された行の旧状態を示します。 -
afterフィールドの値がnull場合、その行はもはや存在しないことを意味します。 -
sourceフィールドの構造には、create または update イベントに存在する多くのキーが含まれるが、ts_ms、scn、txIdフィールドの値は異なっている。 -
ts_msは、Debezium がこのイベントを処理したタイミングを示すタイムスタンプを示します。
delete イベントは、コンシューマーがこの行の削除を処理するために必要な情報を提供する。
Oracle コネクターのイベントは、Kafka ログコンパクション と連携するように設計されており、すべてのキーで最新のメッセージが保持される限り、古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、 Kafka がストレージ領域を確保できるようにします。
行が削除されると、Kafka は同じキーを使用する以前のメッセージをすべて削除できるため、前の例で示された delete イベント値は、ログ圧縮でまだ機能します。同じキーを共有する メッセージをすべて 削除するように Kafka に指示するには、メッセージの値を null に設定する必要があります。これを可能にするにはデフォルトで、Debezium Oracle コネクターは、値が null 以外で同じキーを持つ特別な 廃棄 イベントが含まれる 削除 イベントに従います。コネクタープロパティー tombstones.on.delete を設定すると、デフォルトの動作を変更できます。
切り捨て (truncate) イベント
切り捨て (truncate) 変更イベントは、テーブルが切り捨てられていることを伝えます。この場合のメッセージキーは null で、メッセージの値は以下のようになります。
{
"schema": { ... },
"payload": {
"before": null,
"after": null,
"source": { 1
"version": "2.1.4.Final",
"connector": "oracle",
"name": "oracle_server",
"ts_ms": 1638974535000,
"snapshot": "false",
"db": "ORCLPDB1",
"sequence": null,
"schema": "DEBEZIUM",
"table": "TEST_TABLE",
"txId": "02000a0037030000",
"scn": "13234397",
"commit_scn": "13271102",
"lcr_position": null,
"rs_id": "001234.00012345.0124",
"ssn": 1,
"redo_thread": 1,
"user_name": "user"
},
"op": "t", 2
"ts_ms": 1638974558961, 3
"transaction": null
}
}表6.6 切り捨て (truncate) イベント値フィールドの説明
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
イベントのソースメタデータを記述する必須のフィールド。切り捨て (truncate) イベント値の
|
| 2 |
|
操作の型を記述する必須の文字列。 |
| 3 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。
|
切り捨て (truncate) イベントは、テーブル全体に加えられた変更を表し、メッセージキーを持たないため、コンシューマーが 切り捨て られたイベントや変更イベントを受信する保証はありません (作成、更新 など)。例えば、コンシューマーが異なるパーティションからイベントを読み込む場合、同じテーブルの truncate イベントを受信した後に、テーブルの update イベントを受信することがあります。順序は、トピックが単一のパーティションを使用する場合にのみ保証されます。
切り捨て (truncate) イベントをキャプチャーしない場合は、skipped.operations オプションを使用して除外します。