11.3. 以 CloudEvents 格式发送 Debezium 更改事件记录
CloudEvents 是以常见方式描述事件数据的规范。它的目标是在服务、平台和系统之间提供互操作性。Debezium 可让您配置 MongoDB、MySQL、PostgreSQL 或 SQL Server 连接器,以发出符合 CloudEvents 规格的更改事件记录。
以 CloudEvents 格式发出更改事件记录是一项技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围。
CloudEvents 规格定义:
- 一组标准化事件属性
- 定义自定义属性的规则
- 将事件格式映射到序列化表示的编码规则,如 JSON 或 Avro
- 用于传输层(如 Apache Kafka、HTTP 或 AMQP)的协议绑定
要配置 Debezium 连接器来发出符合 CloudEvents 规格的更改事件记录,Debezium 提供了 io.debezium.converters.CloudEventsConverter,它是一个 Kafka Connect 消息转换器。
目前,只支持结构化映射模式。CloudEvents 更改事件 envelope 可以是 JSON 或 Avro,每个 envelope 类型支持 JSON 或 Avro 作为 数据格式。预计未来的 Debezium 版本将支持二进制映射模式。
有关以 CloudEvents 格式发出更改事件的信息,如下所示:
有关使用 Avro 的详情,请参考:
11.3.1. CloudEvents 格式的 Debezium 更改事件记录示例
以下示例显示了 PostgreSQL 连接器提供的 CloudEvents 更改事件记录是什么。在本例中,PostgreSQL 连接器被配置为使用 JSON 作为 CloudEvents 格式 envelope,以及 数据格式。
{
"id" : "name:test_server;lsn:29274832;txId:565", 1
"source" : "/debezium/postgresql/test_server", 2
"specversion" : "1.0", 3
"type" : "io.debezium.postgresql.datachangeevent", 4
"time" : "2020-01-13T13:55:39.738Z", 5
"datacontenttype" : "application/json", 6
"iodebeziumop" : "r", 7
"iodebeziumversion" : "2.1.4.Final", 8
"iodebeziumconnector" : "postgresql",
"iodebeziumname" : "test_server",
"iodebeziumtsms" : "1578923739738",
"iodebeziumsnapshot" : "true",
"iodebeziumdb" : "postgres",
"iodebeziumschema" : "s1",
"iodebeziumtable" : "a",
"iodebeziumtxId" : "565",
"iodebeziumlsn" : "29274832",
"iodebeziumxmin" : null,
"iodebeziumtxid": "565", 9
"iodebeziumtxtotalorder": "1",
"iodebeziumtxdatacollectionorder": "1",
"data" : { 10
"before" : null,
"after" : {
"pk" : 1,
"name" : "Bob"
}
}
}- 1 1 1
- 连接器根据更改事件的内容为更改事件生成的唯一 ID。
- 2 2 2
- 事件源,它是连接器配置中
topic.prefix属性指定的数据库的逻辑名称。 - 3 3 3
- CloudEvents 规格版本。
- 4 4 4
- 生成更改事件的连接器类型。此字段的格式是
io.debezium.CONNECTOR_TYPE.datachangeevent。CONNECTOR_TYPE的值为mongodb、mysql、postgresql或sqlserver。 - 5 5
- 源数据库中更改的时间。
- 6
- 描述
data属性的内容类型,本例中为 JSON。唯一的替代方案是 Avro。 - 7
- 操作标识符。可能的值有
r用于 read,c用于 create、u用于 update,或d用于 delete。 - 8
- 从 Debezium 更改事件已知的所有
源属性都通过使用属性名称的iodebezium前缀映射到 CloudEvents 扩展属性。 - 9
- 在连接器中启用时,从 Debezium 更改事件已知的每个
事务属性都映射到 CloudEvents 扩展属性,方法是将iodebeziumtx前缀用于属性名称。 - 10
- 实际数据更改本身。根据操作和连接器,数据可能会在、
after和/或patch字段之前包含。
以下示例还显示了 PostgreSQL 连接器提供的 CloudEvents 更改事件记录是什么。在本例中,PostgreSQL 连接器再次配置为使用 JSON 作为 CloudEvents 格式 envelope,但此时连接器被配置为使用 Avro 作为 数据格式。
{
"id" : "name:test_server;lsn:33227720;txId:578",
"source" : "/debezium/postgresql/test_server",
"specversion" : "1.0",
"type" : "io.debezium.postgresql.datachangeevent",
"time" : "2020-01-13T14:04:18.597Z",
"datacontenttype" : "application/avro", 1
"dataschema" : "http://my-registry/schemas/ids/1", 2
"iodebeziumop" : "r",
"iodebeziumversion" : "2.1.4.Final",
"iodebeziumconnector" : "postgresql",
"iodebeziumname" : "test_server",
"iodebeziumtsms" : "1578924258597",
"iodebeziumsnapshot" : "true",
"iodebeziumdb" : "postgres",
"iodebeziumschema" : "s1",
"iodebeziumtable" : "a",
"iodebeziumtxId" : "578",
"iodebeziumlsn" : "33227720",
"iodebeziumxmin" : null,
"iodebeziumtxid": "578",
"iodebeziumtxtotalorder": "1",
"iodebeziumtxdatacollectionorder": "1",
"data" : "AAAAAAEAAgICAg==" 3
}
也可以将 Avro 用于 envelope 和 data 属性。
11.3.2. 配置 Debezium CloudEvents converter 的示例
在 Debezium 连接器配置中配置 io.debezium.converters.CloudEventsConverter。以下示例演示了如何配置 CloudEvents converter 来发送具有以下特征的更改事件记录:
- 使用 JSON 作为 envelope。
-
使用
http://my-registry/schemas/ids/1中的 schema registry 将data属性序列化为二进制 Avro 数据。
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json", 1
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...- 1
- 指定
serializer.type是可选的,因为json是默认值。
CloudEvents converter 转换 Kafka 记录值。在同一连接器配置中,如果您要对记录密钥操作,您可以指定 key.converter。例如,您可以指定 StringConverter、LongConverter、JsonConverter 或 AvroConverter。
11.3.3. Debezium CloudEvents converter 配置选项
当您将 Debezium 连接器配置为使用 CloudEvent converter 时,您可以指定以下选项。
表 11.3. CloudEvents converter 配置选项的描述
| 选项 | 默认 | 描述 |
|
|
用于 CloudEvents envelope 结构的编码类型。该值可以是 | |
|
|
用于 | |
| N/A |
使用 JSON 时要传递给底层转换器的任何配置选项。 | |
| N/A |
使用 Avro 时要传递给底层转换器的任何配置选项。删除了 | |
| none |
指定如何调整架构名称,以便与连接器使用的消息转换器兼容。该值可以是 |