第2章 MySQL 用 Debezium コネクター

MySQL にはバイナリーログ(binlog)があり、このログは、データベースにコミットされる順序ですべての操作を記録します。これには、テーブルスキーマの変更やテーブル内のデータが含まれます。MySQL はレプリケーションおよびリカバリーに binlog を使用します。

MySQL コネクターは binlog を読み取り、行レベル INSERT、、および DELETE 操作の変更イベントを生成し UPDATE、Kafka トピックに変更イベントを記録します。クライアントアプリケーションは、これらの Kafka トピックを読み取ります。

MySQL は通常、指定した時間が経過した後に binlog をパージするように設定されるため、MySQL コネクターは各データベースの初回の スナップショット を実行します。MySQL コネクターは、スナップショットの作成元から binlog を読み取ります。

2.1. MySQL コネクターの仕組みの概要

Debezium MySQL コネクターはテーブルの構造を追跡し、スナップショットを実行し、binlog イベントを Debezium 変更イベント、Kafka に記録されるレコードに変換します。

2.1.1. MySQL コネクターがデータベーススキーマを使用する方法

データベースクライアントがデータベースをクエリーする場合、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更できます。つまり、コネクターは、各挿入、更新、または削除の操作が記録されたときにスキーマを特定できる必要があります。また、コネクターは比較的古いイベントを処理し、テーブルのスキーマの変更前に記録された可能性があるため、現在のスキーマのみを使用することができません。

これを処理するため、MySQL は binlog にデータへの行レベルの変更と、データベースに適用される DDL ステートメントが含まれます。コネクターが binlog を読み取り、これらの DDL ステートメントを渡すと、それらを解析して各テーブルのスキーマのメモリー内表現を更新します。コネクターはこのスキーマ表現を使用して、それぞれの挿入、更新、または削除時にテーブルの構造を特定し、適切な変更イベントを生成します。コネクターは、個別のデータベース履歴 Kafka トピックで、各 DDL ステートメントが表示される binlog の位置とともにすべての DDL ステートメントを記録します。

コネクターがクラッシュするか、または正常に停止された後にコネクターを再起動すると、コネクターは特定の時点から特定の位置から binlog の読み取りを開始します。コネクターは、データベース履歴 Kafka トピックを読み取り、すべての DDL ステートメントをコネクターが開始する binlog のポイントに解析することで、この時点で存在していたテーブル構造を再ビルドします。

このデータベース履歴トピックは、コネクターが使用するためにのみ使用されます。コネクターは任意で、コンシューマーアプリケーション向けの異なるトピックでスキーマ変更イベントを生成できます。これは、MySQL コネクターがスキーマ変更トピックを処理する方法 に説明されています。

MySQL コネクターが、gh-ost やなどのスキーマ変更ツールの変更をキャプチャーする際に、移行プロセス中に作成 pt-online-schema-change されたヘルパーテーブルをホワイトリストのテーブルに含める必要があります。

ダウンストリームシステムでは、一時テーブルによって生成されたメッセージが必要ない場合は、単純なメッセージ変換を書き込んで適用して、それらをフィルターすることができます。

トピックの命名規則の詳細は、「 MySQL コネクターおよび Kafka トピック 」を参照してください。

2.1.2. MySQL コネクターによるデータベーススナップショットの実行方法

Debezium MySQL コネクターが最初に起動すると、データベースの初期 一貫性のあるスナップショット を実行します。以下のフローは、このスナップショットがどのように完了するかを説明します。

注記

これは、snapshot.mode プロパティー initial でとして設定されるデフォルトのスナップショットモードです。他のスナップショットモードについては、MySQL コネクター設定プロパティー をご確認ください。

コネクター
手順アクション

1

他のデータベースクライアントによる 書き込み をブロックする グローバル読み取りロック を取得します。

注記

このスナップショット自体は、binlog の位置とテーブルスキーマの読み取りの試行に干渉する可能性がある他のクライアントが DDL を適用できないようにする訳ではありません。グローバル読み取りロックは、後でリリースされる前に binlog の位置が読み込まれる間に保持されます。

2

繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内のすべての後続の読み取りが 一貫したスナップショット に対して実行されるようにします。

3

現在の binlog 位置を読み取ります。

4

コネクターの設定によって許可されるデータベースおよびテーブルのスキーマを読み取ります。

5

グローバル読み取りロック を解放します。これにより、他のデータベースクライアントがデータベースに書き込むことができるようになります。

6

必要なすべての CREATE…​ ステートメントを含む DDL 変更をスキーマ変更トピック DROP…​ に書き込みます。

注記

これは該当する場合に発生します。

7

データベーステーブルをスキャンし、各行に関連するテーブル固有の Kafka トピックで CREATE イベントを生成します。

8

トランザクションをコミットします。

9

コネクターオフセットで完了したスナップショットを記録します。

2.1.2.1. コネクターが失敗した場合、どうなりますか?

初期のスナップショット 作成中にコネクターが失敗するか、停止するか、またはリバランスされると、コネクターは再起動後に新しいスナップショットを作成します。意図しない スナップショット が完了すると、Debezium MySQL コネクターは binlog 内の同じ位置から再起動するため、更新が失われないようにします。

注記

コネクターが長い期間停止すると、MySQL は古い binlog ファイルをパージする可能性があり、コネクターの位置は失われます。位置が失われた場合、コネクターは開始位置のために 最初のスナップショット に戻ります。Debezium MySQL コネクターのトラブルシューティングに関する詳細は、「 MySQL connector common issues 」を参照してください。

2.1.2.2. グローバル読み取りロックが許可されていない場合は?

一部の環境では、グローバル読み取りロック が許可されない場合があります。Debezium MySQL コネクターがグローバル読み取りロックが許可されないことを検出すると、コネクターは代わりにテーブルレベルのロックを使用して、この方法でスナップショットを実行します。

重要

ユーザーには LOCK_TABLES 権限が必要です。

コネクター
手順アクション

1

繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内のすべての後続の読み取りが 一貫したスナップショット に対して実行されるようにします。

2

データベースおよびテーブルの名前を読み取り、フィルタリングします。

3

現在の binlog 位置を読み取ります。

4

コネクターの設定によって許可されるデータベースおよびテーブルのスキーマを読み取ります。

5

必要なすべての CREATE…​ ステートメントを含む DDL 変更をスキーマ変更トピック DROP…​ に書き込みます。

注記

これは該当する場合に発生します。

6

データベーステーブルをスキャンし、各行に関連するテーブル固有の Kafka トピックで CREATE イベントを生成します。

7

トランザクションをコミットします。

8

テーブルレベルのロックを解放します。

9

コネクターオフセットで完了したスナップショットを記録します。

2.1.3. MySQL コネクターによるスキーマ変更のトピックの処理方法

Debezium MySQL コネクターを設定して、MySQL サーバーのデータベースに適用されるすべての DDL ステートメントが含まれるスキーマ変更イベントを生成することができます。コネクターはこれらのすべてのイベントをという名前の Kafka トピックに書き込みます <serverName> serverName。は、database.server.name 設定プロパティーに指定されたコネクターの名前になります。

重要

スキーマ変更イベント の使用を選択した場合は、スキーマ変更トピックを使用し、データベースの履歴トピックは使用し ない でください。

注記

データベーススキーマ履歴には、イベントのグローバルな順序が存在することが重要です。そのため、データベース履歴トピックはパーティションを設定することはできません。これは、このトピックの作成時に 1 のパーティション数を指定する必要があることを意味します。自動トピックの作成に依存する場合は、Kafka の num.partitions 設定オプション(デフォルトのパーティション数)がに設定されていることを確認してください 1

2.1.3.1. スキーマ変更トピック構造

スキーマ変更トピックに書き込まれる各メッセージには、DDL ステートメントの適用時に使用される接続されたデータベースの名前が含まれるメッセージキーが含まれています。

{
  "schema": {
    "type": "struct",
    "name": "io.debezium.connector.mysql.SchemaChangeKey",
    "optional": false,
    "fields": [
      {
        "field": "databaseName",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
    "databaseName": "inventory"
  }
}

スキーマ変更イベントメッセージの値には、DDL ステートメント、ステートメントが適用されるデータベース、ステートメントが表示された binlog の位置が含まれる構造が含まれます。

{
  "schema": {
    "type": "struct",
    "name": "io.debezium.connector.mysql.SchemaChangeValue",
    "optional": false,
    "fields": [
      {
        "field": "databaseName",
        "type": "string",
        "optional": false
      },
      {
        "field": "ddl",
        "type": "string",
        "optional": false
      },
      {
        "field": "source",
        "type": "struct",
        "name": "io.debezium.connector.mysql.Source",
        "optional": false,
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ]
      }
    ]
  },
  "payload": {
    "databaseName": "inventory",
    "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;",
    "source" : {
      "version": "0.10.0.Beta4",
      "name": "mysql-server-1",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": null,
      "table": null,
      "query": null
    }
  }
}
2.1.3.1.1. スキーマ変更のトピックに関する重要なヒント

この ddl フィールドには複数の DDL ステートメントを含めることができます。すべてのステートメントは databaseName フィールドのデータベースに適用され、データベースで適用された順序と同じ順序で表示されます。source フィールドは、テーブル固有のトピックに書き込まれた標準データ変更イベントとして構成されます。このフィールドは、異なるトピックでイベントを関連付けるのに役立ちます。

....
    "payload": {
        "databaseName": "inventory",
        "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,...
        "source" : {
            ....
        }
    }
....
クライアントが 複数のデータベースに DDL ステートメントを送信する場合はどうすればよいです か?
  • MySQL がアトミックに適用される場合、コネクターは DDL ステートメントを順番に取得し、データベースを基にグループ化し、各グループのスキーマ変更イベントを作成します。
  • MySQL が個別に適用される場合、コネクターはステートメントごとに個別のスキーマ変更イベントを作成します。

関連情報

2.1.4. MySQL コネクターイベント

Debezium MySQL コネクターによって生成されたすべてのデータ変更イベントには、キーと値が含まれます。変更イベントキーおよび変更イベント値には、スキーマ とペイロードの構造がスキーマと ペイロード が含まれ、ペイロードにはデータが含まれます。

警告

MySQL コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。これは、ラピン文字やアンダースコアではない文字がアンダースコアに置き換えられているため、論理サーバー名、データベース名、およびテーブル名のコンテナー名がこれらのアンダースコアに置き換えられると予期せぬ競合が生じる可能性があります。

2.1.4.1. Change イベントキー

変更イベントのキーには、イベントの作成時にの PRIMARY KEY (または一意の制約)内の各列のフィールドが含まれる構造があります。例のテーブルとテーブルのスキーマとペイロードがどのように表示されるかを見てみましょう。

テーブルの例

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

変更イベントキーの例

{
 "schema": { 1
    "type": "struct",
 "name": "mysql-server-1.inventory.customers.Key", 2
 "optional": false, 3
 "fields": [ 4
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
 "payload": { 5
    "id": 1001
  }
}

1
は内の内容を schema 説明し payloadます。
2
mysql-server-1.inventory.customers.Key は、がコネクター名で、がデータベースで、が表 mysql-server-1inventory ある構造を定義するスキーマの名前 customers です。
3
はオプションで payload はないことを示します。
4
で想定されるフィールドのタイプを指定し payloadます。
5
ペイロード自体(この場合は 1 つの id フィールドのみ)。

このキー inventory.customers は、プライマリーキーコラムに値がであるコネクターの外部に mysql-server-1 id ある行を記述し 1001ます。

2.1.4.2. イベント値の変更

変更イベントの値には、スキーマとペイロードセクションが含まれます。変更イベント値には、3 つのタイプのイベント構造があります。この構造のフィールドは以下で説明され、各変更イベント値の例でマークされます。

確認項目フィールド名説明

1

name

mysql-server-1.inventory.customers.Key は、コネクター名、がデータベースで、がテーブルで mysql-server-1 ある構造を定義 inventory するスキーマの名前 customers です。

2

op

操作のタイプを記述する 必須 の文字列。

  • c = create
  • u = 更新
  • d = DELETE
  • r 読み取り(初期スナップショット のみ)

3

before

イベント発生前の行の状態を指定するオプションのフィールド。

4

after

イベント発生後の行の状態を指定するオプションのフィールド。

5

source

以下を含む、イベントのソースメタデータを記述する 必須 フィールドです。

  • Debezium のバージョン
  • コネクター名
  • イベントが記録された binlog 名
  • binlog の位置
  • イベント内の行
  • イベントがスナップショットの一部であった場合
  • 影響を受けるデータベースおよびテーブルの名前
  • イベントを作成する MySQL スレッドの ID(スナップショットのみ)
  • MySQL サーバー ID(利用可能な場合)
  • timestamp
注記

binlog _rows_query_log_events オプションが有効で、コネクターで include.query オプションが有効な場合、イベントを生成した元の SQL ステートメントが含まれる query フィールドが表示されます。

6

ts_ms

コネクターがイベントを処理した時間を表示するオプションのフィールド。

注記

この時間は、Kafka Connect タスクを実行している JVM のシステムクロックに基づいています。

例のテーブルとテーブルのスキーマとペイロードがどのように表示されるかを見てみましょう。

テーブルの例

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

2.1.4.2.1. イベント値の変更の作成

以下の例は、customers テーブルの 作成 イベントを示しています。

{
  "schema": { 1
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.product.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope"
  },
  "payload": { 2
    "op": "c",
    "ts_ms": 1465491411815,
    "before": null,
    "after": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.1.2.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}
1
このイベントの schema 一部は、オペレーションのスキーマ、ソース構造のスキーマ(MySQL コネクターに固有で、すべてのイベントで再利用される)、before および after フィールドのテーブル固有のスキーマを示します。
2
このイベントの payload 一部には、イベント内の情報が表示され、これは行が作成されたことを記述していること(原因 op=c)、および after フィールドに新たに挿入された行、、id first_name last_name、および email 列の値が含まれます。
2.1.4.2.2. Update change event value

customers テーブルの 更新 変更イベントの値には、作成 イベントと同じスキーマがあります。ペイロードは同じですが、異なる値を保持します。以下に例を示します(読みやすさを考慮したフォーマット)。

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": { 2
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 3
      "version": "1.1.2.Final",
      "name": "mysql-server-1",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", 4
    "ts_ms": 1465581029523 5
  }
}

これと 挿入 イベントの値を比較すると、payload セクションにいくつかの違いを確認できます。

1
この before フィールドには、データベースのコミットの前に値が含まれる行の状態が含まれるようになりました。
2
after フィールドは行の更新された状態となり、first_name 値はになりまし Anne Marieた。beforeafter 構造を比較して、コミットが原因でこの行で実際に何が変更されているかを判断できます。
3
source フィールド構造は以前と同じフィールドを持ちますが、値は異なります(このイベントは binlog の別の位置にあります)。source 構造は、この変更の MySQL の記録に関する情報を表示します(トレース可能性の向上)。また、このイベントや他のトピックの他のイベントと比較し、このイベントが他のイベントとして同じ MySQL コミットの前、後、またはの一部として実行されたかどうかを確認するためにも使用できます。
4
これで op フィールド値はになり u、更新によりこの行が変更されていることを確認できます。
5
この ts_ms フィールドは、Debezium がこのイベントを処理したときにタイムスタンプを表示します。
注記

行のプライマリーキーまたは一意の鍵の列が更新され、Debezium は行のキーが変更され、Debezium は DELETE イベントと行の古いキーを持つ tombstone イベントを出力し、その後の行の新しいキーを含む INSERT イベントを出力します。

2.1.4.2.3. Delete change event value(変更イベント値の削除)

customers テーブルの delete 変更イベントの値は、作成 および 更新 イベントと全く同じスキーマを持ちます。ペイロードは同じですが、異なる値を保持します。以下に例を示します(読みやすさを考慮したフォーマット)。

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null, 2
    "source": { 3
      "version": "1.1.2.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 805,
      "row": 0,
      "thread": 7,
      "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d", 4
    "ts_ms": 1465581902461 5
  }
}

作成 および 更新 イベントのペイロードと payload 部分を比較すると、いくつかの違いを確認できます。

1
この before フィールドには、データベースのコミットで削除された行の状態が表示されます。
2
after フィールドは null、行が存在しなくなったことを表しています。
3
source フィールド構造には、ts_sec および pos フィールドが変更された場合を除き、(他のシナリオでファイルが変更された可能性がありました)、以前と同じ値の多くがあります。
4
op フィールド値はになり d、この行が削除されたことを表しています。
5
ts_ms、Debezium がこのイベントを処理したときにタイムスタンプを表示します。

このイベントは、この行の削除を処理するために必要な情報をコンシューマーに提供します。一部のコンシューマーでは削除を適切に処理するために必要なため、古い値が含まれます。

MySQL コネクターのイベントは、Kafka ログの圧縮 と連携するように設計されています。これにより、すべてのキーの少なくとも最新のメッセージが保持されていれば、古いメッセージを削除することができます。これにより、Kafka はストレージ領域を回収でき、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できます。

行が削除されても、上記の delete イベント値はログの圧縮で機能します。これは、Kafka はその鍵を使用して以前のメッセージをすべて削除できるためです。message 値がの場合 null、Kafka は同じキーを持つすべてのメッセージを削除できることを認識します。これを可能にするため、Debezium の MySQL コネクターは、常に同じキーではなく null 値を持つ特別な tombstone イベントを持つ 削除 イベントにしたがいます。

2.1.5. MySQL コネクターがデータタイプをマップする方法

Debezium MySQL コネクターは、行が存在するテーブルのように構造化されたイベントを持つ行への変更を表します。イベントには、各列値のフィールドが含まれます。その列の MySQL データタイプは、値をイベントでどのように表すかを決定します。

文字列を保存する列は、文字セットと照合で MySQL で定義されます。MySQL コネクターは、binlog イベント内の列値のバイナリー表現を読み取るときに、列の文字セットを使用します。以下の表は、コネクターが MySQL データタイプを リテラル タイプとセマンティクスタイプの両方にマップする方法を示し います。

  • literal タイプ : Kafka Connect スキーマタイプを使用して値がどのように表されるか
  • semantic type : Kafka Connect スキーマがフィールドの意味をキャプチャーする方法(スキーマ名)
MySQL タイプリテラルタイプセマンティクスタイプ

BOOLEAN, BOOL

BOOLEAN

該当なし

BIT(1)

BOOLEAN

該当なし

BIT(>1)

BYTES

io.debezium.data.Bits

注記

length schema パラメーターには、ビット数を表す整数が含まれます。には little-endian 形式のビットが byte[] 含まれ、指定された数のビットが含まれるようにサイズが指定されます。

例(n がビット)

numBytes = n/8 + (n%8== 0 ? 0 : 1)

TINYINT

INT16

該当なし

SMALLINT[(M)]

INT16

該当なし

MEDIUMINT[(M)]

INT32

該当なし

INT, INTEGER[(M)]

INT32

該当なし

BIGINT[(M)]

INT64

該当なし

REAL[(M,D)]

FLOAT32

該当なし

FLOAT[(M,D)]

FLOAT64

該当なし

DOUBLE[(M,D)]

FLOAT64

該当なし

CHAR(M)]

STRING

該当なし

VARCHAR(M)]

STRING

該当なし

BINARY(M)]

BYTES

該当なし

VARBINARY(M)]

BYTES

該当なし

TINYBLOB

BYTES

該当なし

TINYTEXT

STRING

該当なし

BLOB

BYTES

該当なし

TEXT

STRING

該当なし

MEDIUMBLOB

BYTES

該当なし

MEDIUMTEXT

STRING

該当なし

LONGBLOB

BYTES

該当なし

LONGTEXT

STRING

該当なし

JSON

STRING

io.debezium.data.Json

注記

JSON ドキュメント、配列、またはスカラーの文字列表現が含まれます。

ENUM

STRING

io.debezium.data.Enum

注記

allowed schema パラメーターには、許可される値のコンマ区切りリストが含まれます。

SET

STRING

io.debezium.data.EnumSet

注記

allowed schema パラメーターには、許可される値のコンマ区切りリストが含まれます。

YEAR[(2|4)]

INT32

io.debezium.time.Year

TIMESTAMP[(M)]

STRING

io.debezium.time.ZonedTimestamp

注記

ISO 8601 形式(マイクロ秒の精度)MySQL では M、の範囲を使用でき 0-6ます。

2.1.5.1. 一時的な値

TIMESTAMP データタイプを除外する MySQL の一時タイプは、time.precision.mode 設定プロパティーの値によって異なります。デフォルト値は CURRENT_TIMESTAMP or として指定された TIMESTAMP 列の場合 NOW、値 1970-01-01 00:00:00 は Kafka Connect スキーマのデフォルト値として使用されます。

MySQL は、ゼロ値が null 値よりも優先されるため DATE, `DATETIME、、および TIMESTAMP 列の値にゼロ/値を許可します。MySQL コネクターは、列定義が null 値を許可する場合は null 値、またはコラムが null 値を許可しない場合はエポック日としてゼロ値を表します。

タイムゾーンのない一時的な値

DATETIME タイプは、「2018-01-13 09:48:27」などのローカル日時を表します。ご覧のとおり、タイムゾーンの情報はありません。このような列は、UTC を使用して列の精度に基づいてエポックミリ秒またはマイクロ秒に変換されます。TIMESTAMP タイプはタイムゾーン情報がないタイムスタンプを表し、値を書き込む際にサーバー(またはセッションの)タイムゾーンから UTC へ MySQL により変換されます。以下に例を示します。

  • DATETIME の値が 2018-06-20 06:37:03 になり 1529476623000ます。
  • TIMESTAMP の値が 2018-06-20 06:37:03 になり 2018-06-20T13:37:03Zます。

このような列は、サーバー(またはセッション)の現在のタイムゾーンを基に UTC io.debezium.time.ZonedTimestamp で同等の形式に変換されます。デフォルトでは、タイムゾーンはサーバーからクエリーされます。これに失敗する場合、database.serverTimezone コネクター設定プロパティーによって明示的に指定する必要があります。たとえば、データベースのタイムゾーン(グローバルで、またはでコネクターに設定されている場合 database.serverTimezone property)が「America/Los_Angeles」である場合、TIMESTAMP の値「2018-06-20 06:37:03」は「2018-06-20T13:37:03Z」の ZonedTimestamp 値で表されます。

Kafka Connect および Debezium を実行している JVM のタイムゾーンはこれらの変換には影響しないことに注意してください。

用語値に関連するプロパティーの詳細は、MySQL コネクター設定プロパティー のドキュメントを参照してください。

time.precision.mode=adaptive_time_microseconds(default)

MySQL コネクターは、リテラルタイプとセマンティクスタイプを列のデータタイプ定義に基づいて決定し、イベントがデータベースの正確な値を表すようにします。すべての時間フィールドはマイクロ秒単位です。の範囲 TIME にある正の値のみ 00:00:00.000000 を正しくキャプチャー 23:59:59.999999 できます。

MySQL タイプリテラルタイプセマンティクスタイプ

DATE

INT32

io.debezium.time.Date

注記

エポックからの日数を表します。

TIME[(M)]

INT64

io.debezium.time.MicroTime

注記

タイムゾーン情報が含まれ、時間の値をマイクロ秒単位で表します。MySQL では M、の範囲を使用でき 0-6ます。

DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)

INT64

io.debezium.time.Timestamp

注記

エポックからの経過時間(ミリ秒単位)を表し、タイムゾーン情報が含まれません。

DATETIME(4), DATETIME(5), DATETIME(6)

INT64

io.debezium.time.MicroTimestamp

注記

過去のエポックに対するマイクロ秒数を表し、タイムゾーン情報が含まれません。

time.precision.mode=connect

MySQL コネクターは事前定義された Kafka Connect の論理タイプを使用します。このアプローチはデフォルトのアプローチと比べると、データベース列に分 数秒の精度の値が大きい場合にイベントの精度 が低下する可能性があり 3ます。を処理 23:59:59.999 できるのは 00:00:00.000 の値だけです。テーブルの TIME 値がサポートされる範囲を超えない場合に time.precision.mode=connect のみ設定します。この connect 設定は、今後の Debezium で削除される予定です。

MySQL タイプリテラルタイプセマンティクスタイプ

DATE

INT32

org.apache.kafka.connect.data.Date

注記

エポックからの日数を表します。

TIME[(M)]

INT64

org.apache.kafka.connect.data.Time

注記

中間からの時間(マイクロ秒単位)を表し、タイムゾーン情報が含まれません。

DATETIME[(M)]

INT64

org.apache.kafka.connect.data.Timestamp

注記

エポックからの経過時間(ミリ秒単位)を表し、タイムゾーン情報が含まれません。

== 10 進数値

10 進数はプロパティーを使用して処理され decimal.handling.mode ます。

ヒント

詳細は、「 MySQL コネクター設定プロパティー 」を参照してください。

decimal.handling.mode=precise
MySQL タイプリテラルタイプセマンティクスタイプ

NUMERIC[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

注記

scale schema パラメーターには、押す 10 点の数字を表す整数が含まれます。

DECIMAL[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

注記

scale schema パラメーターには、押す 10 点の数字を表す整数が含まれます。

decimal.handling.mode=double
MySQL タイプリテラルタイプセマンティクスタイプ

NUMERIC[(M[,D])]

FLOAT64

該当なし

DECIMAL[(M[,D])]

FLOAT64

該当なし

decimal.handling.mode=string
MySQL タイプリテラルタイプセマンティクスタイプ

NUMERIC[(M[,D])]

STRING

該当なし

DECIMAL[(M[,D])]

STRING

該当なし

2.1.5.2. ブール値

MySQL は特定の方法で BOOLEAN 値を内部で処理します。BOOLEAN 列は内部的に TINYINT(1) datatype にマッピングされます。テーブルがストリーミング中に作成されると、Debezium が元の DDL を受け取るため、適切な BOOLEAN マッピングを使用します。スナップショットの Debezium が SHOW CREATE TABLE を実行してテーブル定義を取得し、そのテーブル定義が BOOLEAN および TINYINT(1)TINYINT(1) の両方に対して返されます。

その後、Debezium には元のタイプマッピングの取得方法がなく、にマッピングされ TINYINT(1)ます。

設定例を以下に示します。

converters=boolean
boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
boolean.selector=db1.table1.*, db1.table2.column1

2.1.5.3. 重要データタイプ

現在、Debezium MySQL コネクターは以下のデータタイプをサポートします。

MySQL タイプリテラルタイプセマンティクスタイプ

GEOMETRY, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION

STRUCT

io.debezium.data.geometry.Geometry

注記

2 つのフィールドが含まれる構造が含まれています。

  • srid (INT32: 構造に格納されるジオメトリーオブジェクトの種類を定義する強力な参照システム ID
  • wkb (BYTES): raw-Known-Binary(wkb)形式でエンコードされたジオメトリーオブジェクトのバイナリー表現。詳細は、Open Geospatial Consortium を参照してください。

2.1.6. MySQL コネクターおよび Kafka トピック

Debezium MySQL コネクターは INSERT UPDATE、イベントを 1 つの DELETE テーブルから 1 つの Kafka トピックに書き込みます。Kafka トピックの命名規則は以下のとおりです。

format

serverName.databaseName.tableName

例2.1 example

サーバー名で、fulfillment は、、およびの 3 つ のテーブル inventory が含まれるデータベース orderscustomersproductsます。Debezium MySQL コネクターはイベントを 3 つの Kafka トピックに生成し、データベースの各テーブルに対して 1 つずつイベントを生成します。

fulfillment.inventory.orders

fulfillment.inventory.customers

fulfillment.inventory.products

2.1.7. MySQL がサポートするトポロジー

Debezium MySQL コネクターは、以下の MySQL トポロジーをサポートします。

Standalone
単一の MySQL サーバーを使用する場合は、Debezium MySQL コネクターがサーバーをモニターできるように、binlog を有効にする(およびオプションで GTIDs が有効になっている)必要があります。バイナリーログは増分 バックアップ としても使用できるため、これは受け入れ可能です。この場合、MySQL コネクターは常に接続し、このスタンドアロン MySQL サーバーインスタンスに従います。
マスターおよびスレーブ

Debezium MySQL コネクターは、マスターまたはスレーブの 1 つ(そのスレーブで binlog が有効になっている場合)に従うことができますが、コネクターはそのサーバーに表示されるクラスターの変更のみを確認します。一般的に、これはマルチマスタートポロジー以外の問題ではありません。

コネクターは、クラスターの各サーバーで異なるサーバーの binlog の位置を記録します。そのため、コネクターは MySQL サーバーインスタンス 1 つのみに従う必要があります。サーバーが失敗した場合は、コネクターを続行する前に再起動または復元する必要があります。

高可用性クラスター
MySQL には多くの 高可用性 ソリューションが存在します。そのため、問題や障害からほぼすぐに復元できます。ほとんどの HA MySQL クラスターは GTID を使用して、スレーブがマスター上のすべての変更を追跡できるようにします。
マルチマスター

マルチマスター MySQL トポロジー は、それぞれが複数のマスターから複製する 1 つ以上の MySQL スレーブを使用します。これは、複数の MySQL クラスターのレプリケーションを集約する強力な方法です。GTID の使用が必要になります。

Debezium MySQL コネクターは、これらのマルチマスター MySQL スレーブをソースとして使用し、w 新規スレーブが古いスレーブにキャッチされていれば、さまざまなマルチマスター MySQL スレーブにフェイルオーバーできます(例: 新しいスレーブは最初のスレーブで最後に確認されたトランザクションをすべて保持します)。コネクターは、新しいマルチマスター MySQL スレーブに再接続して binlog で正しい位置を見つける際に、特定の GTID ソースを include または exclude するように設定できるため、コネクターがデータベースやテーブルのサブセットのみを使用しても機能します。

Hosted

Amazon RDS や Amazon Aurora などのホストオプションを使用するように Debezium MySQL コネクターがサポートされます。

重要

このようなホスト型のオプションでは グローバル読み取りロック が許可されないため、一貫したスナップショット を作成するためにテーブルレベルのロックが使用されます。