第5章 MySQL の Debezium コネクター

重要

本リリースの Debezium MySQL コネクターには、他の Debezium コネクターによって使用される一般的なコネクターフレームワークをベースとした新しいデフォルトキャプチャー実装が含まれています。改訂されたキャプチャー実装はテクノロジープレビュー機能です。テクノロジープレビューの機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされず、機能的に完全ではないことがあるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、「テクノロジプレビュー機能のサポート範囲」を参照してください。

新しいキャプチャー実装と共に実行中にコネクターでエラーや予期しない動作を生じる場合は、以下の設定オプションを設定して以前の実装に戻すことができます。

internal.implementation=legacy

MySQL には、データベースにコミットされた順序ですべての操作を記録するバイナリーログ (binlog) があります。これには、テーブルスキーマの変更やテーブルのデータの変更が含まれます。MySQL はレプリケーションとリカバリーに binlog を使用します。

Debezium MySQL コネクターは binlog を読み取り、行レベルの INSERTUPDATE、および DELETE 操作の変更イベントを生成し、変更イベントを Kafka トピックに出力します。クライアントアプリケーションはこれらの Kafka トピックを読み取ります。

MySQL は通常、指定期間後に binlogs をパージするように設定されているため、MySQL コネクターは各データベースの最初の整合性スナップショット を実行します。MySQL コネクターは、スナップショットが作成された時点から binlog を読み取ります。

Debezium MySQL コネクターの使用に関する情報および手順は、以下のように整理されています。

5.1. Debezium MySQL コネクターの仕組み

コネクターがサポートする MySQL トポロジーの概要は、アプリケーションを計画するときに役立ちます。Debezium MySQL コネクターを最適に設定および実行するには、コネクターによるテーブルの構造の追跡方法、スキーマ変更の公開方法、スナップショットの実行方法、および Kafka トピック名の決定方法を理解しておくと便利です。

詳細は以下を参照してください。

5.1.1. Debezium コネクターでサポートされる MySQL トポロジー

Debezium MySQL コネクターは以下の MySQL トポロジーをサポートします。

スタンドアロン
単一の MySQL サーバーを使用する場合は、Debezium MySQL コネクターがサーバーを監視できるように、binlog を有効 (および任意で GTID を有効) にする必要があります。バイナリーログも増分バックアップとして使用できるため、これは多くの場合で許容されます。この場合、MySQL コネクターは常にこのスタンドアロン MySQL サーバーインスタンスに接続し、それに従います。
プライマリーおよびレプリカ

Debezium MySQL コネクターはプライマリーサーバーまたはレプリカの 1 つ (レプリカの binlog が有効になっている場合) に従うことができますが、コネクターはサーバーが認識できるクラスターのみで変更を確認できます。通常、これはマルチプライマリートポロジー以外では問題ではありません。

コネクターは、サーバーの binlog の位置を記録します。この位置は、クラスターの各サーバーごとに異なります。そのため、コネクターは 1 つの MySQL サーバーインスタンスのみに従う必要があります。このサーバーに障害が発生した場合、サーバーを再起動またはリカバリーしないと、コネクターは継続できません。

高可用性クラスター
MySQL にはさまざまな 高可用性 ソリューションが存在し、問題や障害の耐性をつけ、即座に回復することが大変容易になります。ほとんどの HA MySQL クラスターは GTID を使用します。そのため、レプリカはあらゆるプライマリーサーバーの変更をすべて追跡できます。
マルチプライマリー

ネットワークデータベース (NDB) クラスターのレプリケーション は、複数のプライマリーアーバーからそれぞれをレプリケートする 1 つ以上の MySQL レプリカを使用します。これは、複数の MySQL クラスターのレプリケーションを集約する強力な方法です。このトポロジーには GTID を使用する必要があります。

Debezium MySQL コネクターはこれらのマルチプライマリー MySQL レプリカをソースとして使用することができ、新しいレプリカが古いレプリカに追い付けば、異なるマルチプライマリー MySQL レプリカにフェイルオーバーできます。つまり、新しいレプリカには最初のレプリカで確認されたすべてのトランザクションが含まれます。これは、新しいマルチプライマリー MySQL レプリカへの再接続を試み、binlog で適切な場所を見つけようとする際に、特定の GTID ソースが含まれるまたは除外されるようにコネクターを設定できるため、コネクターがデータベースやテーブルのサブセットのみを使用している場合でも機能します。

ホステッド

Debezium MySQL コネクターが Amazon RDS や Amazon Aurora などのホステッドオプションを使用するためのサポートがあります。

これらのホステッドオプションではグローバル読み取りロックが許可されないため、テーブルレベルロックを使用して 整合性スナップショット を作成します。

5.1.2. Debezium MySQL コネクターによるデータベーススキーマの変更の処理方法

データベースクライアントがデータベースのクエリーを行うと、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更が可能です。そのため、挿入、更新、または削除の操作が記録されるたびに、コネクターはどのスキーマであるかを特定できる必要があります。また、コネクターが比較的古いイベントを処理し、テーブルのスキーマが変更される前に記録された可能性があるため、コネクターは現在のスキーマのみを使用することはできません。

これに対応するために、MySQL の binlog にはデータの行レベルの変更だけでなく、データベースに適用される DDL ステートメントも含まれます。コネクターは binlog を読み取り、DDL ステートメントを見つけると、それらの DDL ステートメントを解析し、各テーブルのスキーマのインメモリー表現を更新します。コネクターはこのスキーマ表現を使用して、挿入、更新、または削除の操作時にテーブルの構造を特定し、適切な変更イベントを生成します。別のデータベース履歴 Kafka トピックでは、コネクターは各 DDL ステートメントがある binlog の場所とともにすべての DDL ステートメントを記録します。

コネクターが正常にクラッシュまたは停止された後にコネクターが再起動されると、コネクターは特定の場所 (特定の時点) から binlog の読み取りを開始します。コネクターは、データベース履歴の Kafka トピックを読み取り、コネクターが起動する binlog の時点まですべての DDL ステートメントを解析することで、この時点で存在したテーブル構造を再ビルドします。

このデータベース履歴トピックはコネクターのみが使用します。コネクターは任意で、コンシューマーアプリケーション向けの異なるトピックへのスキーマ変更イベントの生成 を 参照してください。

MySQL コネクターが、gh -ost、pt- online-schema- change などのスキーマ変更ツールが適用されるテーブルで変更をキャプチャーすると、移行プロセス中に作成されたヘルパーテーブルがあります。これらのヘルパーテーブルへの変更をキャプチャーするようにコネクターを設定する必要があります。ヘルパーテーブル用に生成されたレコードがコンシューマーに必要ない場合は、単一メッセージ変換を適用して、除去できます。

Debezium イベントレコードを受信する トピックのデフォルト名 を参照してください。

5.1.3. Debezium MySQL コネクターによるデータベーススキーマの変更の公開方法

Debezium MySQL コネクターを設定すると、MySQL サーバーのデータベースに適用されるすべての DDL ステートメントが含まれるスキーマ変更イベントを生成できます。コネクターは、これらのイベントを serverName という名前の Kafka トピックに出力します。serverName は、database.server.name コネクター設定プロパティーによって指定されるコネクターの名前になります。

スキーマ変更イベント の使用を選択した場合、スキーマ変更トピックからレコードを使用するようにしてください。データベース履歴トピックはコネクターのみが使用します。

重要

スキーマ変更トピックに出力されたイベントのグローバルな順序は重要です。したがって、データベース履歴のトピックをパーティション化しないでください。つまり、データベース履歴トピックの作成時にパーティション数を 1 に指定する必要があります。自動トピックの作成に依存する場合は、デフォルトのパーティション数を指定する Kafka の num.partitions 設定オプションが 1 に設定されていることを確認します。

コネクターがスキーマ変更トピックに出力する各レコードには、DDL ステートメントの適用時に接続されたデータベースの名前を含むメッセージキーが含まれています。例を以下に示します。

{
  "schema": {
    "type": "struct",
    "name": "io.debezium.connector.mysql.SchemaChangeKey",
    "optional": false,
    "fields": [
      {
        "field": "databaseName",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
    "databaseName": "inventory"
  }
}

スキーマ変更イベントレコードの値には、DDL ステートメント、ステートメントが適用されたデータベースの名前、および binlog におけるステートメントの位置を含む構造が含まれます。以下に例を示します。

{
  "schema": {
    "type": "struct",
    "name": "io.debezium.connector.mysql.SchemaChangeValue",
    "optional": false,
    "fields": [
      {
        "field": "databaseName",
        "type": "string",
        "optional": false
      },
      {
        "field": "ddl",
        "type": "string",
        "optional": false
      },
      {
        "field": "source",
        "type": "struct",
        "name": "io.debezium.connector.mysql.Source",
        "optional": false,
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ]
      }
    ]
  },
  "payload": {
    "databaseName": "inventory",
    "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;",
    "source" : {
      "version": "1.5.4.Final",
      "name": "mysql-server-1",
      "server_id": 0,
      "ts_ms": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": null,
      "table": null,
      "query": null
    }
  }
}

The ddl フィールドに複数の DDL ステートメントが含まれる可能性があります。各ステートメントは databaseName フィールドのデータベースに適用されます。ステートメントは、データベースに適用された順序で示されます。ソースフィールド は、テーブル固有のトピックに書き込まれた標準のデータ変更イベントとして構成されます。このフィールドは、異なるトピックでイベントを関連付けるのに役立ちます。

....
"payload": {
    "databaseName": "inventory",
    "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,...)",
    "source" : {
        ...
    }
}
....

クライアントは、複数のデータベースに適用される複数の DDL ステートメントを送信できます。MySQL がこれらをアトミックに適用する場合、コネクターは DDL ステートメントを順番に取得し、データベース別にグループ化して、各グループにスキーマ変更イベントを作成します。MySQL がこれらを個別に適用すると、コネクターは各ステートメントに対して個別のスキーマ変更イベントを作成します。

スキーマ履歴トピック 」も参照してください。

5.1.4. Debezium MySQL コネクターによるデータベーススナップショットの実行方法

Debezium MySQL コネクターが最初に起動すると、データベースの最初の 整合性スナップショット が実行されます。以下のフローは、コネクターによってこのスナップショットが作成される方法を示しています。このフローは、初期 デフォルトのスナップショットモード用です。その他のスナップショットモードの詳細は、MySQL コネクター snapshot.mode 設定プロパティ を参照してください。

表5.1 グローバル読み取りロックを使用して最初のスナップショットを実行するためのワークフロー

ステップアクション

1

他のデータベースクライアントによる 書き込み をブロックするグローバル読み取りロックを取得します。

スナップショット自体は、コネクターによる binlog の位置やテーブルスキーマの読み取りを妨害する可能性のある DDL を他のクライアントが適用しないように防ぐことはありません。コネクターは binlog の位置を読み取る間にグローバル読み取りロックを保持し、後のステップで説明するように、ロックを解除します。

2

繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内の後続の読み取りがすべて 整合性スナップショット に対して実行されるようにします。

3

現在の binlog の位置を読み取ります。

4

コネクターが変更をキャプチャーするように設定されたデータベースとテーブルのスキーマを読み取ります。

5

グローバル読み取りロックを解放します。他のデータベースクライアントがデータベースに書き込みできるようになりました。

6

該当する場合は、DDL の変更をスキーマ変更トピックに書き込みます。これには、必要な DROP…​ および CREATE…​ DDL ステートメントがすべて含まれます。

7

データベーステーブルをスキャンします。コネクターは、行ごとに CREATE イベントを関連するテーブル固有の Kafka トピックに出力します。

8

トランザクションをコミットします。

9

コネクターオフセットの完了済みスナップショットを記録します。

コネクターの再起動

最初のスナップショット の実行中にコネクターが失敗または停止したり、再分散された場合、コネクターの再起動後に新しいスナップショットが実行されます。この 最初のスナップショット が完了すると、Debezium MySQL コネクターは binlog の同じ位置から再起動するため、更新が見逃されることはありません。

コネクターが長時間停止した場合、MySQL が古い binlog ファイルをパージし、コネクターの位置が失われる可能性があります。位置が失われた場合、コネクターは 最初のスナップショット を開始位置に戻します。Debezium MySQL コネクターのトラブルシューティングに関する詳細は、問題が発生した場合の動作を参照し てください。

グローバル読み取りロックが許可されない

一部の環境では、グローバル読み取りロックが許可されません。Debezium MySQL コネクターがグローバル読み取りロックが許可されないことを検出すると、代わりにテーブルレベルロックを使用して、この方法でスナップショットを実行します。これには、Debezium コネクターのデータベースユーザーに LOCK TABLES 権限が必要になります。

表5.2 テーブルレベルロックを使用して最初のスナップショットを実行するためのワークフロー

ステップアクション

1

テーブルレベルロックを取得します。

2

繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内の後続の読み取りがすべて 整合性スナップショット に対して実行されるようにします。

3

データベースとテーブルの名前を読み取り、選別します。

4

現在の binlog の位置を読み取ります。

5

コネクターが変更をキャプチャーするように設定されたデータベースとテーブルのスキーマを読み取ります。

6

該当する場合は、DDL の変更をスキーマ変更トピックに書き込みます。これには、必要な DROP…​ および CREATE…​ DDL ステートメントがすべて含まれます。

7

データベーステーブルをスキャンします。コネクターは、行ごとに CREATE イベントを関連するテーブル固有の Kafka トピックに出力します。

8

トランザクションをコミットします。

9

テーブルレベルロックを解除します。

10

コネクターオフセットの完了済みスナップショットを記録します。

5.1.5. Debezium MySQL 変更イベントレコードを受信する Kafka トピックのデフォルト名

デフォルトの動作では、Debezium MySQL コネクターは 1 つのテーブルのすべての INSERTUPDATE、および DELETE 操作のイベントを 1 つの Kafka トピックに書き込みます。Kafka トピックの命名規則は次のとおりです。

serverName.databaseName.tableName

合致する 、サーバー名、インベントリーは データベース名で、データベースには 注文顧客および製品 の名前が含まれています。Debezium MySQL コネクターは、データベースのテーブルごとに 1 つずつ、3 つの Kafka トピックにイベントを出力します。

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products

トランザクションメタデータ

Debezium は、トランザクション境界を表し、データ変更イベントメッセージをエンリッチするイベントを生成できます。トランザクション BEGIN および END ごとに、Debezium は以下のフィールドが含まれるイベントを生成します。

  • status: BEGIN または END
  • id: 一意のトランザクション識別子の文字列表現
  • event_countEND イベント用)- トランザクションによって出力されたイベントの合計数
  • data_collectionsEND イベント用)- 指定のデータコレクションからの変更によって出力されたイベントの数を提供する data _collection と event_count のペアの配列。

{
  "status": "BEGIN",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}

トランザクションイベントは、database.server.name. transaction という名前のトピックに書き込まれます。

変更データイベントのエンリッチメント

トランザクションメタデータを有効にすると、データ message Envelope は新しい トランザクション フィールドでエンリッチされます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。

  • id: 一意のトランザクション識別子の文字列表現
  • total_order: トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
  • data_collection_order - トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。

以下は、メッセージの例になります。

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

GTID が有効になっていないシステムの場合、トランザクション識別子は binlog ファイル名と binlog の位置の組み合わせを使用して作成されます。たとえば、トランザクション BEGIN イベントに対応する binlog のファイル名と位置が mysql-bin.000002 および 1913 の場合、Debezium が構築したトランザクション識別子は file=mysql-bin.000002,pos=1913 になります。