6.2.2. Debezium Oracle 変更イベントの値

message キーと同様に、変更イベントメッセージの値には schema セクションと payload セクションがあります。Oracle コネクターによって生成されたすべての変更イベント値の payload セクションは、以下のフィールドを含む envelope 構造となっています。

op
操作のタイプを記述する文字列値が含まれる必須フィールド。Oracle コネクターの値は、c (作成または挿入)、u (更新)、d (削除)、および r (読み取り、初回のスナップショットでない場合) です。
before
任意のフィールド。存在する場合は、イベント発生の行の状態が含まれます。この構造は、server1.INVENTORY.CUSTOMERS.Value Kafka Connect スキーマによって記述され、server1 コネクターは inventory.customers テーブルのすべての行に使用します。
after
任意のフィールド。存在する場合は、イベント発生の行の状態が含まれます。この構造は、before で使用されるのと同じ server1.INVENTORY.CUSTOMERS.Value Kafka Connect スキーマで記述されます。
source
Oracle に、Debezium バージョン、コネクター名、イベントが作成中のスナップショットかどうか、トランザクション ID (スナップショット作成中ではない)、変更の SCN、ソースとなるデータベースでレコードが変更された時点を表すタイムスタンプ (スナップショット中、スナップショットの作成時点) といったフィールドが含まれる場合に、イベントのソースメタデータを記述する構造を含む必須のフィールドです。
ヒント

commit_scn フィールドは任意で、変更イベントが参加するトランザクションコミットの SCN を記述します。このフィールドは、LogMiner 接続ファクトリーを使用している場合にのみ表示されます。

ts_ms
任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックを使用) が含まれます。

当然ながら、イベントメッセージの値のschema 部分には、このエンベロープ構造と、その中のネストされたフィールドを記述するスキーマが含まれます。

作成 イベント

customers テーブルの create イベント値を見てみましょう。

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": true,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_scn"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "snapshot"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.oracle.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            }
        ],
        "optional": false,
        "name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "source": {
            "version": "1.7.2.Final",
            "name": "server1",
            "ts_ms": 1520085154000,
            "txId": "6.28.807",
            "scn": "2122185",
            "commit_scn": "2122185",
            "snapshot": false
        },
        "op": "c",
        "ts_ms": 1532592105975
    }
}

前述のイベントの valueschema 部分を調べると、以下のスキーマが定義されている方法を確認できます。

  • エンベロープ
  • ソース 構造 (Oracle コネクターに固有で、すべてのイベントで再利用)。
  • before フィールドおよび after フィールドのテーブル固有のスキーマ。
ヒント

before フィールドおよび after フィールドのスキーマ名は <logicalName>.<schemaName>.<tableName>.Value の形式であるため、他のすべてのテーブルのスキーマからは完全に独立しています。つまり、Avro コンバーター を使用する場合、各論理ソース各テーブルの Avro スキーマには独自の進化と履歴があります。

このイベントの payload 部分には、イベントに関する情報が含まれます。行が作成された (op=c) ことを術士、および after フィールドの値に、行の IDFIRST_NAMELAST_NAME、および EMAIL 列に挿入された値が含まれていることを表しています。

ヒント

デフォルトでは、イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。JSON 表現にはメッセージのスキーマ 部分と ペイロード 部分を含める必要があるため、これは True です。Avro コンバーター を使用すると、コネクターが Kafka トピックに記述するメッセージのサイズを大幅に小さくすることができます。

更新イベント

このテーブル 更新 変更イベントの値には、作成 イベントと同じ スキーマ があります。ペイロードは同じ構造を使用しますが、保持する値が異なります。以下に例を示します。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "source": {
            "version": "1.7.2.Final",
            "name": "server1",
            "ts_ms": 1520085811000,
            "txId": "6.9.809",
            "scn": "2125544",
            "commit_scn": "2125544",
            "snapshot": false
        },
        "op": "u",
        "ts_ms": 1532592713485
    }
}

更新 イベントの値を 作成 (insert) イベントと比較して、payload セクションの以下の相違点に注意してください。

  • op フィールドの値は u になっており、更新によってこの行が変更されたことを示しています。
  • before フィールドは、データベースのコミット前の行と値の状態を表しています。
  • after フィールドの行の状態が更新され、ここで EMAIL の値が anne@example.com であることを確認することができます。
  • source フィールドの構造には以前と同じフィールドがありますが、このイベントは redo ログの位置とは異なるめ、値は異なります。
  • ts_ms は、Debezium がこのイベントを処理したタイムスタンプを示します。

payload セクションでは、他に有用な情報を複数示しています。たとえば、beforeafter の構造を比較して、コミットの結果として行がどのように変更されたかを判断できます。ソース 構造で、この変更の記録に関する情報がわかるので、追跡が可能です。また、このトピックや他のトピックの他のイベントと関連して、このイベントが発生した場合に、見識を得ることができます。これは、別のイベントと同じコミットの前、後、または一部として発生していましたか ?

注記

行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。その結果、Debezium はこのような更新後に 3 つのイベントを出力します。

  • DELETE イベント。
  • 行のキーが古い tombstone イベント
  • 行に新しいキーを提供する INSERT イベント。

削除 イベント

ここまでで、createupdate イベントのサンプルを確認しました。次に、同じテーブルの 削除 イベントの値を見てみましょう。createupdate イベントと同様に、delete イベントの場合は、値の schema 部分は全く同じです。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "after": null,
        "source": {
            "version": "1.7.2.Final",
            "name": "server1",
            "ts_ms": 1520085153000,
            "txId": "6.28.807",
            "scn": "2122184",
            "commit_scn": "2122184",
            "snapshot": false
        },
        "op": "d",
        "ts_ms": 1532592105960
    }
}

ペイロード 部分を確認すると、create または updateイベントペイロードと比べて多くの違いがあります。

  • op フィールドの値は d になっており、この行が削除されたことを示しています。
  • before フィールドは、データベースのコミットで削除した行の状態を表しています。
  • after フィールドが null で、行が存在しなくなったことが分かります。
  • 変更された ts_msscn および txId フィールドの除き、source フィールド構造には以前と同じ値が多数あります。
  • ts_ms は、Debezium がこのイベントを処理したタイムスタンプを示します。

このイベントでは、この行の削除の処理に使用できるあらゆる種類の情報をコンシューマーに提供します。

Oracle コネクターのイベントは、Kafka ログコンパクション と連携するように設計されており、すべてのキーで最新のメッセージが保持される限り、古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、 Kafka がストレージ領域を確保できるようにします。

行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、上記の削除 イベントの値はログコンパクションで動作します。同じキーを共有する メッセージをすべて 削除するように Kafka に指示するには、メッセージの値を null に設定する必要があります。これを可能にするにはデフォルトで、Debezium Oracle コネクターは、値が null 以外で同じキーを持つ特別な 廃棄 イベントが含まれる 削除 イベントに従います。コネクタープロパティー tombstones.on.delete を設定すると、デフォルトの動作を変更できます。