4.4.6.2. イベントの値の変更

変更イベントメッセージの値は、より複雑です。キーメッセージと同様に、schema セクションと payload セクションがあります。MongoDB コネクターによって生成されたすべての変更イベント値の payload セクションには、以下のフィールドが含まれる 構造 があります。

  • op は、操作のタイプを記述する文字列の値が含まれる必須フィールドです。MongoDB コネクターの値は c create(または insert)、u 更新、d 削除、および r 読み取り(スナップショットの場合)になります。
  • after はオプションのフィールドで、イベント発生 にドキュメントの状態が含まれる場合。MongoDB の oplog エントリーには、イベント 作成 のドキュメントの完全な状態のみが含まれるため、これらは after フィールドが含まれるイベントのみになります。
  • source は、イベントのソースメタデータを記述する構造が含まれる必須フィールドです。これには、MongoDB のバージョン、論理名、レプリカセットの名前、コレクションの namespace、MongoDB タイムスタンプ(およびタイムスタンプ内のイベントの ordinal)が含まれます。 MongoDB 操作の識別子(例: oplog イベントの h フィールド)、およびイベントがスナップショット中に生じた場合は初期同期フラグ。
  • ts_ms は任意で、コネクターがイベントを処理した JVM のシステムクロックを使用して(Kafka Connect タスクを実行している JVM でシステムクロックを使用)。

当然ながら、イベントメッセージ値の スキーマ 部分には、この構造と内部のネストされたフィールドを記述するスキーマが含まれます。

customers コレクションの 作成読み取り のイベント値の例を見てみましょう。

{
    "schema": {
      "type": "struct",
      "fields": [
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json",
          "version": 1,
          "field": "after"
        },
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json",
          "version": 1,
          "field": "patch"
        },
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json",
          "version": 1,
          "field": "filter"
        },
        {
          "type": "struct",
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "version"
            },
            {
              "type": "string",
              "optional": false,
              "field": "connector"
            },
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_ms"
            },
            {
              "type": "boolean",
              "optional": true,
              "default": false,
              "field": "snapshot"
            },
            {
              "type": "string",
              "optional": false,
              "field": "db"
            },
            {
              "type": "string",
              "optional": false,
              "field": "rs"
            },
            {
              "type": "string",
              "optional": false,
              "field": "collection"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "ord"
            },
            {
              "type": "int64",
              "optional": true,
              "field": "h"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.mongo.Source",
          "field": "source"
        },
        {
          "type": "string",
          "optional": true,
          "field": "op"
        },
        {
          "type": "int64",
          "optional": true,
          "field": "ts_ms"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Envelope"
      },
    "payload": {
      "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
      "patch": null,
      "source": {
        "version": "1.1.2.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 31,
        "h": 1546547425148721999
      },
      "op": "r",
      "ts_ms": 1558965515240
    }
  }

このイベントの schema 一部を確認すると、そのコレクションに 固有 のスキーマと source 構造のスキーマ(MongoDB コネクターに固有で、すべてのイベントで再利用)がわかります。また、after 値は常に文字列で、規則によりドキュメントの JSON 表現が含まれていることに注意してください。

このイベントの payload 一部を確認する op=r と、イベント内の情報が表示されます。つまり、ドキュメントがスナップショットの一部として読み取られたこと snapshot=trueafter フィールドの値にドキュメントの JSON 文字列表現が含まれることが記述されています。

注記

これは、イベントの JSON 表現が記述する行よりもはるかに大きいことが分かります。JSON 表現にはメッセージの スキーマペイロード の部分を含める必要があるため、これは true です。

このコレクションの 更新 変更イベントの値には実際には同じ スキーマ があり、ペイロードは同じですが、異なる値を保持します。具体的には、更新イベントには after 値がなく、代わりにべき等更新操作の patch JSON 表現や、更新の選択基準を含む filter 文字列が含まれます。filter 文字列にはシャードコレクションの複数のシャードキーフィールドを含めることができます。以下に例を示します。

{
    "schema": { ... },
    "payload": {
      "op": "u",
      "ts_ms": 1465491461815,
      "patch": "{\"$set\":{\"first_name\":\"Anne Marie\"}}",
      "filter": "{\"_id\" : {\"$numberLong\" : \"1004\"}}",
      "source": {
        "version": "1.1.2.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }

これを insert イベントの値と比較すると、payload セクションにいくつかの違いが表示されます。

  • op フィールドの値は u、更新によりこのドキュメントが変更されていることを示すようになりました。
  • patch フィールドが表示され、ドキュメントへの実際の MongoDB idempotent 変更の文字列が JSON 表現されます。この例では、first_name フィールドを新しい値に設定します。
  • filter フィールドが表示され、更新に使用される MongoDB 選択基準の文字列が JSON 表現されます。
  • after フィールドが表示されなくなる
  • source フィールド構造には以前のフィールドと同じフィールドがありますが、イベントが oplog の別の位置にあるため、値は異なります。
  • ts_ms、Debezium がこのイベントを処理したタイムスタンプを示しています。
警告

patch フィールドの内容は MongoDB 自体によって提供され、その正確な形式は特定のデータベースバージョンによって異なります。したがって、MongoDB インスタンスを新規バージョンにアップグレードする際には、フォーマットに変更を加える可能性があります。

本ドキュメントのすべての例は MongoDB 3.4 から取得され、別のものを使用する場合は異なる場合があります。

注記

MongoDB の oplog でイベントを更新するには、変更 後のドキュメントのステータスの または後 にはないため、コネクターがこの情報を提供する方法はありません。ただし、イベントの 作成 または 読み取り start 状態が含まれるため、各ドキュメントの最新状態を維持し、各イベントをその状態に適用することで、ストリームのダウンストリームコンシューマーは状態を完全に調整できます。Debezium コネクターはこのような状態を維持できないため、これは実行できません。

これまでは、イベントの 作成/読み取り および 更新 の例を確認することができます。以下の例は、同じコレクションの delete イベントの値を示しています。このコレクションの delete イベントの値は全く同じ スキーマ を持ち、ペイロードは同じですが、異なる値を保持します。特に、削除イベントには after 値も値もありません patch

{
    "schema": { ... },
    "payload": {
      "op": "d",
      "ts_ms": 1465495462115,
      "filter": "{\"_id\" : {\"$numberLong\" : \"1004\"}}",
      "source": {
        "version": "1.1.2.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }

これを他のイベントの値と比較すると、payload セクションにいくつかの違いがあります。

  • op フィールド値は d、このドキュメントが削除されたことを表しています。
  • patch フィールドは表示されません。
  • after フィールドは表示されません。
  • filter フィールドが表示され、削除に使用される MongoDB 選択基準の文字列が JSON 表現されます。
  • source フィールド構造には以前のフィールドと同じフィールドがありますが、イベントが oplog の別の位置にあるため、値は異なります。
  • ts_ms、Debezium がこのイベントを処理したタイムスタンプを示しています。

MongoDB コネクターは、他の種類のイベントを提供します。各 削除 イベントの後に、削除 イベントと同じキーを持つ tombstone イベントが続きますが、null 値が設定されます。これにより、Kafka には ログ圧縮 メカニズムを実行し、そのキーで すべて のメッセージを削除するのに必要な情報が提供されます。

注記

すべての MongoDB コネクターイベントは、Kafka ログの圧縮 と連携するように設計されています。これにより、すべてのキーの少なくとも最新のメッセージが保持されていれば、古いメッセージを削除できるようになります。これは、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できる Kafka でストレージ領域を回収する方法です。

一意に識別されたドキュメントの MongoDB コネクターイベントはすべて全く同じキーを持ち、最新のイベントのみが保持される Kafka にシグナルします。tombstone イベントは、同じ鍵を持つ すべて のメッセージが削除可能であることを Kafka に通知します。