5.3. Debezium MySQL 连接器如何映射数据类型

Debezium MySQL 连接器代表对行的更改,其中包含与行存在的表类似的事件。event 包含每个列值的字段。该列的 MySQL 数据类型指定 Debezium 如何代表事件中的值。

存储字符串的列在 MySQL 中定义,并带有字符集和协调。MySQL 连接器在读取 binlog 事件中的列值的二进制表示时使用列的字符集。

连接器可将 MySQL 数据类型映射到 字面语义 类型。

  • 字面类型 :值如何使用 Kafka Connect 模式类型来表示。
  • 语义类型 : Kafka Connect 模式如何捕获字段(架构名称)的含义。

如果默认数据类型转换无法满足您的需要,您可以为连接器 创建自定义转换器

详情包括在以下部分:

基本类型

下表显示了连接器如何映射基本 MySQL 数据类型。

表 5.13. 基本类型映射的描述

MySQL 类型字面类型语义类型

布尔值, BOOL

布尔值

不适用

BIT(1)

布尔值

不适用

BIT(>1)

BYTES

io.debezium.data.Bits
length schema 参数包含一个代表位数的整数。byte[] 包含位(以 little-endian 格式),其大小调整为包含指定位数。例如,其中 n 是位:
numBytes = n/8 +(n%8== 0 ?0 : 1)

TINYINT

INT16

不适用

SMALLINT[(M)]

INT16

不适用

MUUINT[(M)]

INT32

不适用

INT, INTEGER[(M)]

INT32

不适用

BIGINT[(M)]

INT64

不适用

REAL[(M,D)]

FLOAT32

不适用

FLOAT[(P)]

FLOAT32FLOAT64

精度仅用于确定存储大小。精度 P 从 0 到 23 的结果会产生 4 字节的单精度 FLOAT32 列。从 24 到 53 的精度 P 会产生 8 字节双精度 FLOAT64 列。

FLOAT(M,D)

FLOAT64

从 MySQL 8.0.17 开始,非标准 FLOAT (M,D)和 DOUBLE (M,D)语法已弃用,并预期在以后的 MySQL 版本中删除对它的支持,将 FLOAT64 设置为默认值。

DOUBLE[(M,D)]

FLOAT64

不适用

CHAR(M)]

字符串

不适用

VARCHAR (M)]

字符串

不适用

BINARY (M)]

BYTESSTRING

N/A
根据 binary.handling.mode 连接器配置设置,为原始字节(默认)、base64 编码的字符串或 base64-url-safe-encoded 字符串或十六进制编码的字符串(基于 binary.handling.mode 连接器配置设置)。

VARBINARY (M)]

BYTESSTRING

N/A
根据 binary.handling.mode 连接器配置设置,为原始字节(默认)、base64 编码的字符串或 base64-url-safe-encoded 字符串或十六进制编码的字符串(基于 binary.handling.mode 连接器配置设置)。

TINYBLOB

BYTESSTRING

N/A
根据 binary.handling.mode 连接器配置设置,为原始字节(默认)、base64 编码的字符串或 base64-url-safe-encoded 字符串或十六进制编码的字符串(基于 binary.handling.mode 连接器配置设置)。

TINYTEXT

字符串

不适用

BLOB

BYTESSTRING

N/A
根据 binary.handling.mode 连接器配置设置,为原始字节(默认)、base64 编码的字符串或 base64-url-safe-encoded 字符串或十六进制编码的字符串(基于 binary.handling.mode 连接器配置设置)。
仅支持大小为 2GB 的值。建议您使用声明检查模式来外部化大型列值。

文本

字符串

N/A
仅支持大小最多 2GB 的值。建议您使用声明检查模式来外部化大型列值。

中型

BYTESSTRING

N/A
根据 binary.handling.mode 连接器配置设置,为原始字节(默认)、base64 编码的字符串或 base64-url-safe-encoded 字符串或十六进制编码的字符串(基于 binary.handling.mode 连接器配置设置)。

MEDTEXT

字符串

不适用

LONGBLOB

BYTESSTRING

N/A
根据 binary.handling.mode 连接器配置设置,为原始字节(默认)、base64 编码的字符串或 base64-url-safe-encoded 字符串或十六进制编码的字符串(基于 binary.handling.mode 连接器配置设置)。
仅支持大小为 2GB 的值。建议您使用声明检查模式来外部化大型列值。

LONGTEXT

字符串

N/A
仅支持大小最多 2GB 的值。建议您使用声明检查模式来外部化大型列值。

JSON

字符串

io.debezium.data.Json
包含 JSON 文档、数组或 scalar 的字符串表示。

ENUM

字符串

io.debezium.data.Enum
允许的 schema 参数包含以逗号分隔的允许值列表。

SET

字符串

io.debezium.data.EnumSet
allowed schema 参数包含以逗号分隔的允许值列表。

YEAR[(2|4)]

INT32

io.debezium.time.Year

TIMESTAMP[(M)]

字符串

io.debezium.time.ZonedTimestamp
in ISO 8601 format with microsecond precision。MySQL 允许 M 位于 0-6 范围内。

临时类型

排除 TIMESTAMP 数据类型,MySQL temporal 类型取决于 time.precision.mode 连接器配置属性的值。对于默认值指定为 CURRENT_ TIMESTAMPNOW 的 TIMESTAMP 列,值 1970-01-01 00:00:00 用作 Kafka Connect 模式中的默认值。

MySQL 允许 DATE、 DATE TIMETIMESTAMP 列的零值,因为有时使用零值而不是 null 值。当列定义允许 null 值时,MySQL 连接器将零值表示为 null 值,或者在列不允许 null 值时作为 epoch 天。

没有时区的临时值

DATETIME 类型代表本地日期和时间,如 "2018-01-13 09:48:27"。您可以看到,没有时区信息。这些列使用 UTC 根据列的精度转换为 epoch 毫秒或微秒。TIMESTAMP 类型代表没有时区信息的时间戳。在读取值时,MySQL 将从服务器(或会话)当前时区转换为 UTC,并从 UTC 转换为服务器(或会话)当前时区。例如:

  • DATETIME 的值为 2018-06-20 06:37:03 变为 1529476623000
  • TIMESTAMP 的值 2018-06-20 06:37:03 变为 2018-06-20T13:37:03Z

这些列根据服务器(或会话)当前的时区转换为 UTC 中的等同的 io.debezium.time.ZonedTimestamp。默认将从服务器查询时区。如果此操作失败,则必须由数据库 connectionTimeZone MySQL 配置选项明确指定。例如,如果数据库的时区(通过 connectionTimeZone 选项为连接器全局或配置了)是 "America/Los_Angeles",TIMESTAMP 值 "2018-06-20 06:37:03" 由带有值为 "2018-06-20T13:37:03" 的 ZonedTimestamp 表示。

运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响这些转换。

有关与临时值相关的属性的更多详细信息,请参见 MySQL 连接器配置属性 的文档。

time.precision.mode=adaptive_time_microseconds(default)

MySQL 连接器根据列的数据类型定义决定字面类型和语义类型,以便事件准确代表数据库中的值。所有时间字段都以微秒为单位。只有 00:00:00.00000023:59:59.999999 范围内的正 TIME 字段值才能被正确捕获。

表 5.14. time.precision.mode=adaptive_time_microseconds时的映射

MySQL 类型字面类型语义类型

DATE

INT32

io.debezium.time.Date
代表自 epoch 以来的天数。

TIME[(M)]

INT64

io.debezium.time.MicroTime
以微秒表示时间值,不包括时区信息。MySQL 允许 M 位于 0-6 范围内。

DATETIME, DATETIME (0), DATETIME (1), DATETIME (2), DATETIME (3)

INT64

io.debezium.time.Timestamp
代表超过 epoch 的毫秒数,不包括时区信息。

DATETIME (4), DATETIME (5), DATETIME (6)

INT64

io.debezium.time.MicroTimestamp
代表 epoch 过去的微秒数,不包括时区信息。

time.precision.mode=connect

MySQL 连接器使用定义的 Kafka Connect 逻辑类型。这种方法比默认方法更精确,如果数据库列具有大于 3 的部分 精度 值,则事件可能不太精确。只能处理 00:00:00.00023:59:59.999 的值。只有在您可以确保表中的 TIME 值不能超过支持的范围时,才设置 time.precision.mode=connect。在以后的 Debezium 版本中会删除 connect 设置。

表 5.15. time.precision.mode=connect时的映射

MySQL 类型字面类型语义类型

DATE

INT32

org.apache.kafka.connect.data.Date
代表自 epoch 后的天数。

TIME[(M)]

INT64

org.apache.kafka.connect.data.Time
从中等以来以微秒为单位代表时间值,不包括时区信息。

DATETIME[(M)]

INT64

org.apache.kafka.connect.data.Timestamp
从 epoch 开始代表毫秒的数量,不包括时区信息。

十进制类型

Debezium 连接器根据 decimal.handling.mode 连接器配置属性 的设置处理十进制。

decimal.handling.mode=precise

表 5.16. 当 decimal.handling.mode=precise时进行映射

MySQL 类型字面类型语义类型

NUMERIC[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包括一个整数,它代表了十进制小数点移动了多少位。

DECIMAL[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包括一个整数,它代表了十进制小数点移动了多少位。

decimal.handling.mode=double

表 5.17. Mappings when decimal.handling.mode=double

MySQL 类型字面类型语义类型

NUMERIC[(M[,D])]

FLOAT64

不适用

DECIMAL[(M[,D])]

FLOAT64

不适用

decimal.handling.mode=string

表 5.18. Mappings when decimal.handling.mode=string

MySQL 类型字面类型语义类型

NUMERIC[(M[,D])]

字符串

不适用

DECIMAL[(M[,D])]

字符串

不适用

布尔值

MySQL 以特定方式在内部处理 BOOLEAN 值。BOOLEAN 列内部映射到 TINYINT (1) 数据类型。在流期间创建表时,它会使用正确的 BOOLEAN 映射,因为 Debezium 接收原始 DDL。在快照期间,Debebe 执行 SHOW CREATE TABLE,以获取为 BOOLEANTINYINT (1) 列返回 TINYINT (1) 列的表定义。然后 Debezium 无法获取原始类型映射,因此映射到 TINYINT (1)

要允许您将源列转换为布尔值,Debezium 提供了一个 TinyIntOneToBooleanConverter 自定义转换器,您可以使用以下方法之一:

  • 将所有 TINYINT (1)TINYINT (1) UNSIGNED 列映射到 BOOLEAN 类型。
  • 使用以逗号分隔的正则表达式列表枚举列子集。
    要使用这种转换,您必须使用 selector 参数设置 转换器 配置属性,如下例所示:

    converters=boolean
    boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
    boolean.selector=db1.table1.*, db1.table2.column1
  • 注意:当快照执行 SHOW CREATE TABLE 时,MySQL8 不会显示 tinyint 未签名的 类型长度,这意味着这个转换器无法正常工作。新选项 length.checker 可以解决这个问题,默认值为 true。禁用 length.checker,并指定需要转换为 selector 属性的列,而不是根据类型转换所有列,如下例所示:

    converters=boolean
    boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
    boolean.length.checker=false
    boolean.selector=db1.table1.*, db1.table2.column1

spatial 类型

目前,Debebe MySQL 连接器支持以下 spatial 数据类型。

表 5.19. spatial 类型映射的描述

MySQL 类型字面类型语义类型

GEOMETRY,
LINESTRING,
POLYGON,
多点,
MULTILINESTRING,
MULTIPOLYGON,
GEOMETRYCOLLECTION

STRUCT

io.debezium.data.geometry.Geometry
包含带有两个字段的结构:

  • Srid (INT32: spatial reference system ID,用于定义存储在结构中的 geometry 对象的类型
  • Wkb (BYTES): 以 Well-Known-Binary (wkb)格式编码的 geometry 对象的二进制表示。如需了解更多详细信息,请参阅 Open Geospatial Consortium