第6章 Oracle 用 Debezium コネクター (開発者プレビュー)

Debezium の Oracle コネクターは、コネクターの実行中に追加されたテーブルなど、Oracle サーバーのデータベースで発生する行レベルの変更をキャプチャーし、記録します。コネクターを設定して、スキーマとテーブルの特定サブセットの変更イベントを出力するか、特定の列の値を無視、マスク、または切り捨てできます。

Debezium は、ネイティブ LogMiner データベースパッケージを使用して Oracle から変更イベントを取得します。

重要

Debezium Oracle コネクターは開発者プレビュー機能です。開発者プレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。開発者プレビューの機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされず、機能的に完全ではないことがあるため、Red Hat は開発者プレビュー機能を実稼働環境に実装することは推奨しません。この機能に関してサポートが必要な場合には、Debezium コミュニティー と連携できます。

Debezium Oracle コネクターの使用に関する情報および手順は、以下のように整理されています。

6.1. Debezium Oracle コネクターの仕組み

Debezium Oracle コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびメタデータの使用方法を理解すると便利です。

詳細は以下を参照してください。

6.1.1. Debezium Oracle コネクターによるデータベーススナップショットの実行方法

通常、Oracle サーバーの redo ログは、データベースの完全な履歴を保持しないように設定されています。そのため、Debezium Oracle コネクターはデータベースの全履歴をログから取得できません。コネクターがデータベースの現在の状態のベースラインを確立できるようにするには、コネクターが最初に起動すると、データベースの初期 整合性スナップショット を実行します。

snapshot.mode コネクター設定プロパティーの値を設定することで、コネクターがスナップショットを作成する方法をカスタマイズできます。デフォルトでは、コネクターのスナップショットモードは initial に設定されます。

初期スナップショットを作成するためのデフォルトのコネクターワークフロー

スナップショットモードがデフォルトに設定されている場合、コネクターは以下のタスクを完了してスナップショットを作成します。

  1. キャプチャーするテーブルを決定します。
  2. 監視された各テーブルの EXCLUSIVE MODE ロックを取得して、スナップショットの作成中に構造的な変更が発生しないようにします。Debezium は短期間のみロックを保持します。
  3. サーバーの redo ログから現在のシステム変更番号 (SCN) の位置を読み取ります。
  4. 関連するテーブルすべての構造をキャプチャーします。
  5. ステップ 2 で取得したロックを解放します。
  6. ステップ 3 で読み込まれた SCN の位置(SELECT* FROM …​ AS OF SCN 123)で有効なものとして、関連するデータベーステーブルとスキーマをすべてスキャンし イベントレコードをテーブル固有の Kafka トピックに書き込みます。
  7. コネクターオフセットにスナップショットの正常な完了を記録します。

スナップショットプロセスが開始されたら、コネクターの障害、リバランス、またはその他の理由でプロセスが中断された場合には、コネクターの再起動後にプロセスが再開されます。コネクターによって初期スナップショットが完了した後、ステップ 3 で読み取られた位置からストリーミングが続行され、更新を見逃しません。何らかの理由でコネクターが再び停止した場合、再起動後に、以前停止した場所から変更のストリーミングが再開されます。

表6.1 snapshot.mode コネクター設定プロパティーの設定

設定説明

Initial

コネクターは、「初期スナップショット作成のデフォルトフロー」で説明されているように、データベースのスナップショットを実行します。スナップショットの完了後、コネクターは後続のデータベースの変更についてのイベントレコードのストリーミングを開始します。

schema_only

コネクターは関連するすべてのテーブルの構造をキャプチャーし、デフォルトのスナップショットワークフロー に記載されているすべてのステップを実行します。ただし、コネクターの起動時(Step 6)の時点でデータセットを表す READ イベントが作成されない点が異なります。

6.1.2. Debezium Oracle 変更イベントレコードを受信する Kafka トピックのデフォルト名

Debezium Oracle コネクターは、Debezium Oracle コネクターが 1 つのテーブルのすべての INSERTUPDATE、および DELETE 操作のイベントを 1 つの Kafka トピックに書き込むことです。Kafka トピックの命名規則は次のとおりです。

serverName.schemaName.tableName

たとえば、Met ment がサーバー名で、インベントリーが スキーマ名で、データベースに名前、顧客 および製品 が含まれるテーブルが含まれる場合、Debezium Oracle コネクターはイベントを以下の Kafka トピックに出力します。1 つはデータベースのテーブルごとに 1 つずつです。

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products

6.1.3. Debezium Oracle コネクターによるデータベーススキーマの変更の公開方法

Debezium Oracle コネクターは、スキーマ変更の履歴をデータベース履歴トピックに保存します。このトピックは内部コネクターの状態を反映するため、直接使用しないでください。アプリケーションがスキーマの変更に関する通知を必要とする場合は、パブリックスキーマの変更トピックから情報を取得する必要があります。コネクターは、スキーマ変更イベントを <serverName> という名前の Kafka トピックに書き込みます。serverName database.server.name 設定プロパティーに指定されたコネクターの名前になります。

Debezium は、新しいテーブルからデータをストリーミングするたびに、このトピックに新しいメッセージを出力します。

メッセージには、テーブルスキーマの論理表現が含まれます。

例: スキーマ変更トピックに出力されたメッセージ

以下の例は、JSON 形式の一般的なスキーマ変更メッセージを示しています。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "1.5.4.Final",
      "connector": "oracle",
      "name": "server1",
      "ts_ms": 1588252618953,
      "snapshot": "true",
      "db": "ORCLPDB1",
      "schema": "DEBEZIUM",
      "table": "CUSTOMERS",
      "txId" : null,
      "scn" : "1513734",
      "commit_scn": "1513734",
      "lcr_position" : null
    },
    "databaseName": "ORCLPDB1", 1
    "schemaName": "DEBEZIUM", //
    "ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n   (    \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n    \"FIRST_NAME\" VARCHAR2(255), \n    \"LAST_NAME" VARCHAR2(255), \n    \"EMAIL\" VARCHAR2(255), \n     PRIMARY KEY (\"ID\") ENABLE, \n     SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n   ) SEGMENT CREATION IMMEDIATE \n  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n  TABLESPACE \"USERS\" ", 2
    "tableChanges": [ 3
      {
        "type": "CREATE", 4
        "id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", 5
        "table": { 6
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 7
            "ID"
          ],
          "columns": [ 8
            {
              "name": "ID",
              "jdbcType": 2,
              "nativeType": null,
              "typeName": "NUMBER",
              "typeExpression": "NUMBER",
              "charsetName": null,
              "length": 9,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "FIRST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "LAST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "EMAIL",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ]
        }
      }
    ]
  }
}

表6.2 スキーマ変更トピックに出力されたメッセージのフィールドの説明

項目フィールド名説明

1

databaseName
schemaName

変更が含まれるデータベースとスキーマを識別します。

2

ddl

このフィールドには、スキーマの変更を行う DDL が含まれます。

3

tableChanges

DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。

4

type

変更の種類を説明します。値は以下のいずれかになります。

CREATE
テーブルの作成
変更
テーブルの変更
DROP
テーブルの削除

5

id

作成、変更、または破棄されたテーブルの完全な識別子。

6

table

適用された変更後のテーブルメタデータを表します。

7

primaryKeyColumnNames

テーブルのプライマリーキーを構成する列のリスト。

8

columns

変更されたテーブルの各列のメタデータ。

コネクターがスキーマ変更トピックに送信するメッセージは、スキーマ変更が含まれるデータベースの名前と同じメッセージキーを使用します。以下の例では、payload フィールドにキーが含まれます。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.oracle.SchemaChangeKey"
  },
  "payload": {
    "databaseName": "ORCLPDB1"
  }
}

6.1.4. トランザクション境界を表す Debezium Oracle コネクターによって生成されたイベント

Debezium は、トランザクションメタデータ境界を表し、データ変更イベントメッセージをエンリッチするイベントを生成できます。

データベーストランザクションは、BEGIN キーワードと END キーワードの間に囲まれているステートメントブロックによって表されます。Debezium は、すべてのトランザクションで BEGIN 区切り文字と END 区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。

status
BEGIN または END
id
一意のトランザクション識別子の文字列表現。
event_countEND イベント用)
トランザクションによって出力されるイベントの合計数。
data_collectionsEND イベント用)
指定のデータコレクションからの変更によって出力されたイベントの数を提供する data _collection と event_count のペアの配列。

以下の例は、典型的なトランザクション境界メッセージを示しています。

例: Oracle コネクタートランザクション境界イベント

{
  "status": "BEGIN",
  "id": "5.6.641",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "5.6.641",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER",
      "event_count": 1
    },
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.ORDER",
      "event_count": 1
    }
  ]
}

トランザクションイベントは、<database.server.name>.transaction という名前のトピックに書き込まれます。

6.1.4.1. 変更データイベントのエンリッチメント

トランザクションメタデータを有効にすると、データ message Envelope は新しい トランザクション フィールドでエンリッチされます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。

id
一意のトランザクション識別子の文字列表現。
total_order
トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。

以下の例は、典型的なトランザクションイベントメッセージを示しています。

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "5.6.641",
    "total_order": "1",
    "data_collection_order": "1"
  }
}