7.3.2. Debezium PostgreSQL 変更イベントの値

変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、ネストされたフィールドを含む、payload セクションの Envelope 構造を記述するスキーマが含まれます。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。

変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。

CREATE TABLE customers (
  id SERIAL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

このテーブルへの変更に対する変更イベントの値は、REPLICA IDENTITY 設定およびイベントが使用する操作によって異なります。

詳細は、以下を参照してください。

Replica identity

REPLICA IDENTITY は、UPDATE および DELETE イベントの論理デコードプラグインで利用可能な情報量を決定する PostgreSQL 固有のテーブルレベルの設定です。具体的には、REPLICA IDENTITY の設定は、UPDATE または DELETE イベントが発生するたびに、関係するテーブル列の以前の値で利用可能な(ある場合)を制御します。

REPLICA IDENTITY には 4 つの値を使用できます。

  • DEFAULT: デフォルトの動作では、UPDATE イベントおよび DELETE イベントには、プライマリーキーがある場合にテーブルのプライマリーキー列の以前の値が含まれます。UPDATE イベントの場合は、値が変更されたプライマリーキー列のみが存在します。

    テーブルにプライマリーキーがない場合、コネクターはそのテーブルの UPDATE イベントまたは DELETE イベントを生成しません。プライマリーキーのないテーブルの場合、コネクターは 作成 イベントのみを出力します。通常、プライマリーキーのないテーブルは、テーブルの最後にメッセージを追加するために使用されます。つまり、UPDATE イベントおよび DELETE イベントは役に立ちません。

  • NOTHING: UPDATE および DELETE 操作のイベントには、テーブル列の以前の値に関する情報は含まれません。
  • FULL: UPDATE 操作および DELETE 操作の再送信イベントには、テーブルのすべてのコラムの以前の値が含まれます。
  • INDEX index-name: UPDATE および DELETE 操作の発生したイベントには、指定されたインデックスに含まれるコラムの以前の値が含まれます。UPDATE イベントには、更新された値を持つインデックス化された列も含まれます。

作成イベント

以下の例は、顧客 テーブルのデータを作成する操作に対してコネクターによって生成される変更イベントの値の部分を示しています。

{
    "schema": { 1
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "PostgreSQL_server.inventory.customers.Value", 2
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "PostgreSQL_server.inventory.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "default": false,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "xmin"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.postgresql.Source", 3
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            }
        ],
        "optional": false,
        "name": "PostgreSQL_server.inventory.customers.Envelope" 4
    },
    "payload": { 5
        "before": null, 6
        "after": { 7
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { 8
            "version": "1.5.4.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": true,
            "db": "postgres",
            "sequence": "[\"24023119\",\"24023128\"]"
            "schema": "public",
            "table": "customers",
            "txId": 555,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "c", 9
        "ts_ms": 1559033904863 10
    }
}

表7.4 作成 イベント値フィールドの説明

項目フィールド名説明

1

schema

値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。

2

name

schema セクションでは、各name フィールドが、値のペイロード内のフィールドのスキーマを指定します。

Postgre SQL_server.inventory.customers.Value は、ペイロードの before および after フィールドのスキーマです。このスキーマは customers テーブルに固有です。

before および after フィールドのスキーマ名はlogicalName.tableName.Value の形式で、スキーマ名がデータベースで一意になるようにします。つまり、Avro コンバーター を使用する場合、各論理ソースの各テーブルの Avro スキーマには独自の進化と履歴があります。

3

name

io.debezium.connector.postgresql.Source は、ペイロードの ソースフィールドの スキーマです。このスキーマは、PostgreSQL コネクタに固有のものです。コネクターは生成するすべてのイベントにこれを使用します。

4

name

postgresql_server.inventory.customers.Envelope は、ペイロードの全体的な構造のスキーマです。PostgreSQL_server はコネクター名、インベントリーは データベース、顧客 はテーブルです。

5

payload

値の実際のデータ。これは、変更イベントが提供する情報です。

イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。これは、JSON 表現にはメッセージのスキーマ部分とペイロード部分を含める必要があるためです。ただし、Avro コンバーター を使用すると、コネクターが Kafka トピックにストリーミングするメッセージのサイズを大幅に小さくすることができます。

6

before

イベント発生前の行の状態を指定する任意のフィールド。この例では、op フィールドが create の c になる場合、この変更イベントは新規コンテンツ用であるため、before フィールドは null になります。

注記

このフィールドを利用できるかどうかは、各テーブルの REPLICA IDENTITY 設定によって異なります。

7

after

イベント発生後の行の状態を指定する任意のフィールド。この例では、after フィールドには新しい行の idfirst_name、last_name、およびメール コラム の値が含まれます。

8

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター型および名前
  • 新しい行が含まれるデータベースおよびテーブル
  • 追加のオフセット情報の文字列化された JSON 配列。最初の値は常に最後にコミットされた LSN で、2 番目の値は常に現在の LSN になります。どちらの値も null にすることができます。
  • スキーマ名
  • イベントがスナップショットの一部であるか
  • 操作が実行されたトランザクションの ID
  • データベースログの操作のオフセット
  • データベースに変更が加えられた時点のタイムスタンプ

9

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、c は操作によって行が作成されたことを示しています。有効な値は以下のとおりです。

  • c = create
  • u = update
  • d = delete
  • r = read(スナップショットのみに適用)

10

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

更新イベント

サンプル 顧客 テーブルの更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターが 顧客 テーブルの更新に対して生成するイベントの変更イベント値の例になります。

{
    "schema": { ... },
    "payload": {
        "before": { 1
            "id": 1
        },
        "after": { 2
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { 3
            "version": "1.5.4.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "u", 4
        "ts_ms": 1465584025523  5
    }
}

表7.5 更新 イベント値フィールドの説明

項目フィールド名説明

1

before

データベースをコミットする前に行にあった値が含まれる任意のフィールド。この例では、テーブルの REPLICA IDENTITY 設定がデフォルトでは DEFAULT であるため、プライマリーキー列 id のみが存在します。+ 更新 イベントに、行のすべてのコラムの以前の値が含まれるようにするには、ALTER TABLE customers REPLICA IDENTITY FULL を実行し、customers テーブルを変更する必要があります。

2

after

イベント発生後の行の状態を指定する任意のフィールド。この例では、first_name 値は Anne Marie になります。

3

source

イベントのソースメタデータを記述する必須のフィールド。ソースフィールド 構造には create イベントと同じフィールドがありますが、一部の値が異なります。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター型および名前
  • 新しい行が含まれるデータベースおよびテーブル
  • スキーマ名
  • イベントがスナップショットの一部である場合( 更新 イベントの場合は常に false
  • 操作が実行されたトランザクションの ID
  • データベースログの操作のオフセット
  • データベースに変更が加えられた時点のタイムスタンプ

4

op

操作の型を記述する必須の文字列。更新 イベント値の op フィールドの値は u で、更新によってこの行が変更されたことを示します。

5

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

注記

行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つのイベントが Debezium によって出力されます。3 つのイベントとは、DELETE イベント、行の古いキーを持つ 廃棄 (tombstone)、およびそれに続く行の新しいキーを持つイベントです。詳細は次のセクションで説明します。

プライマリーキーの更新

行のプライマリーキーフィールドを変更する UPDATE 操作は、プライマリーキーの変更と呼ばれます。プライマリーキーの変更では、UPDATE イベントレコードを送信する代わりに、コネクターは古いキーの DELETE イベントレコードと、新しい(更新された)キーの CREATE イベントレコードを送信します。これらのイベントには通常の構造と内容があり、イベントごとにプライマリーキーの変更に関連するメッセージヘッダーがあります。

  • DELETE イベントレコードには、メッセージヘッダーとして __debezium.newkey があります。このヘッダーの値は、更新された行の新しいプライマリーキーです。
  • CREATE イベントレコードには、メッセージヘッダーとして __debezium.oldkey があります。このヘッダーの値は、更新された行にあった以前の (古い) プライマリーキーです。

削除 イベント

削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ スキーマ の部分になります。サンプル 顧客 テーブルの 削除 イベントの ペイロード 部分は以下のようになります。

{
    "schema": { ... },
    "payload": {
        "before": { 1
            "id": 1
        },
        "after": null, 2
        "source": { 3
            "version": "1.5.4.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "d", 4
        "ts_ms": 1465581902461 5
    }
}

表7.6 削除 イベント値フィールドの説明

項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の before フィールドには、データベースのコミットで削除される前に行にあった値が含まれます。

この例では、テーブルの REPLICA IDENTITY 設定が DEFAULT であるため、before フィールドにはプライマリーキー列のみが含まれます。

2

after

イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の after フィールドは null で、行が存在しないことを示します。

3

source

イベントのソースメタデータを記述する必須のフィールド。削除 イベント値では、ソースフィールドの 構造は、同じテーブルの 作成 および 更新 イベントと同じです。多くの ソースフィールド 値も同じです。削除 イベント値の the ts_ms および lsn フィールドの値や、その他の値が変更された可能性があります。ただし、削除 イベント値の ソースフィールド は同じメタデータを提供します。

  • Debezium バージョン
  • コネクター型および名前
  • 削除された行が含まれていたデータベースおよびテーブル
  • スキーマ名
  • イベントがスナップショットの一部である場合(常に 削除 イベントの場合は false )。
  • 操作が実行されたトランザクションの ID
  • データベースログの操作のオフセット
  • データベースに変更が加えられた時点のタイムスタンプ

4

op

操作の型を記述する必須の文字列。op フィールドの値は d で、この行が削除されたことを示します。

5

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

削除 変更イベントレコードは、この行の削除を処理するために必要な情報を持つコンシューマーを提供します。

警告

プライマリーキーを持たないテーブル用に生成された 削除 イベントをコンシューマーが処理できるようにするには、テーブルの REPLICA IDENTITYFULL に設定します。テーブルにプライマリーキーがなく、テーブルの REPLICA IDENTITYDEFAULT または NOTHING に設定されている場合、削除 イベントは before フィールドがありません。

PostgreSQL コネクターイベントは、Kafka のログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、 Kafka がストレージ領域を確保できるようにします。

廃棄 (tombstone) イベント

行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、PostgreSQL コネクターは、同じキー null 値を持つ特別な 廃棄イベントで 削除 イベントに従います。

切り捨て (truncate) イベント

切り捨て (truncate) 変更イベントは、テーブルが切り捨てられていることを伝えます。この場合、メッセージキーは null で、メッセージの値は以下のようになります。

{
    "schema": { ... },
    "payload": {
        "source": { 1
            "version": "1.5.4.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "t", 2
        "ts_ms": 1559033904961 3
    }
}

表7.7 切り捨て (truncate) イベント値フィールドの説明

項目フィールド名説明

1

source

イベントのソースメタデータを記述する必須のフィールド。切り捨て(truncate) イベント値の ソースフィールドの 構造は、同じテーブルの 作成更新および削除イベント と同じで、以下のメタデータを提供します。

  • Debezium バージョン
  • コネクター型および名前
  • 新しい行が含まれるデータベースおよびテーブル
  • スキーマ名
  • イベントがスナップショットの一部である場合(常に 削除 イベントの場合は false )。
  • 操作が実行されたトランザクションの ID
  • データベースログの操作のオフセット
  • データベースに変更が加えられた時点のタイムスタンプ

2

op

操作の型を記述する必須の文字列。op フィールドの値は t で、このテーブルが切り捨てされたことを示します。

3

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

single TRUNCATE ステートメントが複数のテーブルに適用される場合、切り捨てられたテーブルごとに 1 つの 切り捨て(truncate) 変更イベントレコードが出力されます。

切り捨て (truncate) イベントは、テーブル全体に加えた変更を表し、メッセージキーを持たないので、単一のパーティションを持つトピックを使用しない限り、テーブルに関する変更イベント (作成更新 など) とそのテーブルの 切り捨て (truncate) イベントの順番は保証されません。たとえば、これらのイベントが異なるパーティションから読み取られる場合、コンシューマーは 更新 イベントを 切り捨て (truncate) イベントの後でのみ受け取る可能性があります。