3.4.6. イベント

MongoDB コネクターによって生成されたすべてのデータ変更イベントにはキーと値があります。

注記

Kafka 0.10 以降、Kafka はオプションでメッセージキーで記録でき、メッセージが作成(プロデューサーによって記録)された タイムスタンプ、または Kafka によってログに書き込まれたタイムスタンプを値として記録できます。

Debezium および Kafka Connect はイベント メッセージの継続的なストリームを中心に設計されており、これらの イベントのソースが構造で変更された場合や、コネクターが改善または変更される場合に、これらのイベントの構造が時間とともに変更される可能性があります。これは、コンシューマーが処理するのが難しい場合があるため、Kafka Connect が非常に簡単に各イベントの自己完結性を持たせることができます。すべてのメッセージキーと値には、スキーマペイロード の 2 つの部分で設定されます。スキーマはペイロードの構造を記述しますが、ペイロードには実際のデータが含まれます。

3.4.6.1. 変更イベントのキー

特定のコレクションでは、変更イベントのキーには単一の id フィールドが含まれます。この値は、文字列として表されるドキュメントの識別子 で、厳密なモードの MongoDB 拡張 JSON シリアライゼーション から派生します。論理名が fulfillment のコネクター、inventory データベースを含むレプリカセットと、以下のようなドキュメントが含まれる customers コレクションについて考えてみましょう。

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}

customers コレクションのすべての変更イベントは、JSON 内の同じキー構造を特長としています。

{
  "schema": {
    "type": "struct",
    "name": "fulfillment.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": "1004"
  }
}

キーの スキーマ 部分には、ペイロード部分の内容を記述する Kafka Connect スキーマが含まれます。この場合、ペイロード 値は任意ではなく、fulfillment.inventory.customers.Key という名前のスキーマによって定義された構造であり、型 stringid という名前の必須フィールドが 1 つあります。キーの payload フィールドの値を確認すると、id フィールドが 1 つあり、値は整数 1004 を含む文字列で構造(JSON は単なるオブジェクト)であることがわかります。

この例では、整数識別子でドキュメントを使用していますが、有効な MongoDB ドキュメント識別子(ドキュメントを含む)は機能します。ペイロードの id フィールドの値は、元のドキュメントの _id フィールドの MongoDB 拡張 JSON シリアライゼーション(制限モード)を表す文字列になります。以下の例は、異なるタイプの _id フィールドがイベントキーのペイロードとしてエンコードされる方法を示しています。

タイプMongoDB _id の値キーのペイロード

Integer

1234

{ "id" : "1234" }

浮動小数点 (Float)

12.34

{ "id" : "12.34" }

文字列

"1234"

{ "id" : "\"1234\"" }

Document

{ "hi" : "kafka", "nums" : [10.0, 100.0, 1000.0] }

{ "id" : "{\"hi\" : \"kafka\", \"nums\" : [10.0, 100.0, 1000.0]}" }

ObjectId

ObjectId("596e275826f08b2730779e1f")

{ "id" : "{\"$oid\" : \"596e275826f08b2730779e1f\"}" }

バイナリー

BinData("a2Fma2E=",0)

{ "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" }