Debezium の RHEL へのインストール
Red Hat Enterprise Linux (RHEL) での Debezium 1.9.7 の使用
概要
はじめに
多様性を受け入れるオープンソースの強化
Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。まずは、マスター (master)、スレーブ (slave)、ブラックリスト (blacklist)、ホワイトリスト (whitelist) の 4 つの用語の置き換えから始めます。この取り組みは膨大な作業を要するため、今後の複数のリリースで段階的に用語の置き換えを実施して参ります。詳細は、Red Hat CTO である Chris Wright のメッセージ をご覧ください。
第1章 Debezium の概要
Debezium for Red Hat Integration は、データベース操作をキャプチャーし、行レベル操作のデータ変更イベントレコードを作成して、Apache Kafka トピックに変更イベントレコードをストリーミングする分散型プラットフォームです。Debezium は Apache Karaf に構築され、AMQ Streams とデプロイおよび統合されます。
Debezium によって、データベーステーブルの行レベルの変更がキャプチャーされ、対応する変更イベントが AMQ Streams に渡されます。アプリケーションはこれらの 変更イベントストリーム を読み取りでき、変更イベントが発生した順にアクセスできます。
Debezium は、Debezium for Red Hat Integration のアップストリームコミュニティープロジェクトです。
Debezium には、以下を含む複数の用途があります。
- データレプリケーション
- キャッシュの更新およびインデックスの検索
- モノリシックアプリケーションの簡素化
- データ統合
- ストリーミングクエリーの有効化
Debezium は、以下の一般的なデータベース用の Apache Kafka Connect コネクターを提供します。
第2章 Debezium コネクターの RHEL へのインストール
コネクタープラグインで Kafka Connect を拡張して、AMQ Streams 経由で Debezium コネクターをインストールします。AMQ Streams のデプロイ後に、Kafka Connect で Debezium をコネクター設定としてデプロイできます。
2.1. Kafka トピック作成に関する推奨事項
Debezium は、データを複数の Apache Kafka トピックに保存します。トピックは、管理者が事前に作成する必要があります。または、Kafka Connect を設定して トピックを自動的に設定します。
以下のリストで、トピックの作成時に考慮すべき制限および推奨事項を説明します。
- Debezium Db2、MySQL、Oracle、および SQLServer コネクターのデータベース履歴トピック
上記の各コネクターには、データベース履歴トピックが必要です。データベース履歴トピックを手動で作成する場合でも、Kafka ブローカーを使用してトピックを自動的に作成するか、Kafka Connect を使用してトピックを作成する場合でも、トピックが以下の設定で設定されていることを確認してください。
- 無限または非常に長い保持期間。
- 3 以上の実稼働環境でのレプリケーション係数。
- 単一パーティション。
- その他のトピック
指定のレコードの 最後の 変更イベントのみが保存されるように Kafka ログコンパクション を有効にする場合は、Apache Kafka で以下のトピックプロパティーを設定します。
-
min.compaction.lag.ms
トピックコンシューマーがすべてのイベントを受信してマーカーを削除するのに十分な時間を確保するには、シンクコネクターに予想される最大ダウンタイムよりも大きい値を前述のプロパティーの値に指定します。たとえば、シンクコネクターに更新を適用する際に発生する可能性のあるダウンタイムについて考えてみましょう。
-
- 実稼働でレプリケートされます。
単一パーティション。
単一パーティションルールを緩和できますが、アプリケーションはデータベースの異なる行に対して順不同のイベントを処理する必要があります。単一行のイベントは、引き続き完全に順序付けされます。複数のパーティションを使用する場合、Kafka がキーをハッシュ化してパーティションを決定するのがデフォルトの動作になります。その他のパーティションストラテジーでは、単一メッセージ変換 (SMT: Single Message Transformations) を使用して、各レコードにパーティション番号を設定する必要があります。
2.2. Debezium コネクター設定の計画
Debezium コネクターをデプロイする前に、コネクターの設定方法を決定してください。設定は、コネクターの動作を指定する情報を提供し、Debezium がソースデータベースに接続することを可能にします。
コネクター設定を JSON として指定し、コネクターを登録する準備ができたら、curl
を使用して設定を Kafka Connect API エンドポイントに送信します。
前提条件
- ソースデータベースがデプロイされ、Debezium コネクターがデータベースにアクセスできます。
コネクターがソースデータベースにアクセスするために必要な以下の情報を把握しています。
- データベースホストの名前または IP アドレス。
- データベースに接続するためのポート番号。
- コネクターがデータベースへのサインインに使用できるアカウントの名前。
- データベースユーザーアカウントのパスワード。
- データベースの名前。
- コネクターの情報の取得元であるテーブルの名前。
- コネクターの変更イベントの出力先である Kafka ブローカーの名前。
- コネクターのデータベース履歴情報の送信先である Kafka トピックの名前。
手順
Debezium コネクターに適用する設定を JSON 形式で指定します。
次の例は、Debezium MySQL コネクターの簡単な設定を示しています。
{ "name": "inventory-connector", 1 "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", 2 "tasks.max": "1", 3 "database.hostname": "mysql", 4 "database.port": "3306", 5 "database.user": "debezium", 6 "database.password": "dbz", 7 "database.server.id": "184054", 8 "database.server.name": "dbserver1", "database.include.list": "public.inventory", 9 "database.history.kafka.bootstrap.servers": "kafka:9092", 10 "database.history.kafka.topic": "dbhistory.inventory" 11 } }
- 1
- Kafka Connect クラスターに登録するコネクターの名前。
- 2
- コネクタークラスの名前。
- 3
- 同時に動作できるタスクの数。一度に操作できるタスクは 1 つだけです。
- 4
- ホストデータベースインスタンスのホスト名または IP アドレス。
- 5
- データベースインスタンスのポート番号。
- 6
- Debezium がデータベースに接続するユーザーアカウントの名前。
- 7
- データベースユーザーアカウントのパスワード
- 8
- 一意なサーバー ID および名前。サーバー名は、Debezium がアクセスできるデータベースサーバーまたはサーバーのクラスターの論理識別子です。Debezium は、作成する各 Kafka トピックにこの名前を接頭辞として付けます。
- 9
- コネクターが変更イベントをキャプチャーするテーブルの一覧。
- 10
- コネクターがデータベーススキーマ履歴を送信する Kafka ブローカーの名前。指定されたブローカーは、コネクターが出力する変更イベントも受け取ります。
- 11
- スキーマ履歴を格納する Kafka トピックの名前。
コネクターの再起動後、コネクターは停止した時点からデータベースログの読み取りを再開し、オフライン中に発生したトランザクションのイベントを出力します。コネクターは、未読トランザクションの変更イベントを Kafka に書き込む前に、スキーマ履歴をチェックしてから、元のトランザクションが発生したときに有効だったスキーマを適用します。
関連情報
- コネクターのタイプごとに設定できる設定プロパティーの詳細は、Debezium User Guide のコネクターのデプロイメントドキュメントを参照してください。
2.3. Red Hat Enterprise Linux での AMQ Streams を使用した Debezium のデプロイ
この手順では、Red Hat Enterprise Linux で Debezium のコネクターを設定する方法を説明します。コネクターは、Apache Kafka Connect を使用して AMQ Streams クラスターにデプロイされます。Kafka Connect は Apache Kafka と外部システムとの間でデータをストリーミングするためのフレームワークです。Kafka Connect は、スタンドアロンモードではなく分散モードで実行する必要があります。
前提条件
Debezium のデプロイ先であるホスト環境は、サポートされている設定 で Red Hat Enterprise Linux、AMQ Streams、および Java を実行します。
- AMQ Streams のインストール方法の詳細については、Installing AMQ Streams を参照してください。
- 単一の ZooKeeper ノードおよび単一の Kafka ノードが含まれる実稼働ではない、基本的な AMQ Streams クラスターをインストールする方法の詳細は、Running a single node AMQ Streams cluster を参照してください。
以前のバージョンの AMQ Streams を実行している場合は、最初に AMQ Streams 2.2 にアップグレードする必要があります。アップグレードプロセスの詳細については、AMQ Streams and Kafka upgrades を参照してください。
-
ホストの管理者権限 (
sudo
アクセス) があります。 - Apache ZooKeeper と Apache Kafka ブローカーが実行されています。
- Kafka Connect は、スタンドアロンモードではなく、分散モード で実行されています。
-
AMQ Streams のインストール時に作成された
kafka
ユーザーの認証情報を把握しています。 - ソースデータベースがデプロイされ、Debezium をデプロイするホストがデータベースにアクセスできます。
- コネクターの設定方法 を理解しています。
手順
- 使用する Debezium コネクターを Red Hat Integration ダウンロードサイト からダウンロードします。たとえば、Debezium 1.9.7 MySQL Connector をダウンロードして、Dabezium を MySQL データベースと使用します。
AMQ Streams をデプロイした Red Hat Enterprise Linux ホストで、ターミナルウィンドウを開き、
/opt/kafka
にconnector-plugins
ディレクトリーを作成します (まだ存在しない場合)。$ sudo mkdir /opt/kafka/connector-plugins
次のコマンドを入力して、
/opt/kafka/connector-plugins
ディレクトリーにダウンロードした Debezium コネクターアーカイブの内容を抽出します。$ sudo unzip debezium-connector-mysql-1.9.7.Final.zip -d /opt/kafka/connector-plugins
- インストールするコネクターごとに、手順 1 - 3 を繰り返します。
ターミナルウィンドウから、
kafka
ユーザーとしてサインインします。$ su - kafka $ Password:
Kafka Connect プロセスが実行中の場合は、停止します。
次のコマンドを入力して、Kafka Connect が分散モードで実行されているかどうかを確認します。
$ jcmd | grep ConnectDistributed
プロセスが実行中の場合、コマンドはプロセス ID を返します。次に例を示します。
18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
プロセス ID を指定して
kill
コマンドを入力し、プロセスを停止します。次に例を示します。$ kill 18514
/opt/kafka/config/
にあるconnect-distributed.properties
ファイルを編集し、plugin.path
の値を Debezium コネクタープラグインの親ディレクトリーの場所に設定します。plugin.path=/opt/kafka/connector-plugins
分散モードで Kafka Connect を起動します。
$ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Kafka Connect の実行後、Kafka Connect API を使用してコネクターを登録します。
curl
コマンドを入力して、「Debezium コネクター設定の計画」 で指定したコネクター設定 JSON をlocalhost:8083/connectors
の Kafka Connect REST API エンドポイントに送信するPOST
リクエストを送信します。
以下に例を示します。curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ \ -d '{"name": "inventory-connector", "config": \ { "connector.class": "io.debezium.connector.mysql.MySqlConnector", \ "tasks.max": "1", \ "database.hostname": "mysql", \ "database.port": "3306", \ "database.user": "debezium", \ "database.password": "dbz", \ "database.server.id": "184054", \ "database.server.name": "dbserver1", \ "database.include.list": "public.inventory", \ "database.history.kafka.bootstrap.servers": "kafka:9092", \ "database.history.kafka.topic": "dbhistory.inventory" } }'
複数のコネクターを登録するには、コネクターごとに個別のリクエストを送信します。
Kafka Connect を再起動して、変更を実装します。
Kafka Connect が起動すると、
connector-plugins
ディレクトリーから、設定済みの Debezium コネクターをロードします。設定が完了すると、デプロイされたコネクターはソースデータベースに接続し、挿入、更新、または削除された行またはドキュメントごとにイベントを生成します。
- 各 Kafka Connect ワーカーノードに対して、ステップ 5 - 10 を繰り返します。
次のステップ
関連情報
2.4. デプロイメントの確認
コネクターが起動すると、設定されたデータベースのスナップショットが実行され、指定したテーブルごとにトピックが作成されます。
前提条件
「Red Hat Enterprise Linux での AMQ Streams を使用した Debezium のデプロイ」 の手順に基づいて、Red Hat Enterprise Linux にコネクターをデプロイしています。手順
ホストのターミナルウィンドウで、次のコマンドを入力して、Kafka Connect API からコネクターのリストを要求します。
$ curl -H "Accept:application/json" localhost:8083/connectors/
クエリーは、デプロイされたコネクターの名前を返します。以下に例を示します。
["inventory-connector"]
ホストのターミナルウィンドウで次のコマンドを入力して、コネクターが実行しているタスクを表示します。
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
コマンドは、以下の例のような出力を返します。
HTTP/1.1 200 OK Date: Thu, 06 Feb 2020 22:12:03 GMT Content-Type: application/json Content-Length: 531 Server: Jetty(9.4.20.v20190813) { "name": "inventory-connector", ... "tasks": [ { "connector": "inventory-connector", "task": 0 } ] }
Kafka クラスター内のトピックのリストを表示します。
ターミナルウィンドウから/opt/kafka/bin/
に移動し、以下のシェルスクリプトを実行します。kafka-topics.sh --bootstrap-server=localhost:9092 --list
Kafka ブローカーは、コネクターが作成するトピックのリストを返します。使用可能なトピックは、コネクターの
snapshot.mode
、snapshot.include.collection.list
、およびtable.include.list
の設定プロパティーの設定によって異なります。デフォルトでは、コネクターはデータベース内の非システムテーブルごとにトピックを作成します。トピックの内容を表示します。
ターミナルウィンドウから/opt/kafka/bin/
に移動し、kafka-console-consumer.sh
シェルスクリプトを実行して、前のコマンドで返されたトピックの 1 つの内容を表示します。
以下に例を示します。
kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory_connector_mysql.inventory.products_on_hand
トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.1 統合変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.9.7.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
上記の例では、
payload
値は、コネクタースナップショットがテーブルinventory.products_on_hand
から 読み込み (op" ="r"
) イベントを生成したことを示しています。product_id
レコードのbefore
状態はnull
であり、レコードに以前の値が存在しないことを示します。"after"
状態がproduct_id
101
で項目のquantity
を3
で示しています。
次のステップ
各コネクターで使用できる設定の詳細、および変更データのキャプチャーを有効にするためにソースデータベースを設定する方法については、Debezium ユーザーガイド を参照してください。
2.5. Kafka Connect クラスターの Debezium コネクタープラグインの更新
Red Hat Enterprise Linux にデプロイされているバージョンの Debezium コネクターを置き換えるには、コネクタープラグインを更新します。
手順
- 置き換える Debezium コネクタープラグインのコピーを Red Hat Integration ダウンロードサイト からダウンロードします。
Debezium コネクターアーカイブの内容を
/opt/kafka/connector-plugins
ディレクトリーに展開します。$ sudo unzip debezium-connector-mysql-1.9.7.Final.zip -d /opt/kafka/connector-plugins
- Kafka Connect を再起動します。
付録A サブスクリプションの使用
AMQ Streams は、ソフトウェアサブスクリプションによって提供されます。サブスクリプションを管理するには、Red Hat カスタマーポータルでアカウントにアクセスします。
アカウントへのアクセス
- access.redhat.com に移動します。
- アカウントがない場合は、作成します。
- アカウントにログインします。
サブスクリプションのアクティベート
- access.redhat.com に移動します。
- サブスクリプション に移動します。
- Activate a subscription に移動し、16 桁のアクティベーション番号を入力します。
zip および tar ファイルのダウンロード
zip または tar ファイルにアクセスするには、カスタマーポータルを使用して、ダウンロードする関連ファイルを検索します。RPM パッケージを使用している場合は、この手順は必要ありません。
- ブラウザーを開き、access.redhat.com/downloads で Red Hat カスタマーポータルの Product Downloads ページにログインします。
- INTEGRATION AND AUTOMATION まで下方向にスクロールします。
- Red Hat Integration をクリックして、Red Hat Integration ダウンロードページを表示します。
- コンポーネントの ダウンロード リンクをクリックします。
改訂日時: 2022-11-27 05:23:15 +1000