6.2. Debezium Oracle 连接器数据更改事件的描述

Oracle 连接器发出的每个数据更改事件都有一个键和值。键和值的结构取决于更改事件源自的表。有关 Debezium 如何构建主题名称的详情,请参考 主题名称

警告

Debezium Oracle 连接器确保所有 Kafka Connect 模式名称都 是有效的 Avro 模式名称。这意味着,逻辑服务器名称必须以字母字符或下划线([a-z,A-Z,_])以及逻辑服务器名称中的其余字符以及架构和表名称中的所有字符都必须是字母数字字符或下划线([a-z,A-Z,0-9,\_])。连接器自动将无效字符替换为下划线字符。

当多个逻辑服务器名称、模式名称或表名称之间的字符无效字符无效字符时,意外命名冲突可能会导致字符区分。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间而变化,这很难以处理主题。为便于处理可变事件结构,Kafka Connect 中的每个事件都是自我包含的。每个消息键和值有两个部分:schemapayload。schema 描述了有效负载的结构,而有效负载包含实际数据。

警告

SYSSYSTEM 用户帐户执行的更改不会被连接器捕获。

以下主题包含有关数据更改事件的更多详情:

6.2.1. 关于 Debezium Oracle 连接器更改事件中的键

对于每个更改的表,更改事件键是结构化的,以便在事件创建事件时为表的主键(或唯一键约束)中有一个字段。

例如,在 inventory 数据库架构中定义的 客户 表可能有以下更改事件键:

CREATE TABLE customers (
  id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(255) NOT NULL,
  last_name VARCHAR2(255) NOT NULL,
  email VARCHAR2(255) NOT NULL UNIQUE
);

如果 < topic.prefix&gt;.transaction 配置属性的值设置为 server1,则数据库表中的 customers 表中发生的每个更改事件的 JSON 表示具有以下键结构:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "ID"
            }
        ],
        "optional": false,
        "name": "server1.INVENTORY.CUSTOMERS.Key"
    },
    "payload": {
        "ID": 1004
    }
}

键的 schema 部分包含一个 Kafka Connect 模式,用于描述密钥部分的内容。在上例中,有效负载 值不是可选,其结构由名为 server1.DEBEZIUM.CUSTOMERS.Key 的 schema 定义,一个名为 id 的 name type int32。键的 payload 字段的值表示它实际上是一个带有 id 字段的结构(在 JSON 中,是一个对象),其值为 1004

因此,您可以将这个键解释为 inventory.customers 表中的行(来自名为 server1 的连接器),其 id 主键列的值为 1004

6.2.2. 关于 Debezium Oracle 连接器更改事件中的值

更改事件消息中的值结构反映了消息中 更改事件中的消息键 的结构,并且包含 schema 部分和 payload 部分。

更改事件值的有效负载

更改事件值的 payload 部分中的 envelope 结构包含以下字段:

op
包含描述操作类型的字符串值的必填字段。Oracle 连接器更改事件值有效负载中的 op 字段包含以下值之一: c (创建或插入)、u (update)、d (delete)或 r (读,代表快照)。
之前
可选字段(如果存在)描述了事件 发生前 行的状态。该结构由 server1.INVENTORY.CUSTOMERS.Value Kafka Connect 模式描述,server1 连接器用于 inventory.customers 表中的所有行。
after
一个可选字段,如果存在,包含 更改后 行的状态。该结构由用于 before 字段的同一 server1.INVENTORY.CUSTOMERS.Value Kafka Connect 模式描述。
source

包含描述事件源元数据的结构的必填字段。如果是 Oracle 连接器,结构包括以下字段:

  • Debezium 版本。
  • 连接器名称。
  • 事件是持续快照的一部分。
  • 事务 ID (不包括快照)。
  • 更改的 SCN。
  • 指明源数据库中记录何时更改的时间戳(用于快照,时间戳指示快照何时发生)。
  • 进行更改的用户名

    提示

    commit_scn 字段是可选的,并描述了更改事件参与的事务提交的 SCN。

ts_ms
可选字段(如果存在)包含连接器处理事件的 JVM 中系统时钟的时间(基于系统时钟)。

更改事件值的 schema

事件消息值的 schema 部分包含一个 schema,用于描述有效负载的 envelope 结构及其中的嵌套字段。

有关更改事件值的更多信息,请参阅以下主题:

创建 事件

以下示例显示了 customers 表中的 create 事件值的值,如 更改事件键示例中所述

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": true,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "rs_id"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "ssn"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "redo_thread"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "user_name"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "snapshot"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.oracle.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            }
        ],
        "optional": false,
        "name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "source": {
            "version": "2.1.4.Final",
            "name": "server1",
            "ts_ms": 1520085154000,
            "txId": "6.28.807",
            "scn": "2122185",
            "commit_scn": "2122185",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false
        },
        "op": "c",
        "ts_ms": 1532592105975
    }
}

在上例中,注意事件如何定义以下模式:

  • envelope (server1.DEBEZIUM.CUSTOMERS.Envelope).
  • 结构(io.debezium.connector.oracle.Source,它特定于 Oracle 连接器并在所有事件间重复使用。
  • 用于 beforeafter 字段的特定于表的模式。
提示

beforeafter 字段的 schema 的名称的格式为 <logicalName>.<schemaName>.<tableName>.Value, 因此完全独立与所有其他表的 schema。因此,当您使用 Avro converter 时,每个逻辑源中的表的 Avro 模式都有自己的演进和历史记录。

此事件值的 payload 部分提供有关事件的信息。它描述了创建了行(op=c),并显示 after 字段值包含插入到行 IDFIRST_NAMELAST_NAMEEMAIL 列中的值。

提示

默认情况下,事件的 JSON 表示大于它们描述的行。较大的大小是由 JSON 表示的,包括消息的 schema 和 payload 部分。您可以使用 Avro Converter 减少连接器写入 Kafka 主题的消息大小。

更新 事件

以下示例显示了连接器从上一次创建事件相同的表中的 更新 更改事件。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "source": {
            "version": "2.1.4.Final",
            "name": "server1",
            "ts_ms": 1520085811000,
            "txId": "6.9.809",
            "scn": "2125544",
            "commit_scn": "2125544",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false
        },
        "op": "u",
        "ts_ms": 1532592713485
    }
}

有效负载的结构与 创建 (插入)事件的有效负载相同,但以下值不同:

  • op 字段的值是 u,表示此行因为更新而改变。
  • before 字段显示行的前者状态,以及更新 数据库提交前存在的值。
  • after 字段显示行的更新状态,EMAIL 值现在设置为 anne@example.com
  • source 字段的结构包含与之前相同的字段,但它们的值不同,因为连接器从红色日志中的不同位置捕获事件。
  • ts_ms 字段显示 Debezium 处理事件的时间戳。

payload 部分显示一些其他有用的信息。例如,通过比较 beforeafter 结构,我们可以确定在提交后如何更改行。 结构提供有关 Oracle 在此变化记录相关的信息,从而提供了可追溯性。另外,当此事件与本主题和其它事件相关时,还会深入了解。它是否在与另一个事件相同的提交之前、之后或作为另一个事件的一部分发生?

注意

更新行主/唯一键的列时,行键的值会改变。因此,Debezium 会在此类 更新后发出三个 事件:

  • DELETE 事件。
  • 一个 tombstone 事件,带有行的旧键。
  • 为行提供新密钥的 INSERT 事件。

删除 事件

以下示例显示了上一次 createupdate 事件示例中显示的表的 delete 事件。delete 事件的 schema 部分与这些事件的 schema 部分相同。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "after": null,
        "source": {
            "version": "2.1.4.Final",
            "name": "server1",
            "ts_ms": 1520085153000,
            "txId": "6.28.807",
            "scn": "2122184",
            "commit_scn": "2122184",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false
        },
        "op": "d",
        "ts_ms": 1532592105960
    }
}

createupdate 事件相比,事件的 payload 部分显示了几个不同之处:

  • op 字段的值为 d,表示已删除该行。
  • before 字段显示数据库提交删除的行的前一个状态。
  • after 字段的值为 null,表示行不再存在。
  • source 字段的结构中包括了多个在 createupdate 事件中存在的键, 但 ts_ms, scn, 和 txId 中的值不同。
  • ts_ms 显示指示 Debezium 处理此事件的时间戳。

删除 事件为消费者提供处理此行所需的信息。

Oracle 连接器的事件被设计为用于 Kafka 日志压缩,这样可允许删除一些旧的信息,只要每个键至少保留最新的消息。这允许 Kafka 在确保主题包含完整的数据集并可用于重新载入基于密钥的状态时回收存储空间。

删除行时,上例中显示的 delete 事件值仍可用于日志压缩,因为 Kafka 能够删除所有使用相同键的消息。message 值必须设置为 null,以指示 Kafka 删除共享同一键的所有消息。为了实现此目的,默认情况下 Debezium 的 Oracle 连接器始终遵循一个 delete 事件,它有一个特殊的 tombstone 事件,其键相同但 null 值。您可以通过设置连接器属性 tombstones.on.delete 来改变默认的行为。

截断 事件

一个 截断 的更改事件信号,表示表已被截断。本例中 message 键为 null,消息值类似如下:

{
    "schema": { ... },
    "payload": {
        "before": null,
        "after": null,
        "source": { 1
            "version": "2.1.4.Final",
            "connector": "oracle",
            "name": "oracle_server",
            "ts_ms": 1638974535000,
            "snapshot": "false",
            "db": "ORCLPDB1",
            "sequence": null,
            "schema": "DEBEZIUM",
            "table": "TEST_TABLE",
            "txId": "02000a0037030000",
            "scn": "13234397",
            "commit_scn": "13271102",
            "lcr_position": null,
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user"
        },
        "op": "t", 2
        "ts_ms": 1638974558961, 3
        "transaction": null
    }
}

表 6.6. 截断 事件值字段的描述

字段名称描述

1

source

描述事件源元数据的必需字段。在 truncate 事件值中,source 字段结构与为同一表 创建更新和删除事件 相同,提供此元数据:

  • Debezium 版本
  • 连接器类型和名称
  • 包含新行的数据库和表
  • 模式名称
  • 如果事件是快照的一部分( truncate 事件始终为 false )
  • 执行操作的事务 ID
  • 操作的 SCN
  • 在数据库中进行更改的时间戳
  • 执行更改的用户名

2

op

描述操作类型的强制字符串。op 字段值为 t,表示此表已被截断。

3

ts_ms

显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中 ,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

由于 truncate 事件代表对整个表所做的更改,且没有消息键,且没有具有多个分区的主题,因此不能保证消费者收到 截断 事件和更改事件(创建update 等等)用于表。例如,当消费者从不同分区读取事件时,可能会在收到同一表的 截断 事件后收到表 的更新 事件。只有在主题使用单个分区时,才可以保证排序。

如果您不想捕获 截断 的事件,请使用 skipped.operations 选项过滤它们。