3.3. 删除数据库中的记录并查看 删除 事件
现在您已经了解了 Debezium MySQL 连接器如何在 inventory 数据库中捕获 创建或更新 事件,现在您将删除其中一个记录并查看连接器如何捕获它。
通过完成这个步骤,您将了解如何查找有关 删除 事件的详情,以及 Kafka 如何使用 日志压缩 来减少 删除事件的数量,同时仍然启用使用者来获取所有事件。
流程
在运行 MySQL 命令行客户端的终端中,运行以下语句:
mysql> DELETE FROM customers WHERE id=1004; Query OK, 1 row affected (0.00 sec)
注意如果上述命令失败并显示外键约束,则必须使用以下声明从地址表中删除客户 地址 的引用:
mysql> DELETE FROM addresses WHERE customer_id=1004;
切换到运行
kafka-console-consumer的终端,以查看 两个新 事件。通过删除
客户表中的一行,Debezium MySQL 连接器会生成两个新事件。检查第一个新事件的键和值。
以下是第一个新事件 的密钥 详情(便于阅读):
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key" "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }此密钥 与您在前两个事件中的键相同。
以下是第一个新事件 的值 (格式化以便便于阅读):
{ "schema": {...}, "payload": { "before": { 1 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": null, 2 "source": { 3 "name": "1.9.5.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501558, "gtid": null, "file": "mysql-bin.000003", "pos": 725, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "d", 4 "ts_ms": 1486501558315 5 } }因此,此事件提供了一个消费者,其中包含需要处理行删除所需的信息。另外还会提供旧的值,因为有些用户可能需要它们来正确处理删除。
检查第二个新事件的键和值。
以下是第二个新事件 的密钥 (格式用于可读性):
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key" "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }再次说明,这个密钥 与您在前面的三个事件中完全相同。
以下是同一事件 的值 (格式化以便便于阅读):
{ "schema": null, "payload": null }如果将 Kafka 设置为 日志压缩,则如果以后在同一键中至少有一个信息,它将从主题中删除旧的信息。这个最后一个事件称为 tombstone 事件,因为它有一个键和一个空值。这意味着 Kafka 将删除所有具有相同键的之前的信息。尽管删除了之前的消息,但 tombstone 事件表示消费者仍然可以从开始读取该主题,而不会丢失任何事件。