4.3.5.2. 変更イベント値

message キーと同様に、変更イベントメッセージの値には schema セクションと payload セクションがあります。SQL Server コネクターによって生成されたすべての変更イベント値の payload セクションには、以下のフィールドを含む エンベロープ 構造があります。

  • op は、操作のタイプを記述する文字列値が含まれる必須フィールドです。SQL Server コネクターの値は、c (作成または挿入)、u (更新)、d (削除)、および r (読み取り、スナップショットの場合)です。
  • before は任意のフィールドであり、存在する場合はイベント発生 の行の状態が含まれます。この構造は、server1.dbo.customers.Value Kafka Connect スキーマによって記述され、server1 コネクターは dbo.customers テーブルのすべての行に使用します。
  • after はオプションのフィールドで、存在する場合はイベント発生 の行の状態が含まれます。この構造は、の に で使用される同じ server1.dbo.customers.Value Kafka Connect スキーマによって記述されます。
  • source は、イベントのソースメタデータを記述する構造が含まれる必須のフィールドです。SQL Server の場合は、Debezium バージョン、コネクター名、イベントが進行中のスナップショットの一部であるかどうか、コミット LSN (スナップショット中ではない)、変更が発生した変更、データベース、スキーマ、テーブルの LSN が含まれます。 ソースデータベースでレコードが変更された時点を表すタイムスタンプ(スナップショット中は、スナップショットの時点になります)。

    また、ストリーミング中にフィールド event_serial_no が存在します。これは、同じコミットおよび変更 LSN を持つイベントを区別するために使用されます。値が 1 と異なる場合は、主に 2 つの状況を確認できます。

    • 更新 イベントの値は 2 に設定されます。これは、更新が SQL Server の CDC 変更テーブルに 2 つのイベントを生成するためです(ソースドキュメント)。最初の値には古い値が含まれ、2 番目の値には新しい値が含まれます。そのため、最初の値は破棄され、2 番目の値は Debezium 変更イベントの作成に使用されます。
    • プライマリーキーが更新されると、SQL Server は 2 つのレコードを出力します。delete は古いプライマリーキー値を持つレコードを削除し、新しいプライマリーキーでレコードを作成するために 挿入 します。どちらの操作も同じコミットおよび変更 LSN を共有します。イベント番号は 12 です。
  • ts_ms は任意です。存在する場合は、コネクターがイベントを処理した時間(Kafka Connect タスクを実行している JVM のシステムクロックを使用)が含まれます。

当然ながら、イベントメッセージの値のschema 部分には、このエンベロープ構造と、その中のネストされたフィールドを記述するスキーマが含まれます。

4.3.5.2.1. 作成 イベント

customers テーブルの create イベント値を見てみましょう。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "event_serial_no"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.sqlserver.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "server1.dbo.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "source": {
      "version": "1.0.3.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729468470,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000758:0003",
      "commit_lsn": "00000027:00000758:0005",
      "event_serial_no": "1"
    },
    "op": "c",
    "ts_ms": 1559729471739
  }
}

このイベントの スキーマ 部分を確認すると、エンベロープ の スキーマ、ソース 構造のスキーマ(SQL Server コネクターに固有ですべてのイベントで再利用)、before および after フィールドのテーブル固有のスキーマを確認できます。

注記

before および after フィールドのスキーマ名は logicalName.schemaName の形式であるため、.Value は他のすべてのテーブルのスキーマから完全に独立しています。つまり、Avro コンバーター を使用する場合、各 論理ソース各テーブル の Avro スキーマには独自の進化と履歴があります。

このイベントの ペイロード 部分を確認すると、イベント内の情報、行が作成されたことを説明するものが表示されます( op=c以降)、after フィールドの値には新しい挿入された行の ID、first_namelast_name、および email 列の値が含まれます。

注記

イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。JSON 表現にはメッセージのスキーマ 部分と ペイロード 部分を含める必要があるため、これは True です。を使用して、Kafka トピックに書き込まれた実際のメッセージのサイズを大幅に小さくすることもできます。