Debezium の RHEL へのインストール


Red Hat Integration 2022.Q4

Red Hat Enterprise Linux (RHEL) での Debezium 1.9.7 の使用

概要

本ガイドでは、AMQ Streams を使用して RHEL に Red Hat Debezium をインストールする方法を説明します。

はじめに

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。まずは、マスター (master)、スレーブ (slave)、ブラックリスト (blacklist)、ホワイトリスト (whitelist) の 4 つの用語の置き換えから始めます。この取り組みは膨大な作業を要するため、今後の複数のリリースで段階的に用語の置き換えを実施して参ります。詳細は、Red Hat CTO である Chris Wright のメッセージ をご覧ください。

第1章 Debezium の概要

Debezium for Red Hat Integration は、データベース操作をキャプチャーし、行レベル操作のデータ変更イベントレコードを作成して、Apache Kafka トピックに変更イベントレコードをストリーミングする分散型プラットフォームです。Debezium は Apache Karaf に構築され、AMQ Streams とデプロイおよび統合されます。

Debezium によって、データベーステーブルの行レベルの変更がキャプチャーされ、対応する変更イベントが AMQ Streams に渡されます。アプリケーションはこれらの 変更イベントストリーム を読み取りでき、変更イベントが発生した順にアクセスできます。

Debezium は、Debezium for Red Hat Integration のアップストリームコミュニティープロジェクトです。

Debezium には、以下を含む複数の用途があります。

  • データレプリケーション
  • キャッシュの更新およびインデックスの検索
  • モノリシックアプリケーションの簡素化
  • データ統合
  • ストリーミングクエリーの有効化

Debezium は、以下の一般的なデータベース用の Apache Kafka Connect コネクターを提供します。

第2章 Debezium コネクターの RHEL へのインストール

コネクタープラグインで Kafka Connect を拡張して、AMQ Streams 経由で Debezium コネクターをインストールします。AMQ Streams のデプロイ後に、Kafka Connect で Debezium をコネクター設定としてデプロイできます。

2.1. Kafka トピック作成に関する推奨事項

Debezium は、データを複数の Apache Kafka トピックに保存します。トピックは、管理者が事前に作成する必要があります。または、Kafka Connect を設定して トピックを自動的に設定します

以下のリストで、トピックの作成時に考慮すべき制限および推奨事項を説明します。

Debezium Db2、MySQL、Oracle、および SQLServer コネクターのデータベース履歴トピック

上記の各コネクターには、データベース履歴トピックが必要です。データベース履歴トピックを手動で作成する場合でも、Kafka ブローカーを使用してトピックを自動的に作成するか、Kafka Connect を使用してトピックを作成する場合でも、トピックが以下の設定で設定されていることを確認してください。

  • 無限または非常に長い保持期間。
  • 3 以上の実稼働環境でのレプリケーション係数。
  • 単一パーティション。
その他のトピック
  • 指定のレコードの 最後の 変更イベントのみが保存されるように Kafka ログコンパクション を有効にする場合は、Apache Kafka で以下のトピックプロパティーを設定します。

    • min.compaction.lag.ms
    • delete.retention.ms

      トピックコンシューマーがすべてのイベントを受信してマーカーを削除するのに十分な時間を確保するには、シンクコネクターに予想される最大ダウンタイムよりも大きい値を前述のプロパティーの値に指定します。たとえば、シンクコネクターに更新を適用する際に発生する可能性のあるダウンタイムについて考えてみましょう。

  • 実稼働でレプリケートされます。
  • 単一パーティション。

    単一パーティションルールを緩和できますが、アプリケーションはデータベースの異なる行に対して順不同のイベントを処理する必要があります。単一行のイベントは、引き続き完全に順序付けされます。複数のパーティションを使用する場合、Kafka がキーをハッシュ化してパーティションを決定するのがデフォルトの動作になります。その他のパーティションストラテジーでは、単一メッセージ変換 (SMT: Single Message Transformations) を使用して、各レコードにパーティション番号を設定する必要があります。

2.2. Debezium コネクター設定の計画

Debezium コネクターをデプロイする前に、コネクターの設定方法を決定してください。設定は、コネクターの動作を指定する情報を提供し、Debezium がソースデータベースに接続することを可能にします。

コネクター設定を JSON として指定し、コネクターを登録する準備ができたら、curl を使用して設定を Kafka Connect API エンドポイントに送信します。

前提条件

  • ソースデータベースがデプロイされ、Debezium コネクターがデータベースにアクセスできます。
  • コネクターがソースデータベースにアクセスするために必要な以下の情報を把握しています。

    • データベースホストの名前または IP アドレス。
    • データベースに接続するためのポート番号。
    • コネクターがデータベースへのサインインに使用できるアカウントの名前。
    • データベースユーザーアカウントのパスワード。
    • データベースの名前。
    • コネクターの情報の取得元であるテーブルの名前。
    • コネクターの変更イベントの出力先である Kafka ブローカーの名前。
    • コネクターのデータベース履歴情報の送信先である Kafka トピックの名前。

手順

  • Debezium コネクターに適用する設定を JSON 形式で指定します。

    次の例は、Debezium MySQL コネクターの簡単な設定を示しています。

    {
      "name": "inventory-connector",  1
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 2
        "tasks.max": "1",  3
        "database.hostname": "mysql",  4
        "database.port": "3306", 5
        "database.user": "debezium", 6
        "database.password": "dbz", 7
        "database.server.id": "184054",  8
        "database.server.name": "dbserver1",
        "database.include.list": "public.inventory",  9
        "database.history.kafka.bootstrap.servers": "kafka:9092",  10
        "database.history.kafka.topic": "dbhistory.inventory"  11
      }
    }
    1
    Kafka Connect クラスターに登録するコネクターの名前。
    2
    コネクタークラスの名前。
    3
    同時に動作できるタスクの数。一度に操作できるタスクは 1 つだけです。
    4
    ホストデータベースインスタンスのホスト名または IP アドレス。
    5
    データベースインスタンスのポート番号。
    6
    Debezium がデータベースに接続するユーザーアカウントの名前。
    7
    データベースユーザーアカウントのパスワード
    8
    一意なサーバー ID および名前。サーバー名は、Debezium がアクセスできるデータベースサーバーまたはサーバーのクラスターの論理識別子です。Debezium は、作成する各 Kafka トピックにこの名前を接頭辞として付けます。
    9
    コネクターが変更イベントをキャプチャーするテーブルの一覧。
    10
    コネクターがデータベーススキーマ履歴を送信する Kafka ブローカーの名前。指定されたブローカーは、コネクターが出力する変更イベントも受け取ります。
    11
    スキーマ履歴を格納する Kafka トピックの名前。
    コネクターの再起動後、コネクターは停止した時点からデータベースログの読み取りを再開し、オフライン中に発生したトランザクションのイベントを出力します。コネクターは、未読トランザクションの変更イベントを Kafka に書き込む前に、スキーマ履歴をチェックしてから、元のトランザクションが発生したときに有効だったスキーマを適用します。

関連情報

  • コネクターのタイプごとに設定できる設定プロパティーの詳細は、Debezium User Guide のコネクターのデプロイメントドキュメントを参照してください。

2.3. Red Hat Enterprise Linux での AMQ Streams を使用した Debezium のデプロイ

この手順では、Red Hat Enterprise Linux で Debezium のコネクターを設定する方法を説明します。コネクターは、Apache Kafka Connect を使用して AMQ Streams クラスターにデプロイされます。Kafka Connect は Apache Kafka と外部システムとの間でデータをストリーミングするためのフレームワークです。Kafka Connect は、スタンドアロンモードではなく分散モードで実行する必要があります。

前提条件

  • Debezium のデプロイ先であるホスト環境は、サポートされている設定 で Red Hat Enterprise Linux、AMQ Streams、および Java を実行します。

    • AMQ Streams のインストール方法の詳細については、Installing AMQ Streams を参照してください。
    • 単一の ZooKeeper ノードおよび単一の Kafka ノードが含まれる実稼働ではない、基本的な AMQ Streams クラスターをインストールする方法の詳細は、Running a single node AMQ Streams cluster を参照してください。
注記

以前のバージョンの AMQ Streams を実行している場合は、最初に AMQ Streams 2.2 にアップグレードする必要があります。アップグレードプロセスの詳細については、AMQ Streams and Kafka upgrades を参照してください。

  • ホストの管理者権限 (sudo アクセス) があります。
  • Apache ZooKeeper と Apache Kafka ブローカーが実行されています。
  • Kafka Connect は、スタンドアロンモードではなく、分散モード で実行されています。
  • AMQ Streams のインストール時に作成された kafka ユーザーの認証情報を把握しています。
  • ソースデータベースがデプロイされ、Debezium をデプロイするホストがデータベースにアクセスできます。
  • コネクターの設定方法 を理解しています。

手順

  1. 使用する Debezium コネクターを Red Hat Integration ダウンロードサイト からダウンロードします。たとえば、Debezium 1.9.7 MySQL Connector をダウンロードして、Dabezium を MySQL データベースと使用します。
  2. AMQ Streams をデプロイした Red Hat Enterprise Linux ホストで、ターミナルウィンドウを開き、/opt/kafkaconnector-plugins ディレクトリーを作成します (まだ存在しない場合)。

    $ sudo mkdir /opt/kafka/connector-plugins
  3. 次のコマンドを入力して、/opt/kafka/connector-plugins ディレクトリーにダウンロードした Debezium コネクターアーカイブの内容を抽出します。

    $ sudo unzip debezium-connector-mysql-1.9.7.Final.zip -d /opt/kafka/connector-plugins
  4. インストールするコネクターごとに、手順 1 - 3 を繰り返します。
  5. ターミナルウィンドウから、kafka ユーザーとしてサインインします。

    $ su - kafka
    $ Password:
  6. Kafka Connect プロセスが実行中の場合は、停止します。

    1. 次のコマンドを入力して、Kafka Connect が分散モードで実行されているかどうかを確認します。

      $ jcmd | grep ConnectDistributed

      プロセスが実行中の場合、コマンドはプロセス ID を返します。次に例を示します。

      18514 org.apache.kafka.connect.cli.ConnectDistributed /opt/kafka/config/connect-distributed.properties
    2. プロセス ID を指定して kill コマンドを入力し、プロセスを停止します。次に例を示します。

      $ kill 18514
  7. /opt/kafka/config/ にある connect-distributed.properties ファイルを編集し、plugin.path の値を Debezium コネクタープラグインの親ディレクトリーの場所に設定します。

    plugin.path=/opt/kafka/connector-plugins
  8. 分散モードで Kafka Connect を起動します。

    $ /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
  9. Kafka Connect の実行後、Kafka Connect API を使用してコネクターを登録します。
    curl コマンドを入力して、「Debezium コネクター設定の計画」 で指定したコネクター設定 JSON を localhost:8083/connectors の Kafka Connect REST API エンドポイントに送信する POST リクエストを送信します。
    以下に例を示します。

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ \
    -d '{"name": "inventory-connector", "config": \
    { "connector.class": "io.debezium.connector.mysql.MySqlConnector", \
    "tasks.max": "1", \
    "database.hostname": "mysql", \
    "database.port": "3306", \
    "database.user": "debezium", \
    "database.password": "dbz", \
    "database.server.id": "184054", \
    "database.server.name": "dbserver1", \
    "database.include.list": "public.inventory", \
    "database.history.kafka.bootstrap.servers": "kafka:9092", \
    "database.history.kafka.topic": "dbhistory.inventory" } }'

    複数のコネクターを登録するには、コネクターごとに個別のリクエストを送信します。

  10. Kafka Connect を再起動して、変更を実装します。

    Kafka Connect が起動すると、connector-plugins ディレクトリーから、設定済みの Debezium コネクターをロードします。

    設定が完了すると、デプロイされたコネクターはソースデータベースに接続し、挿入、更新、または削除された行またはドキュメントごとにイベントを生成します。

  11. 各 Kafka Connect ワーカーノードに対して、ステップ 5 - 10 を繰り返します。

2.4. デプロイメントの確認

コネクターが起動すると、設定されたデータベースのスナップショットが実行され、指定したテーブルごとにトピックが作成されます。

前提条件

  • 「Red Hat Enterprise Linux での AMQ Streams を使用した Debezium のデプロイ」 の手順に基づいて、Red Hat Enterprise Linux にコネクターをデプロイしています。手順

    1. ホストのターミナルウィンドウで、次のコマンドを入力して、Kafka Connect API からコネクターのリストを要求します。

      $ curl -H "Accept:application/json" localhost:8083/connectors/

      クエリーは、デプロイされたコネクターの名前を返します。以下に例を示します。

      ["inventory-connector"]
    2. ホストのターミナルウィンドウで次のコマンドを入力して、コネクターが実行しているタスクを表示します。

      $ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

      コマンドは、以下の例のような出力を返します。

      HTTP/1.1 200 OK
      Date: Thu, 06 Feb 2020 22:12:03 GMT
      Content-Type: application/json
      Content-Length: 531
      Server: Jetty(9.4.20.v20190813)
      
      {
        "name": "inventory-connector",
        ...
        "tasks": [
          {
            "connector": "inventory-connector",
            "task": 0
          }
        ]
      }
    3. Kafka クラスター内のトピックのリストを表示します。
      ターミナルウィンドウから /opt/kafka/bin/ に移動し、以下のシェルスクリプトを実行します。

      kafka-topics.sh --bootstrap-server=localhost:9092 --list

      Kafka ブローカーは、コネクターが作成するトピックのリストを返します。使用可能なトピックは、コネクターの snapshot.modesnapshot.include.collection.list、および table.include.list の設定プロパティーの設定によって異なります。デフォルトでは、コネクターはデータベース内の非システムテーブルごとにトピックを作成します。

    4. トピックの内容を表示します。
      ターミナルウィンドウから /opt/kafka/bin/ に移動し、kafka-console-consumer.sh シェルスクリプトを実行して、前のコマンドで返されたトピックの 1 つの内容を表示します。

      以下に例を示します。

      kafka-console-consumer.sh \
      >     --bootstrap-server localhost:9092 \
      >     --from-beginning \
      >     --property print.key=true \
      >     --topic=inventory_connector_mysql.inventory.products_on_hand

      トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。

      例2.1 統合変更イベントの内容

      {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Key"},"payload":{"product_id":101}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.9.7.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}

      上記の例では、payload 値は、コネクタースナップショットがテーブル inventory.products_on_hand から 読み込み (op" ="r") イベントを生成したことを示しています。product_id レコードの before 状態は null であり、レコードに以前の値が存在しないことを示します。"after" 状態が product_id 101 で項目の quantity3 で示しています。

次のステップ

各コネクターで使用できる設定の詳細、および変更データのキャプチャーを有効にするためにソースデータベースを設定する方法については、Debezium ユーザーガイド を参照してください。

2.5. Kafka Connect クラスターの Debezium コネクタープラグインの更新

Red Hat Enterprise Linux にデプロイされているバージョンの Debezium コネクターを置き換えるには、コネクタープラグインを更新します。

手順

  1. 置き換える Debezium コネクタープラグインのコピーを Red Hat Integration ダウンロードサイト からダウンロードします。
  2. Debezium コネクターアーカイブの内容を /opt/kafka/connector-plugins ディレクトリーに展開します。

    $ sudo unzip debezium-connector-mysql-1.9.7.Final.zip -d /opt/kafka/connector-plugins
  3. Kafka Connect を再起動します。

付録A サブスクリプションの使用

AMQ Streams は、ソフトウェアサブスクリプションによって提供されます。サブスクリプションを管理するには、Red Hat カスタマーポータルでアカウントにアクセスします。

アカウントへのアクセス

  1. access.redhat.com に移動します。
  2. アカウントがない場合は、作成します。
  3. アカウントにログインします。

サブスクリプションのアクティベート

  1. access.redhat.com に移動します。
  2. サブスクリプション に移動します。
  3. Activate a subscription に移動し、16 桁のアクティベーション番号を入力します。

zip および tar ファイルのダウンロード

zip または tar ファイルにアクセスするには、カスタマーポータルを使用して、ダウンロードする関連ファイルを検索します。RPM パッケージを使用している場合は、この手順は必要ありません。

  1. ブラウザーを開き、access.redhat.com/downloads で Red Hat カスタマーポータルの Product Downloads ページにログインします。
  2. INTEGRATION AND AUTOMATION まで下方向にスクロールします。
  3. Red Hat Integration をクリックして、Red Hat Integration ダウンロードページを表示します。
  4. コンポーネントの ダウンロード リンクをクリックします。

改訂日時: 2022-11-27 05:23:15 +1000

Red Hat logoGithubRedditYoutube

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.