第 13 章 开发 Debezium 自定义数据类型转换器
使用自定义开发的转换器只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅 https://access.redhat.com/support/offerings/techpreview。
Debezium 更改事件记录中的每个字段代表源表或数据收集中的字段或列。当连接器向 Kafka 发出更改事件记录时,它会将源中的每个字段的数据类型转换为 Kafka Connect 模式类型。列值同样转换,以匹配目的地字段的 schema 类型。对于每个连接器,默认映射指定连接器如何转换每个数据类型。这些默认映射在每种连接器的数据类型文档中描述。
虽然默认映射通常足够了,但对于某些应用程序,您可能需要应用备用映射。例如,如果默认映射从 UNIX 时以毫秒格式导出列,但您的下游应用程序只能使用格式字符串来使用列值。您可以通过开发和部署自定义转换器来自定义数据类型映射。您可以将自定义转换器配置为对特定类型的所有列执行操作,或者您可以缩小其范围,以便它们只应用到特定的表列。converter 函数截获任何符合指定条件的列的数据类型转换请求,然后执行指定的转换。converter 忽略与指定条件不匹配的列。
自定义转换器是实施 Debezium 服务提供商接口(SPI)的 Java 类。您可以通过在连接器配置中设置 converters 属性来启用和配置自定义转换器。converters 属性指定连接器可用的转换器,并可包含进一步修改转换行为的子属性。
启动连接器后,连接器配置中启用的转换器会被实例化并添加到 registry 中。registry 将每个转换器与列或字段相关联。每当 Debezium 处理新的更改事件时,它会调用配置的转换器来转换它注册的列或字段。
13.1. 创建 Debezium 自定义数据类型转换器
以下示例显示了实现接口 io.debezium.spi.converter.CustomConverter 的 Java 类的转换器实现:
public interface CustomConverter<S, F extends ConvertedField> {
@FunctionalInterface
interface Converter { 1
Object convert(Object input);
}
public interface ConverterRegistration<S> { 2
void register(S fieldSchema, Converter converter); 3
}
void configure(Properties props);
void converterFor(F field, ConverterRegistration<S> registration); 4
}自定义转换器方法
CustomConverter 接口的实现必须包括以下方法:
configure()将连接器配置中指定的属性传递给转换器实例。
配置方法在连接器初始化时运行。您可以将转换器与多个连接器搭配使用,并根据连接器的属性设置修改其行为。配置方法接受以下参数:props- 包含传递给转换器实例的属性。每个属性指定转换特定类型的列值的格式。
converterFor()注册转换器来处理数据源中的特定列或字段。Debezium 调用
converterFor ()方法,以提示转换器来调用转换的注册。converterFor方法为每个列运行一次。
方法接受以下参数:field- 一个对象,用于传递处理字段或列的元数据。列元数据可以包含列或字段的名称、表或集合的名称、数据类型、大小等。
注册-
一个类型为
io.debezium.spi.converter.CustomConverter.ConverterRegistration的对象,它提供目标架构定义以及转换列数据的代码。当源列与转换器应该处理的类型匹配时,转换器会调用注册参数。调用寄存器方法来定义 schema 中每个列的转换器。模式使用 Kafka ConnectSchemaBuilderAPI 代表。
13.1.1. Debezium 自定义转换器示例
以下示例实现了一个简单的转换器,它执行以下操作:
-
运行
configure方法,它根据连接器配置中指定的schema.name属性的值配置转换器。转换器配置特定于每个实例。 运行
converterFor方法,它将注册转换器来处理数据类型设置为isbn的源列中的值。-
根据为
schema.name属性指定的值来识别目标STRING模式。 -
将源栏中的 ISBN 数据转换为
String值。
-
根据为
例 13.1. 一个简单的自定义转换器
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private SchemaBuilder isbnSchema;
@Override
public void configure(Properties props) {
isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
}
@Override
public void converterFor(RelationalColumn column,
ConverterRegistration<SchemaBuilder> registration) {
if ("isbn".equals(column.typeName())) {
registration.register(isbnSchema, x -> x.toString());
}
}
}13.1.2. Debezium 和 Kafka Connect API 模块依赖项
自定义转换器 Java 项目已编译 Debezium API 和 Kafka Connect API 库模块的依赖关系。这些编译依赖关系必须包含在项目的 pom.xml 中,如下例所示:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version> 1
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${version.kafka}</version> 2
</dependency>