3.3. 删除数据库中的记录并查看 删除 事件

现在您已经了解了 Debezium MySQL 连接器如何在 inventory 数据库中捕获 创建或更新 事件,现在您将删除其中一个记录并查看连接器如何捕获它。

通过完成这个步骤,您将了解如何查找有关 删除 事件的详情,以及 Kafka 如何使用 日志压缩 来减少 删除事件的数量,同时仍然启用使用者来获取所有事件。

流程

  1. 在运行 MySQL 命令行客户端的终端中,运行以下语句:

    mysql> DELETE FROM customers WHERE id=1004;
    Query OK, 1 row affected (0.00 sec)
    注意

    如果上述命令失败并显示外键约束,则必须使用以下声明从地址表中删除客户 地址 的引用:

    mysql> DELETE FROM addresses WHERE customer_id=1004;
  2. 切换到运行 kafka-console-consumer 的终端,以查看 两个新 事件。

    通过删除 客户 表中的一行,Debezium MySQL 连接器会生成两个新事件。

  3. 检查第一个新事件的键和值。

    以下是第一个新事件 的密钥 详情(便于阅读):

    {
      "schema": {
        "type": "struct",
        "name": "dbserver1.inventory.customers.Key"
        "optional": false,
        "fields": [
          {
            "field": "id",
            "type": "int32",
            "optional": false
          }
        ]
      },
      "payload": {
        "id": 1004
      }
    }

    此密钥 与您在前两个事件中的键相同。

    以下是第一个新事件 的值 (格式化以便便于阅读):

    {
      "schema": {...},
      "payload": {
        "before": {  1
          "id": 1004,
          "first_name": "Anne Marie",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "after": null,  2
        "source": {  3
          "name": "1.9.5.Final",
          "name": "dbserver1",
          "server_id": 223344,
          "ts_sec": 1486501558,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 725,
          "row": 0,
          "snapshot": null,
          "thread": 3,
          "db": "inventory",
          "table": "customers"
        },
        "op": "d",  4
        "ts_ms": 1486501558315  5
      }
    }
    1
    before 字段现在具有数据库提交删除的行的状态。
    2
    after 字段为 null,因为行不再存在。
    3
    source 字段结构有许多与以前相同的值,但 ts_secpos 字段已更改(在其他情况下,文件 可能已更改)。
    4
    op 字段值现在是 d,表示删除此行。
    5
    ts_ms 字段显示 Debezium 处理此事件的时间戳。

    因此,此事件提供了一个消费者,其中包含需要处理行删除所需的信息。另外还会提供旧的值,因为有些用户可能需要它们来正确处理删除。

  4. 检查第二个新事件的键和值。

    以下是第二个新事件 的密钥 (格式用于可读性):

      {
        "schema": {
          "type": "struct",
          "name": "dbserver1.inventory.customers.Key"
          "optional": false,
          "fields": [
            {
              "field": "id",
              "type": "int32",
              "optional": false
            }
          ]
        },
        "payload": {
          "id": 1004
        }
      }

    再次说明,这个密钥 与您在前面的三个事件中完全相同。

    以下是同一事件 的值 (格式化以便便于阅读):

    {
      "schema": null,
      "payload": null
    }

    如果将 Kafka 设置为 日志压缩,则如果以后在同一键中至少有一个信息,它将从主题中删除旧的信息。这个最后一个事件称为 tombstone 事件,因为它有一个键和一个空值。这意味着 Kafka 将删除所有具有相同键的之前的信息。尽管删除了之前的消息,但 tombstone 事件表示消费者仍然可以从开始读取该主题,而不会丢失任何事件。