12.4. 过滤 Debezium 更改事件记录

默认情况下,Debebe 提供它接收到 Kafka 代理的每个数据更改事件。然而,在很多情况下,您可能只关注生成者发出的事件子集。要让您只处理与您相关的记录,Debezium 只提供 过滤器 单个消息转换 (SMT)。

重要

Debezium 过滤器 SMT 是一个技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围

虽然可以使用 Java 创建自定义 SMT 来编码过滤逻辑,但使用自定义编码的 SMT 具有其缺陷。例如:

  • 需要预先编译转换并将其部署到 Kafka Connect。
  • 每个更改都需要代码重新编译和重新部署,从而造成不灵活的操作。

过滤器 SMT 支持与 JSR 223 集成的脚本语言(Java™ 平台跟踪)。

Debezium 不附带 JSR 223 API 的任何实现。要将表达式语言与 Debezium 搭配使用,您必须为语言下载 JSR 223 脚本引擎实施。根据您用于部署 Debezium 的方法,您可以自动从 Maven Central 下载所需的工件,也可以手动下载工件,然后将它们添加到 Debezium 连接器插件目录中,以及语言实施所使用的任何其他 JAR 文件中。

12.4.1. 设置 Debezium 过滤器 SMT

为安全起见,Debezium 连接器存档中不包含过滤器 SMT。相反,它会在单独的工件中提供 debezium-scripting-2.1.4.Final.tar.gz

如果通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium 连接器,若要使用过滤器 SMT,您必须明确下载 SMT 归档并部署文件以及连接器插件。当使用 AMQ Streams 部署连接器时,它可以根据您在 Kafka Connect 自定义资源中指定的配置参数自动下载所需的工件。重要信息:在 Kafka Connect 实例中存在过滤器 SMT 后,允许向实例添加连接器的任何用户都可以运行脚本表达式。要确保脚本表达式只能由授权用户运行,请务必在添加过滤器 SMT 前保护 Kafka Connect 实例及其配置接口。

如果您从 Dockerfile 构建 Kafka Connect 容器镜像,则适用以下步骤。如果使用 AMQ Streams 创建 Kafka Connect 镜像,请按照连接器部署主题中的说明操作。

流程

  1. 从浏览器中,打开 Debezium 下载站点的红帽构建,并下载 Debezium 脚本 SMT 存档(debezium-scripting-2.1.4.Final.tar.gz)。
  2. 将存档的内容提取到 Kafka Connect 环境的 Debezium 插件目录中。
  3. 获取 JSR-223 脚本引擎实现,并将其内容添加到 Kafka Connect 环境的 Debezium 插件目录中。
  4. 重启 Kafka Connect 进程以获取新的 JAR 文件。

Groovy 语言在 classpath 上需要以下库:

  • groovy
  • groovy-json (可选)
  • groovy-jsr223

JavaScript 语言在 classpath 上需要以下库:

  • graalvm.js
  • graalvm.js.scriptengine

12.4.2. 示例: Debezium 基本过滤器 SMT 配置

您可以在 Debezium 连接器的 Kafka Connect 配置中配置过滤器转换。在配置中,您可以通过定义基于业务逻辑的过滤条件来指定您感兴趣的事件。当过滤器 SMT 处理事件流时,它会根据配置的过滤器条件评估每个事件。只有满足过滤器条件条件的事件才会传递给代理。

要配置 Debezium 连接器来过滤更改事件记录,请在 Debezium 连接器的 Kafka Connect 配置中配置 Filter SMT。配置过滤器 SMT 要求您指定定义过滤条件的正则表达式。

例如,您可以在连接器配置中添加以下配置。

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...

前面的例子指定了使用 Groovy 表达式语言。正则表达式 value.op == 'u' && value.before.id == 2 会删除所有消息,但代表 update (u)记录的 id 值等于 2

自定义配置

前面的示例演示了一个简单的 SMT 配置,它旨在只处理 DML 事件,其中包含 op 字段。连接器可能会发出的其他类型的消息(心跳消息、tombstone 消息或有关架构更改和事务的元数据消息)不包含此字段。为了避免处理失败,您可以定义一个 SMT predicate 语句,该语句只会将转换应用到 特定事件。

12.4.3. 在过滤器表达式中使用的变量

Debezium 将某些变量绑定到过滤器 SMT 的评估上下文中。当您创建表达式来指定过滤器条件时,您可以使用 Debezium 绑定到评估上下文的变量。通过绑定变量,Debezium 允许 SMT 在评估表达式中的条件时查找和解释其值。

下表列出了 Debezium 绑定到过滤器 SMT 的评估上下文的变量:

表 12.3. 过滤表达式变量

名称描述类型

key

消息的一个键。

org.apache.kafka.connect​.data​.Struct

value

消息的值。

org.apache.kafka.connect.data​.Struct

keySchema

message 键的 schema。

org.apache.kafka.connect​.data​.Schema

valueSchema

消息值的 schema。

org.apache.kafka.connect​.data​.Schema

topic

目标主题的名称。

字符串

标头

消息标头的 Java 映射。key 字段是标头名称。headers 变量公开以下属性:

  • (类型为 Object
  • 模式 (类型为 org.apache.kafka​.connect​.data​.Schema

java.util.Map​<String, ​io.debezium.transforms​.scripting​.RecordHeader>

表达式可以在其变量上调用任意方法。表达式应解析为布尔值,用来决定 SMT 如何处理消息。当表达式中的过滤器条件评估为 true 时,会保留消息。当过滤器条件评估为 false 时,会删除消息。

表达式不应产生任何副作用。也就是说,不应修改他们通过的任何变量。

12.4.4. 用于有选择地应用过滤器转换的选项

除了 Debezium 连接器在发生数据库更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及有关架构更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:

12.4.5. 为其他脚本语言过滤条件配置

您表达过滤条件的方式取决于您使用的脚本语言。

例如,如 基本配置 示例所示,当您使用 Groovy 作为表达式语言时,以下表达式会删除所有消息,除了 id 值设置为 2 的更新记录:

value.op == 'u' && value.before.id == 2

其他语言使用不同的方法表达相同的条件。

提示

Debezium MongoDB 连接器将 afterpatch 字段作为序列化 JSON 文档而不是作为结构发出。
要将过滤器 SMT 与 MongoDB 连接器搭配使用,您必须首先将 JSON 中的数组字段解压到单独的文档中。
您可以使用表达式中的 JSON 解析器为每个数组项目生成单独的输出文档。例如,如果您使用 Groovy 作为表达式语言,请将 groovy-json 工件添加到 classpath 中,然后添加一个表达式,如 (new groovy.json.JsonSlurper ()).parseText (value.after).last_name == 'Kretchmar'

JavaScript

如果您使用 JavaScript 作为表达式语言,您可以调用 StructVirtualMachineget () 方法来指定过滤条件,如下例所示:

value.get('op') == 'u' && value.get('before').get('id') == 2

带有 Graal.js 的 JavaScript

如果您使用带有 Graal.js 的 JavaScript 来定义过滤条件,您可以使用类似于 Groovy 的方法。例如:

value.op == 'u' && value.before.id == 2

12.4.6. 配置过滤器转换的选项

下表列出了您可以在过滤器 SMT 中使用的配置选项。

表 12.4. 过滤 SMT 配置选项

属性

默认

描述

topic.regex

 

可选的正则表达式,用于评估事件的目标主题名称,以确定是否应用过滤逻辑。如果目标主题的名称与 topic.regex 的值匹配,则转换会在将事件传递给主题之前应用过滤器逻辑。如果主题的名称与 topic.regex 中的值不匹配,则 SMT 会将事件传递给主题未修改。

language

 

写入表达式的语言。必须以 jsr223. 例如: jsr223.groovyjsr223.graal.js。Debezium 仅支持通过 JSR 223 API 进行引导("Java ™ Platform" 跟踪)。

condition

 

要为每个消息评估的表达式。必须评估一个布尔值,其中为 true 的结果会保留消息,结果为 false 会删除它。

null.handling.mode

keep

指定转换如何处理 null (tombstone)信息。您可以指定以下选项之一:

keep
(默认)传递消息。
drop
完全删除消息。
评估
将过滤器条件应用到消息。