4.3. Debezium MongoDB コネクターのデータ変更イベントの説明

Debezium MongoDB コネクターは、データを挿入、更新、または削除する各ドキュメントレベルの操作に対してデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたコレクションによって異なります。

Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。

以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}

表4.1 変更イベントの基本内容の概要

項目フィールド名説明

1

schema

最初の スキーマ フィールドはイベントキーの一部です。イベントキーの ペイロード 部分の内容を記述する Kafka Connect スキーマを指定します。つまり、最初の schema フィールドは変更されたドキュメントのキーの構造を記述します。

2

payload

最初の payload フィールドはイベントキーの一部になります。前述の schema フィールドによって記述された構造を持ち、変更されたドキュメントのキーが含まれます。

3

schema

2 つ目の スキーマ フィールドはイベント値の一部です。イベント値 のペイロード 部分の内容を記述する Kafka Connect スキーマを指定します。つまり、2 つ目の スキーマは 変更されたドキュメントの構造を記述します。通常、このスキーマには入れ子になったスキーマが含まれます。

4

payload

2 つ目の payload フィールドはイベント値の一部です。前述の schema フィールドによって記述された構造を持ち、変更されたドキュメントの実際のデータが含まれます。

デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のコレクションと同じ名前を持つトピックにストリーミングされます。トピック名 を参照してください。

警告

MongoDB コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とコレクション名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または _) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。

論理サーバー名、データベース名、またはコレクション名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。

詳細は、以下のトピックを参照してください。

4.3.1. Debezium MongoDB 変更イベントのキー

変更イベントのキーには、変更されたドキュメントのキーのスキーマと、変更されたドキュメントの実際のキーのスキーマが含まれます。特定のコレクションでは、スキーマとそれに対応するペイロードの両方に単一の id フィールドが含まれます。このフィールドの値は、MongoDB Extended JSON のシリアライゼーションの厳格モード から派生する文字列として表されるドキュメントの識別子です。

論理名が meet ment のコネクター、インベントリー データベースが含まれるレプリカセット、および以下のようなドキュメントが含まれる お客様 コレクションについて考えてみましょう。

ドキュメントの例

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}

変更イベントキーの例

顧客 コレクションへの変更をキャプチャーするすべての変更イベントには、同じイベントキースキーマがあります。顧客 コレクションに前述の定義がある限り、顧客 コレクションへの変更をキャプチャーする変更イベントのキー構造はすべて以下のようになります。JSON では、以下のようになります。

{
  "schema": { 1
    "type": "struct",
    "name": "fulfillment.inventory.customers.Key", 2
    "optional": false, 3
    "fields": [ 4
      {
        "field": "id",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": { 5
    "id": "1004"
  }
}

表4.2 変更イベントキーの説明

項目フィールド名説明

1

schema

キーのスキーマ部分は、キーの ペイロード 部分の内容を記述する Kafka Connect スキーマを指定します。

2

fulfillment.inventory.customers.Key

キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更したドキュメントのキーの構造を説明します。キースキーマ名の形式は connector-name.database- name.collection-name.Key です。この例では、以下のようになります。

  • Mee tment は、このイベントを生成したコネクターの名前です。
  • inventory は変更されたコレクションが含まれるデータベースです。
  • お客様が 更新されたドキュメントが含まれるコレクションです。

3

任意

イベントキーの payload フィールドに値が含まれる必要があるかどうかを示します。この例では、キーのペイロードに値が必要です。ドキュメントにキーがない場合、キーの payload フィールドの値は任意です。

4

fields

各フィールドの名前、タイプ、および必要かどうかなど、ペイロード で想定される各フィールドを指定します。

5

payload

この変更イベントが生成されたドキュメントのキーが含まれます。この例では、キーにはタイプ stringid フィールドが含まれ、その値は 1004 です。

この例では、整数の識別子を持つドキュメントを使用しますが、有効な MongoDB ドキュメント識別子は、ドキュメント識別子を含め、同じように動作します。ドキュメント識別子の場合、イベントキーの payload.id 値は、厳密なモードを使用する MongoDB 拡張 JSON シリアライゼーションとして更新されたドキュメントの original _id フィールドを表す文字列です。以下の表は、異なるタイプの of _id フィールドを表す例を示しています。

表4.3 イベントキーペイロードの document _id フィールドを表す例

タイプmongodb _idキーのペイロード

Integer

1234

{ "id" : "1234" }

浮動小数点 (Float)

12.34

{ "id" : "12.34" }

文字列

"1234"

{ "id" : "\"1234\"" }

Document

{ "hi" : "kafka", "nums" : [10.0, 100.0, 1000.0] }

{ "id" : "{\"hi\" : \"kafka\", \"nums\" : [10.0, 100.0, 1000.0]}" }

ObjectId

ObjectId("596e275826f08b2730779e1f")

{ "id" : "{\"$oid\" : \"596e275826f08b2730779e1f\"}" }

バイナリー

BinData("a2Fma2E=",0)

{ "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" }