Debezium スタートガイド
Debezium 1.4 の使用
概要
はじめに
このチュートリアルでは、Debezium を使用して MySQL データベースの更新をキャプチャーする方法を紹介します。データベースのデータが変更されると、結果となるイベントストリームを確認できます。
このチュートリアルでは、OpenShift で Debezium サービスを開始し、データベースの簡単な例を使用して MySQL データベースサーバーを実行した後、Debezium を使用してデータベースの変更をキャプチャーします。
前提条件
-
cluster-admin
権限での OpenShift Container Platform 4.x クラスターへのアクセス AMQ Streams 1.6 の OpenShift インストールおよびサンプルファイル。
AMQ Streams のダウンロードサイト からファイルをダウンロードできます。
Debezium MySQL Connector 1.4。
Red Hat Integration のダウンロードサイト からファイルをダウンロードできます。
上記は MySQL コネクターを対象とする前提条件です。その他の Debezium コネクターの前提条件は異なる場合があります。
多様性を受け入れるオープンソースの強化
Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。まずは、マスター (master)、スレーブ (slave)、ブラックリスト (blacklist)、ホワイトリスト (whitelist) の 4 つの用語の置き換えから始めます。この取り組みは膨大な作業を要するため、今後の複数のリリースで段階的に用語の置き換えを実施して参ります。詳細は、Red Hat CTO である Chris Wright のメッセージ をご覧ください。
第1章 Debezium の紹介
Debezium は、既存のデータベースをイベントストリームに変える分散型プラットフォームで、アプリケーションはデータベースの行レベルの変更を即座に確認し、対応することが可能になります。
Debezium は Apache Kafka 上に構築され、特定のデータベース管理システムを監視する Kafka Connect 対応のコネクターが提供されます。Debezium では、アプリケーションがデータを使用する場所から、データ変更の履歴が Kafka のログに記録されます。これにより、アプリケーションはすべてのイベントを簡単、正確、かつ完全に使用することができます。アプリケーションが予期せず停止しても、見逃すものはありません。アプリケーションが再起動すると、停止した場所からイベントの使用を再開します。
Debezium には、複数のコネクターが含まれています。このチュートリアルでは MySQL コネクター を使用します。
第2章 サービスの起動
Debezium を使用するには、AMQ Streams と Debezium コネクターサービスが必要です。このチュートリアルに必要なサービスを起動するには、以下を行う必要があります。
2.1. Kafka クラスターの設定
AMQ Streams を使用して Kafka クラスターを設定します。この手順では、単一ノードの Kafka クラスターをデプロイします。
手順
OpenShift 4.x クラスターで、新しいプロジェクトを作成します。
$ oc new-project debezium-tutorial
- AMQ Streams 1.6 OpenShift インストールとサンプルファイルをダウンロードしたディレクトリーに移動します。
AMQ Streams Cluster Operator をデプロイします。
Cluster Operator は、OpenShift クラスター内で Kafka クラスターのデプロイおよび管理を行います。このコマンドは、Cluster Operator をデプロイし、作成したプロジェクトのみを監視します。
$ sed -i 's/namespace: .*/namespace: debezium-tutorial/' install/cluster-operator/*RoleBinding*.yaml $ oc apply -f install/cluster-operator -n debezium-tutorial
Cluster Operator が稼働していることを確認します。
このコマンドを実行すると、Cluster Operator は稼働中で、すべての Pod の準備ができていることを確認できます。
$ oc get pods NAME READY STATUS RESTARTS AGE strimzi-cluster-operator-5c6d68c54-l4gdz 1/1 Running 0 46s
Kafka クラスターをデプロイします。
このコマンドは、
kafka-ephemeral-single.yaml
カスタムリソースを使用して、3 つの ZooKeeper ノードと 1 つの Kafka ノードを持つ Kafka の一時クラスターを作成します。$ oc apply -f examples/kafka/kafka-ephemeral-single.yaml
Kafka クラスターが稼働していることを確認します。
このコマンドを実行すると、Kafka クラスターは稼働中で、すべての Pod の準備ができていることを確認できます。
$ oc get pods NAME READY STATUS RESTARTS AGE my-cluster-entity-operator-5b5d4f7c58-8gnq5 3/3 Running 0 41s my-cluster-kafka-0 2/2 Running 0 70s my-cluster-zookeeper-0 2/2 Running 0 107s my-cluster-zookeeper-1 2/2 Running 0 107s my-cluster-zookeeper-2 2/2 Running 0 107s strimzi-cluster-operator-5c6d68c54-l4gdz 1/1 Running 0 8m53s
2.2. Kafka Connect のデプロイ
Kafka クラスターの設定後、Debezium のカスタムコンテナーイメージに Kafka Connect をデプロイします。このサービスは、Debezium MySQL コネクターを管理するためのフレームワークを提供します。
前提条件
- Podman または Docker がインストールされ、コンテナーを作成および管理するのに十分な権限がある。
手順
- Red Hat Integration のダウンロードサイト から Debezium MySQL Connector 1.4 アーカイブをダウンロードします。
Debezium MySQL コネクターアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。
tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mysql │ ├── ...
Debezium MySQL コネクターで Kafka Connect を実行するカスタムイメージを作成し、パブリッシュします。
registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0
をベースイメージとして使用して、新規のDockerfile
を作成します。以下の例のmy-plugins
は、プラグインディレクトリーの名前に置き換えます。FROM registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
Kafka Connect は、コネクターの実行を開始する前に、
/opt/kafka/plugins
ディレクトリーにあるサードパーティープラグインをロードします。コンテナーイメージをビルドします。たとえば、前の手順で作成した
Dockerfile
をdebezium-container-for-mysql
として保存し、Dockerfile
が現在のディレクトリーにある場合は、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-mysql:latest .
docker build -t debezium-container-for-mysql:latest .
カスタムイメージをコンテナーレジストリーにプッシュします。以下のいずれかのコマンドを実行します。
podman push <my_registry.io>/debezium-container-for-mysql:latest
docker push <my_registry.io>/debezium-container-for-mysql:latest
KafkaConnect
カスタムリソースのspec.image
プロパティーを編集して、新しいコンテナーイメージを示します。このプロパティーが設定されている場合、その値は Cluster Operator のSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
変数を上書きします。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster annotations:strimzi.io/use-connector-resources: "true" spec: #... image: debezium-container-for-mysql
結果
Kafka Connect が稼働します。コンテナーには Debezium MySQL コネクターがありますが、このコネクターはこの時点ではデータベースの変更をキャプチャーするように設定されていません。
2.3. MySQL データベースのデプロイ
現時点では、Kafka クラスターおよび Kafka Connect サービスが Debezium MySQL データベースコネクターとデプロイされています。ただし、Debezium による変更のキャプチャーを可能にするデータベースサーバーが必要です。この手順では、サンプルデータベースを使用して MySQL サーバーを起動します。
手順
以下のコマンドを実行して MySQL データベースを起動します。このコマンドは、
inventory
データベースのサンプルで設定した MySQL データベースサーバーを起動します。$ oc new-app --name=mysql quay.io/debezium/example-mysql:latest
以下のコマンドを実行して MySQL データベースのクレデンシャルを設定します。このコマンドによって MySQL データベースのデプロイメント設定が更新され、ユーザー名とパスワードが追加されます。
$ oc set env dc/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
以下のコマンドを実行して MySQL データベースが稼働していることを検証します。コマンドの実行後、MySQL データベースが稼働し、Pod の準備が整っていることを表す出力が表示されます。
$ oc get pods -l app=mysql NAME READY STATUS RESTARTS AGE mysql-1-2gzx5 1/1 Running 1 23s
新しいターミナルを開き、
inventory
データベースのサンプルにログインします。このコマンドは、MySQL データベースを実行している Pod で MySQL コマンドラインクライアントを開きます。クライアントは以前に設定したユーザー名とパスワードを使用します。
$ oc exec mysql-1-2gzx5 -it -- mysql -u mysqluser -pmysqlpw inventory mysql: [Warning] Using a password on the command line interface can be insecure. Reading table information for completion of table and column names You can turn off this feature to get a quicker startup with -A Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 7 Server version: 5.7.29-log MySQL Community Server (GPL) Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql>
inventory
データベースのテーブルを一覧表示します。mysql> show tables; +---------------------+ | Tables_in_inventory | +---------------------+ | addresses | | customers | | geom | | orders | | products | | products_on_hand | +---------------------+ 6 rows in set (0.00 sec)
データベースを調べ、それに含まれるデータを確認します。たとえば、
customers
テーブルを表示します。mysql> select * from customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
第3章 inventory
データベースの変更をキャプチャーするためのコネクターの作成
Kafka、Debezium、および MySQL サービスを起動したら、inventory
データベースで変更をキャプチャーするコネクターインスタンスを作成することができます。
この手順では、コネクターインスタンスを定義する KafkaConnector
カスタムリソース (CR) を作成して適用することで、コネクターインスタンスを作成します。CR の適用後、コネクターインスタンスは inventory
データベースの binlog
で変更のキャプチャーを開始します。binlog
は、データベースのトランザクション (個別の行の変更やスキーマの変更など) をすべて記録します。データベースの行が変更されると、Debezium は変更イベントを生成します。
通常、Kafka ツールを使用して、レプリカ数の指定などの必要なトピックを手作業で作成します。ただし、このチュートリアルでは、1 つのレプリカのみでトピックを自動作成するように Kafka が設定されています。
手順
inventory
データベースへの変更をキャプチャーするために Debezium MySQL コネクターインスタンスを設定するKafkaConnector
CR を作成します。以下のサンプル CR をコピーします。inventory-connector.yaml
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector 1 labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 2 config: 3 database.hostname: mysql 4 database.port: 3306 database.user: debezium database.password: dbz database.server.id: 184054 5 database.server.name: dbserver1 6 database.whitelist: inventory 7 database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 8 database.history.kafka.topic: schema-changes.inventory 9
- 1
- コネクターの名前。
- 2
- 1 度に 1 つのタスクのみが動作する必要があります。MySQL コネクターは MySQL サーバーの
binlog
を読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。 - 3
- コネクターの設定。
- 4
- データベースホスト。これは、MySQL サーバーを実行しているコンテナーの名前です (
mysql
)。 - 5 6
- 一意なサーバー ID および名前。サーバー名は、MySQL サーバーまたはサーバーのクラスターの論理識別子です。この名前は、すべての Kafka トピックの接頭辞として使用されます。
- 7
inventory
データベースの変更のみが検出されます。- 8 9
- コネクターは、このブローカー (イベントの送信先となるブローカーと同じ) とトピック名を使用して、データベーススキーマの履歴を Kafka に保存します。再起動時に、コネクターは、コネクターが読み取りを開始すべき時点で
binlog
に存在したデータベースのスキーマを復元します。
コネクターインスタンスを適用します。
$ oc apply -f inventory-connector.yaml
inventory-connector
コネクターは登録され、inventory
データベースに対して実行が開始されます。inventory-connector
の開始時に Kafka Connect のログ出力を監視することで、inventory-connector
が作成され、inventory
データベースの変更のキャプチャーが開始されたことを確認します。Kafka Connect のログ出力を表示します。
$ oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
ログの出力を確認し、初回のスナップショットが実行されたことを確認します。以下の行は、初回のスナップショットが開始されたことを表しています。
... 2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
スナップショットには、複数のステップが関係します。
... 2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL master (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ... 2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
スナップショットの完了後、Debezium は
inventory
データベースのbinlog
への更新に対してキャプチャーを開始します。... 2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306] Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5) 2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306] ...
第4章 変更イベントの表示
Debezium MySQL コネクターのデプロイ後に、inventory
データベースへの変更のキャプチャーが開始されます。
コネクターの開始時に、イベントが以下のトピックに書き込まれたことを確認できます。これらのトピックの名前はすべて、コネクターの名前である dbserver1
で始まります。
dbserver1
- 変更がキャプチャーされるテーブルに適用される DDL ステートメントが書き込まれるスキーマ変更トピック。
dbserver1.inventory.products
-
inventory
データベースのproducts
テーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.products_on_hand
-
inventory
データベースのproducts_on_hand
テーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.customers
-
inventory
データベースのcustomers
テーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.orders
-
inventory
データベースのorders
テーブルの変更イベントレコードを受け取ります。
このチュートリアルでは、dbserver1.inventory.customers
トピックについて詳しく見ていきます。このトピックでは、異なるタイプの変更イベントを表示し、コネクターがそれらのイベントをキャプチャーする方法を確認します。
4.1. 作成 イベントの表示
dbserver1.inventory.customers
トピックを表示すると、MySQL コネクターが inventory
データベースの 作成 イベントをどのようにキャプチャーしたが分かります。この場合、作成 イベントは、データベースに追加された新規顧客をキャプチャーします。
手順
新しいターミナルを開き、
kafka-console-consumer
を使用してトピックの最初からdbserver1.inventory.customers
トピックを使用します。このコマンドは、Kafka (
my-cluster-kafka-0
) を実行している Pod で簡単なコンシューマー (kafka-console-consumer.sh
) を実行します。$ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers
コンシューマーは、
customers
テーブルの各行に 1 つずつ、4 つのメッセージ (JSON 形式) を返します。各メッセージには、対応するテーブル行のイベントレコードが含まれます。各イベントには、キー と 値 という 2 つの JSON ドキュメントがあります。キーは行のプライマリーキーに対応し、値は行の詳細 (行に含まれるフィールド、各フィールドの値、および行で実行された操作のタイプ) を表します。
最後のイベントでは、キー の詳細を確認します。
最後のイベントの キー の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
{ "schema":{ "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"id" } ], "optional":false, "name":"dbserver1.inventory.customers.Key" }, "payload":{ "id":1004 } }
このイベントには、
schema
とpayload
の 2 つの部分があります。schema
には、ペイロードの内容を記述する Kafka Connect スキーマが含まれています。この場合、ペイロードはdbserver1.inventory.customers.Key
という名前の構造
で、これはオプションではなく、必須フィールドが 1 つあります (タイプint32
のID
)。payload
には、値が1004
のid
フィールドが 1 つあります。イベントの key を確認すると、このイベントは
id
の主キー列の値が1004
であるinventory.customers
テーブルの行に提供されることが分かります。同じイベントの 値 の詳細を確認します。
イベントの 値 は、行が作成されたことを示し、その行に含まれる内容が記載されています (この場合は挿入された行の
id
、first_name
、last_name
、およびemail
)。最後のイベントの 値の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "int64", "optional": false, "field": "ts_sec" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "boolean", "optional": true, "field": "snapshot" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "db" }, { "type": "string", "optional": true, "field": "table" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "dbserver1.inventory.customers.Envelope", "version": 1 }, "payload": { "before": null, "after": { "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { "version": "1.4.2.Final", "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": "inventory", "table": "customers" }, "op": "c", "ts_ms": 1486500577691 } }
イベントのこの部分ははるかに長くなりますが、イベントの キー と同様に
schema
とpayload
もあります。schema
には、dbserver1.inventory.customers.Envelope
(バージョン 1) という名前の Kafka Connect スキーマが含まれており、5 つのフィールドを追加できます。op
-
操作のタイプを記述する文字列値が含まれる必須フィールド。MySQL コネクターの値は、
c
(作成または挿入)、u
(更新)、d
(削除)、およびr
(読み取り、初回のスナップショットでない場合) です。 before
-
任意のフィールド。存在する場合は、イベント発生前の行の状態が含まれます。この構造は、
dbserver1.inventory.customers.Value
Kafka Connect スキーマによって記述され、dbserver1
コネクターはinventory.customers
テーブルのすべての行に使用します。 after
-
任意のフィールド。存在する場合は、イベント発生後の行の状態が含まれます。この構造は、
before
で使用されるのと同じdbserver1.inventory.customers.Value
Kafka Connect スキーマで記述されます。 source
-
イベントのソースメタデータを記述する構造が含まれる必須のフィールド。MySQL の場合は複数のフィールドが含まれます: コネクター名、イベントが記録された
binlog
ファイルの名前、binlog
ファイルでのイベント発生位置、イベント内の行 (複数ある場合)、影響を受けるデータベースおよびテーブルの名前、変更を行った MySQL スレッド ID、このイベントはスナップショットの一部であったかどうか、MySQL サーバー ID (ある場合)、および秒単位のタイムスタンプ。 ts_ms
- 任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックを使用) が含まれます。
注記イベントの JSON 表現は、記述される行よりもはるかに長くなります。これは、すべてのイベントキーと値で Kafka Connect は ペイロード を記述する スキーマ を提供するためです。今後、この構造が変更される可能性があります。ただし、特に使用する側のアプリケーションが時間とともに進化する場合は、キーと値のスキーマがイベント自体にあると、メッセージを理解するのが非常に容易になります。
Debezium MySQL コネクターは、データベーステーブルの構造に基づいてこれらのスキーマを構築します。DDL ステートメントを使用して MySQL データベースのテーブル定義を変更する場合、コネクターはこれらの DDL ステートメントを読み取り、Kafka Connect スキーマを更新します。これは、イベント発生時にイベントの発生元となったテーブルと全く同じように、各イベントが構造化される唯一の方法です。ただし、単一テーブルのイベントがすべて含まれる Kafka トピックには、テーブル定義の各状態に対応するイベントが含まれる場合があります。
JSON コンバーターにはすべてのメッセージのキーおよび値スキーマが含まれるため、非常に詳細なイベントを生成します。
イベントの キー および 値 スキーマを、
inventory
データベースの状態と比較します。MySQL コマンドラインクライアントを実行しているターミナルで、以下のステートメントを実行します。mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
これは、確認したイベントレコードがデータベースのレコードと一致することを示しています。
4.2. データベースの更新および 更新 イベントの表示
Debezium MySQL コネクターが inventory
データベースで 作成 イベントをキャプチャーする方法を確認しました。次に、レコードの 1 つを変更し、コネクターがこれをどのようにキャプチャーするかを見てみましょう。
この手順を完了すると、データベースのコミットで変更した内容の詳細を確認する方法と、変更イベントを比較して、他の変更と関連していつ変更が発生したかを判断する方法について学ぶことができます。
手順
MySQL コマンドラインクライアントを実行しているターミナルで、以下のステートメントを実行します。
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.05 sec) Rows matched: 1 Changed: 1 Warnings: 0
更新された
customers
テーブルを表示します。mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne Marie | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
kafka-console-consumer
を実行しているターミナルに切り替え、新しい 5 番目のイベントを確認します。customers
テーブルのレコードを変更することで、Debezium MySQL コネクターは新しいイベントを生成しました。新しい JSON ドキュメントが 2 つあるはずです。1 つはイベントの キー のドキュメントで、もう 1 つは新しいイベントの 値 のドキュメントです。更新 イベントの キー の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key" "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }
この キー は、以前のイベントの キー と同じです。
新しいイベントの 値 は次のとおりです。
schema
セクションには変更がないため、payload
セクションのみを表しています (書式を調整して読みやすくしてあります)。{ "schema": {...}, "payload": { "before": { 1 "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": { 2 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { 3 "name": "1.4.2.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501486, "gtid": null, "file": "mysql-bin.000003", "pos": 364, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "u", 4 "ts_ms": 1486501486308 5 } }
- 1
before
フィールドは、データベースのコミット前の行と値の状態を表しています。- 2
after
フィールドは、更新された行の状態を表し、first_name
の値はAnne Marie
になっています。- 3
source
フィールド構造体には以前と同じ値が多数ありますが、ts_sec
およびpos
フィールドは更新されています (他の状況ではfile
が変更されることがあります)。- 4
op
フィールドの値はu
になっており、更新によってこの行が変更されたことを示しています。- 5
- The
ts_ms
フィールドは、Debezium がこのイベントを処理したときのタイムスタンプを示します。
payload
セクションを表示すると、更新 イベントに関する重要な情報を確認できます。-
before
とafter
構造を比較することで、コミットが原因で影響を受けた行で実際に何が変更されたかを判断できます。 -
ソース
構造を確認して、MySQL の変更の記録に関する情報を確認できます (トレーサビリティーを提供)。 -
イベントの
payload
セクションと、同じトピック (または別のトピック) の他のイベントを比較することで、別のイベントと同じ MySQL コミットの前、後、または一部としてイベントが発生したかどうかを判断できます。
4.3. データベースのレコードの削除および 削除 イベントの表示
Debezium MySQL コネクターが inventory
データベースで 作成 および 更新 イベントをキャプチャーする方法を確認しました。次に、レコードの 1 つを削除し、コネクターがこれをどのようにキャプチャーするかを見てみましょう。
この手順を完了すると、削除 イベントの詳細を見つける方法と、Kafka が ログコンパクション を使用して、コンシューマーがすべてのイベントを取得できる状態で 削除 イベントの数を減らす方法を説明します。
手順
MySQL コマンドラインクライアントを実行しているターミナルで、以下のステートメントを実行します。
mysql> DELETE FROM customers WHERE id=1004; Query OK, 1 row affected (0.00 sec)
注記上記のコマンドが外部キー制約違反で失敗する場合は、以下のステートメントを使用して、addresses テーブルから顧客アドレスの参照を削除する必要があります。
mysql> DELETE FROM addresses WHERE customer_id=1004;
kafka-console-consumer
を実行しているターミナルに切り替え、2 つ の新しいイベントを表示します。customers
テーブルの行を削除することで、Debezium MySQL コネクターは 2 つの新しいイベントを生成しました。最初の新規イベントの キー および 値 を確認します。
最初の新規イベントの キー の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key" "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }
この キー は、これまで確認した 2 つのイベントの キー と同じです。
最初の新規イベントの 値 は以下のとおりです (書式を調整して読みやすくしてあります)。
{ "schema": {...}, "payload": { "before": { 1 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": null, 2 "source": { 3 "name": "1.4.2.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501558, "gtid": null, "file": "mysql-bin.000003", "pos": 725, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "d", 4 "ts_ms": 1486501558315 5 } }
よって、このイベントは、行の削除を処理に必要な情報をコンシューマーに提供します。古い値も提供されます。これは、コンシューマーによっては削除を適切に処理するのに古い値が必要になることがあるからです。
2 つ目の新規イベントの キー および 値 を確認します。
2 つ目の新規イベントの 値 は以下のとおりです (書式を調整して読みやすくしてあります)。
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key" "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }
繰り返しになりますが、この キー は、これまで確認した 3 つのイベントのキーと同じです。
同じイベントの 値 は以下のとおりです (書式を調整して読みやすくしてあります)。
{ "schema": null, "payload": null }
Kafka が ログコンパクション に設定されている場合、トピックの後半に同じキーを持つメッセージが 1 つ以上あると、トピックから古いメッセージが削除されます。この最後のイベントには、キーと空の値があるため、tombstone (トゥームストーン) イベントと呼ばれます。これは、Kafka が同じキーを持つこれまでのメッセージをすべて削除することを意味します。これまでのメッセージが削除されても、tombstone イベントであるため、コンシューマーは最初からトピックを読み取ることができ、イベントを見逃しません。
4.4. Kafka Connect サービスの再起動
Debezium MySQL コネクターが作成、更新、および削除イベントをキャプチャーする方法を確認しました。次に、稼働していない場合でもどのように変更イベントをキャプチャーするかを見てみましょう。
Kafka Connect サービスは、登録されたコネクターのタスクを自動的に管理します。したがって、オフラインになると、再起動時に稼働していないタスクがすべて開始されます。つまり、Debezium が稼働していない場合でも、変更をデータベースに報告できます。
この手順では、Kafka Connect を停止し、データベースのデータを一部変更した後、Kafka Connect を再起動して変更イベントを確認します。
手順
Kafka Connect サービスを停止します。
Kafka Connect サービスのデプロイメント設定を開きます。
$ oc edit dc/my-connect-cluster-connect
デプロイメント設定が表示されます。
apiVersion: apps.openshift.io/v1 kind: DeploymentConfig metadata: ... spec: replicas: 1 ...
-
spec.replicas
の値を0
に変更します。 - デプロイメント設定を保存します。
Kafka Connect サービスが停止したことを確認します。
このコマンドを実行すると、Kafka Connect サービスが完了し、稼働している Pod がないことを確認できます。
$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-1-dxcs9 0/1 Completed 0 7h
Kafka Connect サービスが停止している間に、MySQL クライアントを実行しているターミナルに切り替え、新しいレコードをデータベースに追加します。
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
Kafka Connect サービスを再起動します。
Kafka Connect サービスのデプロイメント設定を開きます。
$ oc edit dc/my-connect-cluster-connect
デプロイメント設定が表示されます。
apiVersion: apps.openshift.io/v1 kind: DeploymentConfig metadata: ... spec: replicas: 0 ...
-
spec.replicas
の値を1
に変更します。 - デプロイメント設定を保存します。
Kafka Connect サービスが再起動したことを確認します。
このコマンドを実行すると、Kafka Connect サービスは稼働中で、Pod の準備ができていることを確認できます。
$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-2-q9kkl 1/1 Running 0 74s
-
kafka-console-consumer.sh
を実行しているターミナルに切り替えます。新しいイベントを受け取ると表示されます。 Kafka Connect がオフラインだったときに作成したレコードを確認します (書式を調整して読みやすくしてあります)。
{ ... "payload":{ "id":1005 } } { ... "payload":{ "before":null, "after":{ "id":1005, "first_name":"Sarah", "last_name":"Thompson", "email":"kitt@acme.com" }, "source":{ "version":"1.4.2.Final", "connector":"mysql", "name":"dbserver1", "ts_ms":1582581502000, "snapshot":"false", "db":"inventory", "table":"customers", "server_id":223344, "gtid":null, "file":"mysql-bin.000004", "pos":364, "row":0, "thread":5, "query":null }, "op":"c", "ts_ms":1582581502317 } }
第5章 次のステップ
チュートリアルが完了したら、以下のステップを検討します。
チュートリアルをさらに試してみる。
MySQL コマンドラインクライアントを使用して、データベーステーブルの行を追加、変更、および削除し、トピックへの影響を確認します。外部キーによって参照される行は削除できないことに注意してください。
Debezium のデプロイメントを計画する。
Debezium を OpenShift または Red Hat Enterprise Linux にインストールできます。詳細は、以下を参照してください。
改訂日時: 2022-12-03 12:06:38 +1000