8.2.5.2. Debezium SQL Server 変更イベントの値

変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、ネストされたフィールドを含む、payload セクションの Envelope 構造を記述するスキーマが含まれます。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。

変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。

CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);

このテーブルへの変更に対する変更イベントの値部分には、以下の各イベント型について記述されています。

作成イベント

以下の例は、顧客 テーブルのデータを作成する操作に対してコネクターによって生成される変更イベントの値の部分を示しています。

{
  "schema": { 1
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.customers.Value", 2
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "event_serial_no"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.sqlserver.Source", 3
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "server1.dbo.customers.Envelope" 4
  },
  "payload": { 5
    "before": null, 6
    "after": { 7
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "source": { 8
      "version": "1.5.4.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729468470,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000758:0003",
      "commit_lsn": "00000027:00000758:0005",
      "event_serial_no": "1"
    },
    "op": "c", 9
    "ts_ms": 1559729471739 10
  }
}

表8.4 作成 イベント値フィールドの説明

項目フィールド名説明

1

schema

値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。

2

name

スキーマ セクションで、各 name フィールドは、値のペイロードのフィールドのスキーマを指定します。

server1.dbo.customers.Value はペイロードのbefore および after フィールドのスキーマです。このスキーマは customers テーブルに固有です。

before および after フィールドのスキーマ名はlogicalName.database-schemaName.tableName.Value の形式を取るので、スキーマ名がデータベースで一意になるようにします。つまり、Avro コンバーター を使用する場合、各論理ソースの各テーブルの Avro スキーマには独自の進化と履歴があります。

3

name

io.debezium.connector.sqlserver.Source は、ペイロードの ソースフィールドの スキーマです。このスキーマは、SQL Server コネクターに固有です。コネクターは生成するすべてのイベントにこれを使用します。

4

name

server1.dbo.customers.Envelope は、ペイロードの全体的な構造のスキーマです。server1 はコネクター名、db o はデータベーススキーマ名、顧客 はテーブルです。

5

payload

値の実際のデータ。これは、変更イベントが提供する情報です。

イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。これは、JSON 表現にはメッセージのスキーマ部分とペイロード部分を含める必要があるためです。ただし、Avro コンバーター を使用すると、コネクターが Kafka トピックにストリーミングするメッセージのサイズを大幅に小さくすることができます。

6

before

イベント発生前の行の状態を指定する任意のフィールド。この例では、op フィールドが create の c になる場合、この変更イベントは新規コンテンツ用であるため、before フィールドは null になります。

7

after

イベント発生後の行の状態を指定する任意のフィールド。この例では、after フィールドには新しい行の idfirst_name、last_name、およびメール コラム の値が含まれます。

8

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター型および名前
  • データベースおよびスキーマ名
  • データベースに変更が加えられた時点のタイムスタンプ
  • イベントがスナップショットの一部であるか
  • 新しい行が含まれるテーブルの名前
  • サーバーログオフセット

9

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、c は操作によって行が作成されたことを示しています。有効な値は以下のとおりです。

  • c = create
  • u = update
  • d = delete
  • r = read(スナップショットのみに適用)

10

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。イベントメッセージエンベロープでは、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースにコミットされた時刻を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

更新イベント

サンプル 顧客 テーブルの更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターが 顧客 テーブルの更新に対して生成するイベントの変更イベント値の例になります。

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "after": { 2
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "noreply@example.org"
    },
    "source": { 3
      "version": "1.5.4.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729995937,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000ac0:0002",
      "commit_lsn": "00000027:00000ac0:0007",
      "event_serial_no": "2"
    },
    "op": "u", 4
    "ts_ms": 1559729998706  5
  }
}

表8.5 更新 イベント値フィールドの説明

項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。更新 イベント値の before フィールドには、データベースのコミット前に各テーブル列のフィールドと、その列にあった値が含まれます。この例では、email 値は john.doe@example.org です

2

after

イベント発生後の行の状態を指定する任意のフィールド。構造 の前後に 比較して、この行への更新内容を判断できます。この例では、メール の値は noreply@example.org になります。

3

source

イベントのソースメタデータを記述する必須のフィールド。ソースフィールド 構造には create イベントと同じフィールドがありますが、一部の値が異なります。たとえば、更新 イベントサンプルのオフセットは異なります。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター型および名前
  • データベースおよびスキーマ名
  • データベースに変更が加えられた時点のタイムスタンプ
  • イベントがスナップショットの一部であるか
  • 新しい行が含まれるテーブルの名前
  • サーバーログオフセット

event_serial_no フィールドは、同じコミットおよび変更 LSN を持つイベントを区別します。このフィールドの値が 1 以外である場合の典型的な状況:

  • 更新によって SQL Server の CDC 変更テーブルに 2 つのイベントが生成されるため、更新 イベントの値は 2 に設定されています(詳細はソースドキュメントを参照してください)。最初のイベントには古い値が含まれ、2 番目のイベントには新しい値が含まれます。コネクターは最初のイベントの値を使用して 2 つ目のイベントを作成します。コネクターは最初のイベントを破棄します。
  • プライマリーキーが更新されると、SQL Server は 2 つのイベントを生成します。古いプライマリーキーを持つレコードを削除するための 削除 イベントと、新しいプライマリーキーを持つレコードを追加するための 作成 イベント。どちらの操作も同じコミットおよび変更 LSN を共有します。イベント番号はそれぞれ 12 です。

4

op

操作の型を記述する必須の文字列。更新 イベント値の op フィールドの値は u で、更新によってこの行が変更されたことを示します。

5

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。イベントメッセージエンベロープでは、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースにコミットされた時刻を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

注記

行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つ のイベントが Debezium によって出力されます。3 つのイベントとは、削除 イベント、行の古いキーを 持つ 廃棄(tombstone) イベント、および行の新しいキーを持つ 作成 イベントです。

削除 イベント

削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ スキーマ の部分になります。サンプル 顧客 テーブルの 削除 イベントの ペイロード 部分は以下のようになります。

{
  "schema": { ... },
  },
  "payload": {
    "before": { <>
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "noreply@example.org"
    },
    "after": null, 1
    "source": { 2
      "version": "1.5.4.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559730445243,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000db0:0005",
      "commit_lsn": "00000027:00000db0:0007",
      "event_serial_no": "1"
    },
    "op": "d", 3
    "ts_ms": 1559730450205 4
  }
}

表8.6 削除 イベント値フィールドの説明

項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の before フィールドには、データベースのコミットで削除される前に行にあった値が含まれます。

2

after

イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の after フィールドは null で、行が存在しないことを示します。

3

source

イベントのソースメタデータを記述する必須のフィールド。削除 イベント値では、ソースフィールドの 構造は、同じテーブルの 作成 および 更新 イベントと同じです。多くの ソースフィールド 値も同じです。削除 イベント値の the ts_ms および pos フィールドの値や、その他の値が変更された可能性があります。ただし、削除 イベント値の ソースフィールド は同じメタデータを提供します。

  • Debezium バージョン
  • コネクター型および名前
  • データベースおよびスキーマ名
  • データベースに変更が加えられた時点のタイムスタンプ
  • イベントがスナップショットの一部であるか
  • 新しい行が含まれるテーブルの名前
  • サーバーログオフセット

4

op

操作の型を記述する必須の文字列。op フィールドの値は d で、この行が削除されたことを示します。

5

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。イベントメッセージエンベロープでは、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

SQL Server コネクターイベントは、Kafka ログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、 Kafka がストレージ領域を確保できるようにします。

廃棄 (tombstone) イベント

行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium の SQL Server コネクターは 削除 イベントを出力した後に 同じキーと null 値を持つ特別な廃棄(tombstone)イベントを出力します。