4.3. データベースのレコードの削除および 削除 イベントの表示

Debezium MySQL コネクターが inventory データベースで 作成 および 更新 イベントをキャプチャーする方法を確認しました。次に、レコードの 1 つを削除し、コネクターがこれをどのようにキャプチャーするかを見てみましょう。

この手順を完了すると、削除 イベントの詳細を見つける方法と、Kafka が ログコンパクション を使用して、コンシューマーがすべてのイベントを取得できる状態で 削除 イベントの数を減らす方法を説明します。

手順

  1. MySQL コマンドラインクライアントを実行しているターミナルで、以下のステートメントを実行します。

    mysql> DELETE FROM customers WHERE id=1004;
    Query OK, 1 row affected (0.00 sec)
    注記

    上記のコマンドが外部キー制約違反で失敗する場合は、以下のステートメントを使用して、addresses テーブルから顧客アドレスの参照を削除する必要があります。

    mysql> DELETE FROM addresses WHERE customer_id=1004;
  2. kafka-console-consumer を実行しているターミナルに切り替え、2 つ の新規イベントを確認します。

    customers テーブルの行を削除することで、Debezium MySQL コネクターは 2 つの新しいイベントを生成しました。

  3. 最初の新規イベントの キー および を確認します。

    最初の新規イベントの キー の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。

    {
      "schema": {
        "type": "struct",
        "name": "dbserver1.inventory.customers.Key"
        "optional": false,
        "fields": [
          {
            "field": "id",
            "type": "int32",
            "optional": false
          }
        ]
      },
      "payload": {
        "id": 1004
      }
    }

    この キー は、これまで確認した 2 つのイベントの キー と同じです。

    最初の新規イベントの は以下のとおりです (書式を調整して読みやすくしてあります)。

    {
      "schema": {...},
      "payload": {
        "before": {  1
          "id": 1004,
          "first_name": "Anne Marie",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "after": null,  2
        "source": {  3
          "name": "1.5.4.Final",
          "name": "dbserver1",
          "server_id": 223344,
          "ts_sec": 1486501558,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 725,
          "row": 0,
          "snapshot": null,
          "thread": 3,
          "db": "inventory",
          "table": "customers"
        },
        "op": "d",  4
        "ts_ms": 1486501558315  5
      }
    }
    1
    before フィールドは、データベースのコミットで削除した行の状態を表しています。
    2
    行はもう存在しないため、after フィールドは null になります。
    3
    source フィールド構造には以前と同じ値が多数ありますが、ts_sec および pos フィールドは更新されています (他の状況では file が変更されることがあります)。
    4
    op フィールドの値は d になっており、この行が削除されたことを示しています。
    5
    ts_ms フィールドは、Debezium がこのイベントを処理したときのタイムスタンプを表しています。

    よって、このイベントは、行の削除を処理に必要な情報をコンシューマーに提供します。古い値も提供されます。これは、コンシューマーによっては削除を適切に処理するのに古い値が必要になることがあるからです。

  4. 2 つ目の新規イベントの キー および を確認します。

    2 つ目の新規イベントの は以下のとおりです (書式を調整して読みやすくしてあります)。

      {
        "schema": {
          "type": "struct",
          "name": "dbserver1.inventory.customers.Key"
          "optional": false,
          "fields": [
            {
              "field": "id",
              "type": "int32",
              "optional": false
            }
          ]
        },
        "payload": {
          "id": 1004
        }
      }

    繰り返しになりますが、この キー は、これまで確認した 3 つのイベントのキーと同じです。

    同じイベントの は以下のとおりです (書式を調整して読みやすくしてあります)。

    {
      "schema": null,
      "payload": null
    }

    Kafka が ログコンパクション に設定されている場合、トピックの後半に同じキーを持つメッセージが 1 つ以上あると、トピックから古いメッセージが削除されます。この最後のイベントには、キーと空の値があるため、tombstone (トゥームストーン) イベントと呼ばれます。これは、Kafka が同じキーを持つこれまでのメッセージをすべて削除することを意味します。これまでのメッセージが削除されても、tombstone イベントであるため、コンシューマーは最初からトピックを読み取ることができ、イベントを見逃しません。