12.6. 从 Debezium MongoDB 更改事件中提取源文档

Debezium MongoDB 连接器会发出数据更改信息,以表示 MongoDB 集合中发生的每个操作。这些事件消息的复杂结构传真代表原始数据库事件的详细信息。但是,一些下游用户可能无法以原始格式处理消息。例如,要表示数据收集中的嵌套文档,连接器以包含嵌套字段的格式发送事件消息。要支持接收器连接器,或者无法处理原始消息的层次结构格式的其他使用者,您可以使用 Debezium MongoDB 事件扁平化(ExtractNewDocumentState)单一消息转换(SMT)。SMT 简化了原始消息的结构,并可以通过其他方法修改消息,以便更轻松地处理数据。

事件扁平化转换是一个 Kafka Connect SMT

注意

本章中的信息仅描述了 Debezium MongoDB 连接器的单个消息转换(SMT)。有关与关系数据库一起使用的相同 SMT 的详情,请参考新记录状态 Extraction SMT 的文档

以下主题提供详情:

12.6.1. Debezium MongoDB 更改事件结构的描述

Debezium MongoDB 连接器生成具有复杂结构的更改事件。每个事件消息包括以下部分:

源元数据

包括但不仅限于以下字段:

  • 更改集合中数据的操作类型(创建/插入、更新或删除)。
  • 发生更改的数据库和集合的名称。
  • 标识更改的时间戳。
  • 可选的事务信息。
文档数据
数据

当 Debezium 连接器的 capture.mode 设置为以下值之一时,此字段存在于运行 MongoDB 6.0 及之后的版本中:

  • change_streams_with_pre_image.
  • change_streams_update_full_with_pre_image.

    如需更多信息,请参阅 MongoDB 预镜像支持

数据

代表在当前操作后文档中存在的值的 JSON 字符串。事件消息中存在 after 字段取决于事件类型和连接器配置。MongoDB 插入 操作的 create 事件始终包含 after 字段,而不考虑 capture.mode 设置。对于 更新 事件,只有在 capture.mode 被设置为以下值之一时,才会显示 after 字段:

  • change_streams_update_full
  • change_streams_update_full_with_pre_image.

    注意

    更改事件消息中的 after 值不一定表示事件后立即表示文档的状态。该值不会被动态计算,在连接器捕获更改事件后,它会查询集合以检索文档的当前值。

    例如,假设有多个操作、bc 修改文档的情况,以快速成功方式修改文档。当连接器进程时,它会更改 ,它会查询集合以获取完整的文档。同时,更改 bc。当连接器收到对更改的完整文档的响应时,可能会收到一个基于 bc 后续更改的文档版本。如需更多信息,请参阅 capture.mode 属性的文档。

以下片段显示了连接器在 MongoDB 插入 操作后发出的 创建 更改事件的基本结构:

{
  "op": "c",
  "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}",
  "source": { ... }
}

上例中的 after 字段的复杂格式提供了有关源数据库中发生的更改的详细信息。但是,一些消费者无法处理包含嵌套值的消息。要将原始消息的复杂嵌套字段转换为更简单、更通用兼容结构,请为 MongoDB 使用 event flattening SMT。SMT 扁平化消息中嵌套字段的结构,如下例所示:

{
  "field1" : "newvalue1",
  "field2" : "newvalue2"
}

有关 Debezium MongoDB 连接器生成的消息的默认结构的更多信息,请参阅 连接器文档

12.6.2. Debezium MongoDB 事件扁平化转换的行为

MongoDB 的事件扁平化 SMT 从 Debezium MongoDB 连接器发送的 创建或更新 更改事件消息中提取 after 字段。在 SMT 处理原始更改事件消息后,它会生成一个简化的版本,该版本只包含 after 字段的内容。

根据您的用例,您可以将 ExtractNewDocumentState SMT 应用到 Debezium MongoDB 连接器,或应用到消耗 Debezium 连接器生成的消息的接收器连接器。如果您将 SMT 应用到 Debezium MongoDB 连接器,则 SMT 会在将连接器发送到 Apache Kafka 前修改连接器发送的消息。要确保 Kafka 以原始格式保留完整的 Debezium 更改事件信息,请将 SMT 应用到接收器连接器。

当您使用事件扁平化 SMT 来处理从 MongoDB 连接器发出的消息时,F SMT 会将原始消息中的记录结构转换为正确输入的 Kafka Connect 记录,这些记录可以被典型的 sink 连接器使用。例如,FUSE 会将代表原始消息中的 after 信息的 JSON 字符串转换为任何消费者可以处理的模式结构。

另外,您可以为 MongoDB 配置事件扁平化 SMT,以便在处理过程中以其他方式修改消息。如需更多信息,请参阅配置 主题

12.6.3. 配置 Debezium MongoDB 事件扁平化转换

为使用 Debezium MongoDB 连接器发送的消息的 MongoDB 配置事件扁平化(ExtractNewDocumentState) SMT。

以下主题提供详情:

12.6.3.1. 示例: Debezium MongoDB 事件扁平化的基本配置

要获取 SMT 的默认行为,请在 sink 连接器配置中添加 SMT,而不指定任何选项,如下例所示:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState

与任何 Kafka Connect 连接器配置一样,您可以将 converts = 设置为多个、以逗号分隔的 SMT 别名。Kafka Connect 应用按照列出顺序指定的转换。

您可以为使用 MongoDB 事件扁平化 SMT 的连接器设置多个选项。以下示例显示了为连接器设置 drop.tombstonesdelete.handling.modeadd.headers 选项的配置:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=drop
transforms.unwrap.add.headers=op

有关上例中的配置选项的更多信息,请参阅配置 主题,

自定义配置

连接器可能会发出多种类型的事件信息(如心跳消息、tombstone 消息或有关事务的元数据消息)。要将转换应用到事件子集,您可以定义一个 SMT predicate 语句,用于选择性地将转换应用到 特定事件。

12.6.4. MongoDB 事件消息中编码数组的选项

默认情况下,事件扁平化 SMT 将 MongoDB 阵列转换为与 Apache Kafka Connect 或 Apache Avro 模式兼容的数组。虽然 MongoDB 数组可以包含多个类型的元素,但 Kafka 数组中的所有元素都必须是相同的类型。

为确保 SMT 以满足环境需求的方式编码数组,您可以指定 array.encoding 配置选项。以下示例显示了设置数组编码的配置:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>

根据配置,SMT 使用以下编码方法之一处理源消息中数组的每个实例:

阵列编码
如果将 array.encoding 设置为 数组 (默认),则 SMT 编码 将使用 数组来编码原始消息中的数组。为确保正确处理,数组实例中的所有元素都必须是相同的类型。这个选项是一个限制,但它可让下游客户端轻松处理阵列。
文档编码
如果将 array.encoding 设置为 文档,则 SMT 会将源中的每个阵列转换为 结构 的结构,类似于 BSON 序列化结构 包含名为 _0、 _1_2 等等的字段,其中每个字段名称代表原始数组中某一元素的索引。SMT 使用 source 数组中等同元素检索的值填充每个索引字段。索引名称以下划线作为前缀,因为 Avro 编码禁止以数字字符开头的字段名称。

以下示例演示了 Debezium MongoDB 连接器如何代表一个数据库文件,其中包含包含异构数据类型的数组:

例 12.1. 示例:包含多个数据类型的数组文档编码

{
    "_id": 1,
    "a1": [
        {
            "a": 1,
            "b": "none"
        },
        {
            "a": "c",
            "d": "something"
        }
    ]
}

如果将 array.encoding 设置为 文档,则 SMT 会将前面的文档转换为以下格式:

{
    "_id": 1,
    "a1": {
        "_0": {
            "a": 1,
            "b": "none"
        },
        "_1": {
            "a": "c",
            "d": "something"
        }
    }
}

文档 编码选项可让 SMT 处理由异构元素组成的任意数组。但是,在使用这个选项前,请始终验证接收器连接器和其他下游用户是否能够处理包含多个数据类型的数组。

12.6.5. 在 MongoDB 事件消息中扁平化嵌套结构

当数据库操作涉及嵌入式文档时,Debezium MongoDB 连接器会发出一个 Kafka 事件记录,其中包含反映原始文档层次结构的结构。也就是说,事件消息将嵌套文档表示为一组嵌套字段结构。在下游连接器无法处理包含嵌套结构的消息的环境中,您可以将事件扁平化 SMT 配置为扁平化消息中的分层结构。扁平消息结构更适合表等存储。

要将 SMT 配置为扁平化嵌套结构,请将 flatten.struct 配置选项设置为 true。在转换的消息中,字段名称会被构建,使其与文档源一致。SMT 通过将父文档字段名称与嵌套文档字段的名称匹配来重命名每个扁平化字段。由 flatten.struct.delimiter 选项定义的分隔符分隔名称的组件。struct.delimiter 的默认值是一个下划线字符(_)。

以下示例显示了用于指定 SMT 扁平化嵌套结构的配置:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>

以下示例显示了 MongoDB 连接器发送的事件信息。消息包含一个文档的字段,其中包含两个嵌套文档( bc )的字段:

{
    "_id": 1,
    "a": {
            "b": 1,
            "c": "none"
    },
    "d": 100
}

以下示例中的消息显示在 MongoDB 的 SMT 后的输出。

{
    "_id": 1,
    "a_b": 1,
    "a_c": "none",
    "d": 100
}

在生成的消息中,原始消息中的 bc 字段将被扁平化并重命名。重命名的字段通过将父文档的名称与嵌套文档的名称连接在一起: a _ba_c。新字段名称的组件使用下划线字符分隔,具体由 struct.delimiter 配置属性的设置所定义。

12.6.6. Debezium MongoDB 连接器如何报告 $unset 操作删除的字段名称

在 MongoDB 中,$unset operator 和 $rename operator 都从文档中删除字段。因为 MongoDB 集合是无架构的,在更新从文档中删除字段后,无法推断更新的文档中缺少的字段的名称。为了支持接收器连接器或其他可能需要删除字段的信息,Debezium 会发出更新信息,其中包含列出已删除字段名称的 removedFields 元素。

以下示例显示了导致删除字段的操作更新消息的一部分:

"payload": {
  "op": "u",
  "ts_ms": "...",
  "before": "{ ... }",
  "after": "{ ... }",
  "updateDescription": {
    "removedFields": ["a"],
    "updatedFields": null,
    "truncatedArrays": null
  }
}

在上例中,在 文档更新 之前和之后,代表源文档的状态。只有在设置了连接器的 capture.mode 时,连接器才会发出这些字段,如以下列表所述:

字段

在更改前提供文档的状态。只有在 capture.mode 设置为以下值之一时,才会显示此字段:

  • change_streams_with_pre_image
  • change_streams_update_full_with_pre_image.
after 字段

更改后提供文档的完整状态。只有在 capture.mode 设置为以下值之一时,才会显示此字段:

  • change_streams_update_full
  • change_streams_update_full_with_pre_image.

假设配置为捕获完整文档的连接器,当 ExtractNewDocumentState SMT 收到 $unset 事件 的更新 消息时,对于 $unset 事件的 SMT 会重新处理消息,这代表 removed 字段有一个 null 值,如下例所示:

{
    "id": 1,
    "a": null
}

对于没有配置为捕获完整文档的连接器,当 SMT 收到 $unset 操作的更新事件时,它会生成以下输出信息:

{
   "a": null
}

12.6.7. 确定原始数据库操作的类型

在 SMT 扁平化事件后,生成的消息不再指示生成事件的操作是 createupdate 或 initial snapshot read。通常,您可以通过配置连接器来公开有关删除附带的 tombstone 或重写事件的信息,来识别 删除操作。有关配置连接器以公开事件消息中的 tombstones 和 rewrites 的更多信息,请参阅 drop.tombstonesdelete.handling.mode 属性。

要在事件消息中报告数据库操作类型,SMT 可以向以下元素之一添加一个 op 字段:

  • 事件消息正文。
  • 消息标头。

例如,要添加一个标头属性来显示原始操作的类型,请添加转换,然后将 add.headers 属性添加到连接器配置中,如下例所示:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op

根据前面的配置,SMT 通过向消息添加 op 标头来报告事件类型,并为它分配一个字符串值来标识操作的类型。分配的字符串值基于原始 MongoDB 更改事件消息 中的 op 字段值。

12.6.8. 使用 MongoDB 事件扁平化 SMT 将 Debezium 元数据添加到 Kafka 记录

MongoDB 的事件扁平化 SMT 可以将原始更改事件消息的元数据字段添加到简化的消息中。添加的元数据字段前缀为双引号("__")。在事件记录中添加元数据可包含内容,如发生更改事件的集合名称,或包含特定于连接器的字段,如副本集名称。目前,SMT 只能添加以下更改事件子结构中的字段: source、Transaction 和 updateDescription

有关 MongoDB 更改事件结构的更多信息,请参阅 MongoDB 连接器文档

例如,您可以指定以下配置来添加副本集名称(rs)和更改事件的集合名称到最终扁平化的事件记录:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=rs,collection

以上配置会导致以下内容添加到扁平化的记录中:

{ "__rs" : "rs0", "__collection" : "my-collection", ... }

如果您希望 SMT 添加元数据字段 来删除 事件,请将 delete.handling.mode 选项的值设置为 重写

12.6.9. 应用 MongoDB 的选项有选择地提取新的文档状态转换

除了 Debezium 连接器在发生数据库更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及有关架构更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。

有关如何有选择地应用 SMT 的更多信息,请参阅为转换配置 SMT predicate

12.6.10. MongoDB 事件扁平化转换的配置选项

下表描述了 MongoDB 事件扁平化 SMT 的配置选项。

属性默认描述

array.encoding

数组

指定 SMT 在编码从原始事件消息读取的数组时使用的格式。设置以下选项之一:

数组
SMT 使用 数组 datatype 将 MongoDB 阵列编码为与 Apache Kafka Connect 或 Apache Avro 模式兼容的格式。如果设置了这个选项,请验证每个阵列实例中元素是否使用相同的类型。虽然 MongoDB 允许数组包含多个数据类型,但有些下游客户端无法处理数组。
文档
SMT 将每个 MongoDB 数组转换为结构 的结构,其方式与 BSON 序列化 类似。结构 包含名为 _0、 _1、 _2 等的字段。为了遵守 Avro 命名标准,对于每个索引字段的数字名称加上下划线前缀。每个数字字段名称代表原始数组中某一元素的索引。SMT 使用指定数组元素的源文档检索的值填充每个索引字段。

有关 array.coding 选项的更多信息,请参阅 MongoDB 事件消息中编码数组的选项

flatten.struct

false

原始事件消息中的 SMT flattens 结构(structs)通过串联消息中的嵌套属性的名称(用可配置的分隔符分隔)来形成一个简单的字段名称。

flatten.struct.delimiter

_

flatten.struct 设为 true 时,指定转换在字段名称之间插入的分隔符,其从输入记录中串联到生成字段名称。

drop.tombstones

true

Debezium 为每个 删除操作 生成一个 tombstone 记录。默认行为是事件扁平化 SMT 从流中删除 tombstone 记录。要在流中保留 tombstone 记录,请指定 drop.tombstones=false

delete.handling.mode

drop

指定 SMT 如何处理 Debezium 为 删除操作 生成的更改事件记录。设置以下选项之一:

drop
SMT 从事件流 中删除 删除操作的记录。
none
SMT 从事件流中保留原始更改事件记录。记录仅包含 "value": "null "。
rewrite
SMT 从流保留更改事件记录的修改版本。为了提供另一种方法来指示删除了记录,修改的记录包含一个 value 字段,其中包含来自原始记录的键/值对,并将 __deleted: true 添加到

如果设置 rewrite 选项,您可能会发现更新的、对 DELETE 操作的简化记录足以跟踪已删除的记录。在这种情况下,您可能希望 SMT 丢弃 tombstone 记录

add.headers.prefix

__ (double-underscore)

将此可选字符串设置为标头前缀。

add.headers

没有默认值

指定以逗号分隔的列表,没有空格,包含您希望 SMT 添加到简化的消息标头中的元数据字段。当原始消息包含重复字段名称时,您可以通过提供结构名称和字段名称来标识要修改的特定字段,如 source.ts_ms

另外,您可以通过在列表中添加一个以下格式的条目来覆盖字段的原始名称,并为它分配新名称:

<field_name>:<new_field_name>.

例如:

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

您指定的新名称值区分大小写。

当 SMT 在简化的消息的标头中添加 metadata 字段时,它会为每个 metadata 字段名称添加前缀,并加倍。对于结构规格,SMT 也会在 struct 名称和字段名称之间插入下划线。

如果您指定了不在更改事件原始消息中的字段,则 SMT 不会将字段添加到标头中。

add.fields.prefix

__ (double-underscore)

指定为字段名称的前缀的可选字符串。

add.fields

没有默认值

将这个选项设置为用逗号分开的列表,没有空格,将元数据字段添加到简化的 Kafka 消息的 value 元素中。当原始消息包含重复字段名称时,您可以通过提供结构名称和字段名称来标识要修改的特定字段,如 source.ts_ms
另外,您可以通过在列表中添加一个以下格式的条目来覆盖字段的原始名称,并为它分配新名称:

<field_name>:<new_field_name>.

例如:

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

您指定的新名称值区分大小写。

当 SMT 在简化的消息的 value 元素中添加 metadata 字段时,它会为每个 metadata 字段名称添加前缀,并加倍。对于结构规格,SMT 也会在 struct 名称和字段名称之间插入下划线。

如果您指定了原始更改事件消息中没有的字段,则 SMT 仍然会将指定的字段添加到修改消息的 value 元素中。

sanitize.field.names

false

字段名称是否会被清理以遵守 Avro 命名要求。如需了解更多详细信息,请参阅 Avro 命名

已知限制

  • 由于 MongoDB 是一个无架构数据库,因此在使用 Debezium 将更改流传输到基于模式的数据关系数据库时,具有相同名称的集合中的字段必须存储同一类型的数据。
  • 将 SMT 配置为以与接收器连接器兼容的格式生成信息。如果接收器连接器需要"flat"消息结构,但它会收到一个在源 MongoDB 文档中编码数组的消息,则接收器连接器无法处理消息。