3.3. Debezium Db2 连接器数据更改事件的描述
Debezium Db2 连接器会为每行级别 INSERT、UPDATE 和 DELETE 操作生成数据更改事件。每个事件都包含一个键和值。键和值的结构取决于更改的表。
Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间而变化,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您使用 schema registry,则消费者可以从 registry 获取 schema 的模式 ID。这使得每个事件都可以自我包含。
以下框架 JSON 显示更改事件的基本四个部分。但是,如何配置在应用程序中使用的 Kafka Connect converter 决定了更改事件中的这四个部分的表示。只有当您将转换器配置为生成它时,schema 字段才处于更改事件中。同样,只有在将转换器配置为生成它时,才会发生更改事件中的事件密钥和事件有效负载。如果您使用 JSON 转换器,并将其配置为生成所有四个基本更改事件部分,则更改事件具有此结构:
{
"schema": { 1
...
},
"payload": { 2
...
},
"schema": { 3
...
},
"payload": { 4
...
},
}表 3.5. 更改事件基本内容概述
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
|
第一个 |
| 2 |
|
第一个 |
| 3 |
|
第二个 |
| 4 |
|
第二个 |
默认情况下,连接器流将事件记录更改为名称与事件原始表相同的名称的主题。如需更多信息,请参阅 主题名称。
Debezium Db2 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着,逻辑服务器名称必须以拉丁或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余的字符以及数据库和表名称中的每个字符都必须是字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效的字符,它将替换为下划线字符。
如果逻辑服务器名称、数据库名称或表名称包含无效字符,且区分另一个名称的唯一字符无效,则可能会导致意外冲突,因此替换为下划线。
另外,对于数据库、模式和表的 Db2 名称可能区分大小写。这意味着连接器可能会向同一 Kafka 主题发出多个表的事件记录。
详情包括在以下主题中:
3.3.1. 关于 Debezium db2 中的键更改事件
更改事件的密钥包含更改表的密钥的 schema,以及更改的行的实际键。在连接器创建事件时,模式及其对应有效负载都包含更改表的 PRIMARY KEY (或唯一约束)中的每个列的字段。
请考虑以下 客户 表,然后是此表的更改事件键示例。
表示例
CREATE TABLE customers ( ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, FIRST_NAME VARCHAR(255) NOT NULL, LAST_NAME VARCHAR(255) NOT NULL, EMAIL VARCHAR(255) NOT NULL UNIQUE );
更改事件键示例
捕获对 customers 表的更改的每个更改事件都有相同的事件键模式。只要 表有以前的定义,捕获 customer 表更改的事件都有以下关键结构:在 JSON 中,它类似如下:
customers
{
"schema": { 1
"type": "struct",
"fields": [ 2
{
"type": "int32",
"optional": false,
"field": "ID"
}
],
"optional": false, 3
"name": "mydatabase.MYSCHEMA.CUSTOMERS.Key" 4
},
"payload": { 5
"ID": 1004
}
}表 3.6. 更改事件密钥的描述
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
|
键的 schema 部分指定一个 Kafka Connect 模式,它描述了密钥 |
| 2 |
|
指定 |
| 3 |
|
指明 event 键必须在其 |
| 4 |
|
定义密钥有效负载结构的 schema 名称。这个模式描述了更改的表的主键的结构。键架构名称的格式为 connector-name.database-name.table-name.
|
| 5 |
|
包含生成此更改事件的行的键。在本例中,键包含一个 |
3.3.2. 关于 Debezium Db2 更改事件中的值
更改事件中的值比键稍微复杂。与键一样,该值具有 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的 schema,包括其嵌套字段。更改创建、更新或删除数据的事件,并且具有 envelope 结构的值有效负载。
考虑用于显示更改事件键示例的相同示例:
表示例
CREATE TABLE customers ( ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, FIRST_NAME VARCHAR(255) NOT NULL, LAST_NAME VARCHAR(255) NOT NULL, EMAIL VARCHAR(255) NOT NULL UNIQUE );
customers 表的每个更改事件的事件值部分都指定了相同的模式。事件值的有效负载因事件类型而异:
创建 事件
以下示例显示了连接器为在 customer 表中创建数据的操作生成的更改事件的值部分:
{
"schema": { 1
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "ID"
},
{
"type": "string",
"optional": false,
"field": "FIRST_NAME"
},
{
"type": "string",
"optional": false,
"field": "LAST_NAME"
},
{
"type": "string",
"optional": false,
"field": "EMAIL"
}
],
"optional": true,
"name": "mydatabase.MYSCHEMA.CUSTOMERS.Value", 2
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "ID"
},
{
"type": "string",
"optional": false,
"field": "FIRST_NAME"
},
{
"type": "string",
"optional": false,
"field": "LAST_NAME"
},
{
"type": "string",
"optional": false,
"field": "EMAIL"
}
],
"optional": true,
"name": "mydatabase.MYSCHEMA.CUSTOMERS.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "change_lsn"
},
{
"type": "string",
"optional": true,
"field": "commit_lsn"
},
],
"optional": false,
"name": "io.debezium.connector.db2.Source", 3
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "mydatabase.MYSCHEMA.CUSTOMERS.Envelope" 4
},
"payload": { 5
"before": null, 6
"after": { 7
"ID": 1005,
"FIRST_NAME": "john",
"LAST_NAME": "doe",
"EMAIL": "john.doe@example.org"
},
"source": { 8
"version": "2.1.4.Final",
"connector": "db2",
"name": "myconnector",
"ts_ms": 1559729468470,
"snapshot": false,
"db": "mydatabase",
"schema": "MYSCHEMA",
"table": "CUSTOMERS",
"change_lsn": "00000027:00000758:0003",
"commit_lsn": "00000027:00000758:0005",
},
"op": "c", 9
"ts_ms": 1559729471739 10
}
}表 3.7. 创建 事件值字段的描述
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
| 值的 schema,用于描述值有效负载的结构。每次连接器为特定表生成的更改时,更改事件的值模式都是相同的。 |
| 2 |
|
在 |
| 3 |
|
|
| 4 |
|
|
| 5 |
|
值的实际数据。这是更改事件提供的信息。 |
| 6 |
|
指定事件发生前行状态的可选字段。当 |
| 7 |
|
指定事件发生后行状态的可选字段。在本例中, |
| 8 |
|
描述事件源元数据的必需字段。
|
| 9 |
|
描述导致连接器生成事件的操作类型的必要字符串。在本例中,
|
| 10 |
|
显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新 事件
示例 customer 表中更新的更改事件值与该表的 create 事件相同。同样,update 事件值的有效负载具有相同的结构。但是,事件值 payload 在更新 事件中包含不同的值。以下是连接器在 customers 表中为更新生成的更改事件值示例:
{
"schema": { ... },
"payload": {
"before": { 1
"ID": 1005,
"FIRST_NAME": "john",
"LAST_NAME": "doe",
"EMAIL": "john.doe@example.org"
},
"after": { 2
"ID": 1005,
"FIRST_NAME": "john",
"LAST_NAME": "doe",
"EMAIL": "noreply@example.org"
},
"source": { 3
"version": "2.1.4.Final",
"connector": "db2",
"name": "myconnector",
"ts_ms": 1559729995937,
"snapshot": false,
"db": "mydatabase",
"schema": "MYSCHEMA",
"table": "CUSTOMERS",
"change_lsn": "00000027:00000ac0:0002",
"commit_lsn": "00000027:00000ac0:0007",
},
"op": "u", 4
"ts_ms": 1559729998706 5
}
}表 3.8. 更新 事件值字段的描述
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
|
指定事件发生前行状态的可选字段。在 update 事件值中, |
| 2 |
|
指定事件发生后行状态的可选字段。您可以比较 |
| 3 |
|
描述事件源元数据的必需字段。
|
| 4 |
|
描述操作类型的强制字符串。在 update 事件值中, |
| 5 |
|
显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新行的主/唯一键的列会更改行键的值。当键更改时,Debebe 会输出 三个 事件: DELETE 事件和带有行的旧键的 tombstone 事件,后跟带有行的新键的事件。
删除 事件
delete 更改事件中的值与为同一表的 create 和 update 事件相同的 schema 部分。示例 customer 表的 delete 事件中的事件值 payload 类似如下:
{
"schema": { ... },
},
"payload": {
"before": { 1
"ID": 1005,
"FIRST_NAME": "john",
"LAST_NAME": "doe",
"EMAIL": "noreply@example.org"
},
"after": null, 2
"source": { 3
"version": "2.1.4.Final",
"connector": "db2",
"name": "myconnector",
"ts_ms": 1559730445243,
"snapshot": false,
"db": "mydatabase",
"schema": "MYSCHEMA",
"table": "CUSTOMERS",
"change_lsn": "00000027:00000db0:0005",
"commit_lsn": "00000027:00000db0:0007"
},
"op": "d", 4
"ts_ms": 1559730450205 5
}
}表 3.9. 删除 事件值字段的描述
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
|
指定事件发生前行状态的可选字段。在一个 delete 事件值中, |
| 2 |
|
指定事件发生后行状态的可选字段。在 delete 事件值中, |
| 3 |
|
描述事件源元数据的必需字段。在一个 delete 事件值中,
|
| 4 |
|
描述操作类型的强制字符串。 |
| 5 |
|
显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
删除 更改事件记录为消费者提供处理此行删除所需的信息。包括了旧值,因为有些使用者可能需要它们才能正确处理删除。
Db2 连接器事件旨在用于 Kafka 日志压缩。只要每个密钥至少保留最新的消息,日志压缩就可以删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。
删除行时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,如果 Kafka 删除具有相同键的所有消息,消息值必须是 null。为了实现此目的,在 Debezium 的 Db2 连接器发出 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键有一个 null 值 。