第3章 inventory データベースの変更をキャプチャーするコネクターの作成
Kafka、Debezium、および MySQL サービスを起動したら、inventory
データベースの変更をキャプチャーするコネクターインスタンスを作成できます。
この手順では、コネクターインスタンスを定義する KafkaConnector
カスタムリソース (CR) を作成して適用することで、コネクターインスタンスを作成します。CR の適用後、コネクターインスタンスは inventory
データベースの binlog
で変更のキャプチャーを開始します。binlog
は、データベースのトランザクションをすべて記録します (各行の変更やスキーマの変更など)。データベースの行が変更されると、Debezium は変更イベントを生成します。
通常、Kafka ツールを使用して、レプリカ数の指定などの必要なトピックを手作業で作成します。ただし、このチュートリアルでは、1 つのレプリカのみでトピックを自動作成するように Kafka が設定されています。
手順
inventory
データベースへの変更をキャプチャーするために Debezium MySQL コネクターインスタンスを設定するKafkaConnector
CR を作成します。以下のサンプル CR をコピーします。inventory-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: inventory-connector 1 labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 2 config: 3 database.hostname: mysql 4 database.port: 3306 database.user: debezium database.password: dbz database.server.id: 184054 5 database.server.name: dbserver1 6 database.whitelist: inventory 7 database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 8 database.history.kafka.topic: schema-changes.inventory 9
- 1
- コネクターの名前。
- 2
- 1 度に 1 つのタスクのみが動作する必要があります。MySQL コネクターは MySQL サーバーの
binlog
を読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。 - 3
- コネクターの設定。
- 4
- データベースホスト。MySQL サーバーを実行しているコンテナーの名前です (
mysql
)。 - 5 6
- 一意なサーバー ID および名前。サーバー名は、MySQL サーバーまたはサーバーのクラスターの論理識別子です。この名前は、すべての Kafka トピックのプレフィックスとして使用されます。
- 7
inventory
データベースの変更のみが検出されます。- 8 9
- コネクターは、このブローカー (イベントの送信先となるブローカーと同じ) とトピック名を使用して、データベーススキーマの履歴を Kafka に保存します。再起動時に、コネクターが読み取りを開始すべき時点で
binlog
に存在したデータベースのスキーマを復元します。
コネクターインスタンスを適用します。
$ oc apply -f inventory-connector.yaml
inventory-connector
コネクターが登録され、inventory
データベースに対して実行が開始されます。inventory-connector
の開始時に Kafka Connect のログ出力を監視することで、inventory-connector
が作成され、inventory
データベースの変更のキャプチャーが開始されたことを確認します。Kafka Connect のログ出力を表示します。
$ oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
ログの出力を確認し、初回のスナップショットが実行されたことを確認します。以下の行は、初回のスナップショットが開始されたことを表しています。
... 2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
スナップショットには、複数のステップが関係します。
... 2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL master (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ... 2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
スナップショットの完了後、Debezium は
inventory
データベースのbinlog
への更新に対してキャプチャーを開始します。... 2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306] Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5) 2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306] ...