第 13 章 开发 Debezium 自定义数据类型转换器

重要

使用自定义开发的转换器只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅 https://access.redhat.com/support/offerings/techpreview

Debezium 更改事件记录中的每个字段代表源表或数据收集中的字段或列。当连接器向 Kafka 发出更改事件记录时,它会将源中的每个字段的数据类型转换为 Kafka Connect 模式类型。列值同样转换,以匹配目的地字段的 schema 类型。对于每个连接器,默认映射指定连接器如何转换每个数据类型。这些默认映射在每种连接器的数据类型文档中描述。

虽然默认映射通常足够了,但对于某些应用程序,您可能需要应用备用映射。例如,如果默认映射从 UNIX 时以毫秒格式导出列,但您的下游应用程序只能使用格式字符串来使用列值。您可以通过开发和部署自定义转换器来自定义数据类型映射。您可以将自定义转换器配置为对特定类型的所有列执行操作,或者您可以缩小其范围,以便它们只应用到特定的表列。converter 函数截获任何符合指定条件的列的数据类型转换请求,然后执行指定的转换。converter 忽略与指定条件不匹配的列。

自定义转换器是实施 Debezium 服务提供商接口(SPI)的 Java 类。您可以通过在连接器配置中设置 converters 属性来启用和配置自定义转换器。converters 属性指定连接器可用的转换器,并可包含进一步修改转换行为的子属性。

启动连接器后,连接器配置中启用的转换器会被实例化并添加到 registry 中。registry 将每个转换器与列或字段相关联。每当 Debezium 处理新的更改事件时,它会调用配置的转换器来转换它注册的列或字段。

13.1. 创建 Debezium 自定义数据类型转换器

以下示例显示了实现接口 io.debezium.spi.converter.CustomConverter 的 Java 类的转换器实现:

public interface CustomConverter<S, F extends ConvertedField> {

    @FunctionalInterface
    interface Converter {  1
        Object convert(Object input);
    }

    public interface ConverterRegistration<S> { 2
        void register(S fieldSchema, Converter converter); 3
    }

    void configure(Properties props);

    void converterFor(F field, ConverterRegistration<S> registration); 4
}
1
将数据从一个类型转换为另一个类型的功能。
2
注册转换器的回调。
3
为当前字段注册给定的模式和转换器。对于同一字段,不应多次调用一次。
4
注册自定义值和模式转换器,以用于特定字段。

自定义转换器方法

CustomConverter 接口的实现必须包括以下方法:

configure()

将连接器配置中指定的属性传递给转换器实例。配置 方法在连接器初始化时运行。您可以将转换器与多个连接器搭配使用,并根据连接器的属性设置修改其行为。
配置 方法接受以下参数:

props
包含传递给转换器实例的属性。每个属性指定转换特定类型的列值的格式。
converterFor()

注册转换器来处理数据源中的特定列或字段。Debezium 调用 converterFor () 方法,以提示转换器 来调用转换的注册converterFor 方法为每个列运行一次。
方法接受以下参数:

field
一个对象,用于传递处理字段或列的元数据。列元数据可以包含列或字段的名称、表或集合的名称、数据类型、大小等。
注册
一个类型为 io.debezium.spi.converter.CustomConverter.ConverterRegistration 的对象,它提供目标架构定义以及转换列数据的代码。当源列与转换器应该处理的类型匹配时,转换器会调用 注册 参数。调用 寄存器 方法来定义 schema 中每个列的转换器。模式使用 Kafka Connect SchemaBuilder API 代表。

13.1.1. Debezium 自定义转换器示例

以下示例实现了一个简单的转换器,它执行以下操作:

  • 运行 configure 方法,它根据连接器配置中指定的 schema.name 属性的值配置转换器。转换器配置特定于每个实例。
  • 运行 converterFor 方法,它将注册转换器来处理数据类型设置为 isbn 的源列中的值。

    • 根据为 schema.name 属性指定的值来识别目标 STRING 模式。
    • 将源栏中的 ISBN 数据转换为 String 值。

例 13.1. 一个简单的自定义转换器

public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private SchemaBuilder isbnSchema;

    @Override
    public void configure(Properties props) {
        isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {

        if ("isbn".equals(column.typeName())) {
            registration.register(isbnSchema, x -> x.toString());
        }
    }
}

13.1.2. Debezium 和 Kafka Connect API 模块依赖项

自定义转换器 Java 项目已编译 Debezium API 和 Kafka Connect API 库模块的依赖关系。这些编译依赖关系必须包含在项目的 pom.xml 中,如下例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version> 1
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version> 2
</dependency>
1
${version.debezium} 代表 Debezium 连接器的版本。
2
${version.kafka} 代表环境中的 Apache Kafka 版本。