4.3.2. Debezium MongoDB 変更イベントの値

変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、入れ子のフィールドを含む、 Envelope セクションの payload 構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。

変更イベントキーの例を紹介するために使用した、同じサンプルドキュメントについて考えてみましょう。

ドキュメントの例

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}

このドキュメントへの変更に対する変更イベントの値部分には、以下の各イベントタイプについて記述されています。

作成 イベント

以下の例は、customers コレクションにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。

{
    "schema": { 1
      "type": "struct",
      "fields": [
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json", 2
          "version": 1,
          "field": "after"
        },
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json",
          "version": 1,
          "field": "patch"
        },
        {
          "type": "struct",
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "version"
            },
            {
              "type": "string",
              "optional": false,
              "field": "connector"
            },
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_ms"
            },
            {
              "type": "boolean",
              "optional": true,
              "default": false,
              "field": "snapshot"
            },
            {
              "type": "string",
              "optional": false,
              "field": "db"
            },
            {
              "type": "string",
              "optional": false,
              "field": "rs"
            },
            {
              "type": "string",
              "optional": false,
              "field": "collection"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "ord"
            },
            {
              "type": "int64",
              "optional": true,
              "field": "h"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.mongo.Source", 3
          "field": "source"
        },
        {
          "type": "string",
          "optional": true,
          "field": "op"
        },
        {
          "type": "int64",
          "optional": true,
          "field": "ts_ms"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Envelope" 4
      },
    "payload": { 5
      "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", 6
      "patch": null,
      "source": { 7
        "version": "1.7.2.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 31,
        "h": 1546547425148721999
      },
      "op": "c", 8
      "ts_ms": 1558965515240 9
    }
  }

表4.4 作成 イベント値フィールドの説明

項目フィールド名説明

1

schema

値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のコレクションに生成するすべての変更イベントで同じになります。

2

name

schema セクションで、各 name フィールドは、値のペイロードのフィールドに対するスキーマを指定します。

io.debezium.data.Json はペイロードの afterpatch、および filter フィールドのスキーマです。このスキーマは customers コレクションに固有です。作成 イベントは、after フィールドが含まれる唯一のイベントです。更新 イベントには、filter フィールドと patchフィールドが含まれます。delete イベントには filter フィールドが含まれますが、after フィールドや patch フィールドは含まれません。

3

name

io.debezium.connector.mongo.Source はペイロードの sourceフィールドのスキーマです。このスキーマは MongoDB コネクターに固有です。コネクターは生成するすべてのイベントにこれを使用します。

4

name

dbserver1.inventory.customers.Envelope は、ペイロードの全体的な構造のスキーマで、dbserver1 はコネクター名、inventory はデータベース、customers はコレクションを指します。このスキーマはコレクションに固有です。

5

payload

値の実際のデータ。これは、変更イベントが提供する情報です。

イベントの JSON 表現はそれが記述するドキュメントよりもはるかに大きいように見えることがあります。これは、JSON 表現にはメッセージのスキーマ部分とペイロード部分を含める必要があるためです。しかし、Avro コンバーター を使用すると、コネクターが Kafka トピックにストリーミングするメッセージのサイズを大幅に小さくすることができます。

6

after

イベント発生後のドキュメントの状態を指定する任意のフィールド。この例では、after フィールドには新しいドキュメントの _idfirst_namelast_name、および email フィールドの値が含まれます。after の値は常に文字列です。慣例により、ドキュメントの JSON 表現が含まれます。MongoDB の oplog エントリーには、作成イベントのみにドキュメントの完全な状態が含まれます。つまり、作成 イベントは after フィールドが含まれる唯一のイベントです。

7

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。

  • Debezium バージョン。
  • イベントを生成したコネクターの名前。
  • 生成されたイベントの namespace を形成し、コネクターが書き込む Kafka トピック名で使用される、MongoDB レプリカセットの論理名。
  • 新しいドキュメントが含まれるコレクションおよびデータベースの名前。
  • イベントがスナップショットの一部である場合。
  • データベースで変更が加えられた時点のタイムスタンプおよびタイムスタンプ内のイベントの順序。
  • MongoDB 操作の一意の識別子。これは MongoDB のバージョンに依存します。これは、oplog イベントの h フィールド、または oplog イベントの lsid および txnNumber フィールドを表す stxnid という名前のフィールドです。

8

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、c は操作によってドキュメントが作成されたことを示しています。有効な値は以下のとおりです。

  • c = create
  • u = update
  • d = delete
  • r = read (読み取り、スナップショットのみに適用)

9

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

更新イベント

サンプル customers コレクションにある更新の変更イベントの値には、そのコレクションの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。更新 イベントには after の値はありません。その代わりに、以下の 2 つのフィールドがあります。

  • patch は、べき等更新操作の JSON 表現が含まれる文字列フィールドです。
  • filter は、更新の選択基準の JSON 表現が含まれる文字列フィールドです。filter 文字列には、シャード化コレクションの複数のシャードキーフィールドを含めることができます。

以下は、コネクターによって customers コレクションでの更新に生成されるイベントの変更イベント値の例になります。

{
    "schema": { ... },
    "payload": {
      "op": "u", 1
      "ts_ms": 1465491461815, 2
      "patch": "{\"$set\":{\"first_name\":\"Anne Marie\"}}", 3
      "filter": "{\"_id\" : {\"$numberLong\" : \"1004\"}}", 4
      "source": { 5
        "version": "1.7.2.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }

表4.5 更新 イベント値フィールドの説明

項目フィールド名説明

1

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、u は操作によってドキュメントが更新されたことを示しています。

2

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

3

patch

ドキュメントへの実際の MongoDB のべき等変更の JSON 文字列表現が含まれます。この例では、更新で first_name フィールドを新しい値に変更されています。

更新 イベント値には after フィールドが含まれません。

4

filter

更新するドキュメントの特定に使用された MongoDB 選択基準の JSON 文字列表現が含まれます。

5

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。

  • Debezium バージョン。
  • イベントを生成したコネクターの名前。
  • 生成されたイベントの namespace を形成し、コネクターが書き込む Kafka トピック名で使用される、MongoDB レプリカセットの論理名。
  • 更新されたドキュメントが含まれるコレクションおよびデータベースの名前。
  • イベントがスナップショットの一部である場合。
  • データベースで変更が加えられた時点のタイムスタンプおよびタイムスタンプ内のイベントの順序。
  • MongoDB 操作の一意の識別子。これは MongoDB のバージョンに依存します。これは、oplog イベントの h フィールド、または oplog イベントの lsid および txnNumber フィールドを表す stxnid という名前のフィールドです。
警告

Debezium 変更イベントでは、MongoDB は patch フィールドの内容を提供します。このフィールドの形式は、MongoDB データベースのバージョンによって異なります。したがって、新しい MongoDB データベースバージョンにアップグレードする場合は、形式が変更された可能性があるため注意してください。本書のサンプルは、MongoDB 3.4 から取得したため、ご使用のアプリケーションではイベントの形式が異なる場合があります。

注記

MongoDB の oplog では、更新 イベントには変更されたドキュメントの または の状態は含まれません。そのため、Debezium コネクターがこの情報を提供することはできません。ただし、Debezium コネクターは 作成 および 読み取り イベントでドキュメントの開始状態を提供します。ストリームのダウンストリームのコンシューマーは、ドキュメントごとに最新状態を維持し、新しいイベントの状態を保存された状態に比較することで、ドキュメント状態を再構築できます。Debezium コネクターはこの状態を維持できません。

削除 イベント

delete change イベントの値は、createupdate と同じ schema 部分を持ちます。delete イベントの payload 部分には、同じコレクションの 作成更新 イベントとは異なる値が含まれます。特に、 削除 イベントには after の値や patch の値は含まれません。以下は、customers コレクションのドキュメントの 削除 イベントの例になります。

{
    "schema": { ... },
    "payload": {
      "op": "d", 1
      "ts_ms": 1465495462115, 2
      "filter": "{\"_id\" : {\"$numberLong\" : \"1004\"}}", 3
      "source": { 4
        "version": "1.7.2.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }

表4.6 削除 イベント値フィールドの説明

項目フィールド名説明

1

op

操作の型を記述する必須の文字列。op フィールドの値は d で、ドキュメントが削除されたことを示します。

2

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

3

filter

削除するドキュメントの特定に使用された MongoDB 選択基準の JSON 文字列表現が含まれます。

4

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 または 更新 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。

  • Debezium バージョン。
  • イベントを生成したコネクターの名前。
  • 生成されたイベントの namespace を形成し、コネクターが書き込む Kafka トピック名で使用される、MongoDB レプリカセットの論理名。
  • 削除されたドキュメントが含まれたコレクションおよびデータベースの名前。
  • イベントがスナップショットの一部である場合。
  • データベースで変更が加えられた時点のタイムスタンプおよびタイムスタンプ内のイベントの順序。
  • MongoDB 操作の一意の識別子。これは MongoDB のバージョンに依存します。これは、oplog イベントの h フィールド、または oplog イベントの lsid および txnNumber フィールドを表す stxnid という名前のフィールドです。

MongoDB コネクターイベントは、Kafka ログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。

廃棄 (tombstone) イベント

一意に識別ドキュメントの MongoDB コネクターイベントはすべて同じキーを持ちます。ドキュメントが削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka がそのキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium の MongoDB コネクターは 削除 イベントを出力した後に、null 値以外で同じキーを持つ特別な廃棄 (tombstone) イベントを出力します。tombstone イベントは、同じキーを持つすべてのメッセージを削除できることを Kafka に通知します。