5.5.2. AMQ Streams を使用した Debezium MySQL コネクターのデプロイ

以前のバージョンの AMQ Streams では、OpenShift に Debezium コネクターをデプロイするには、最初にコネクター用の Kafka Connect イメージをビルドする必要がありました。コネクターを OpenShift にデプロイするのに現在推奨される方法は、AMQ Streams でビルド設定を使用して、使用する Debezium コネクタープラグインが含まれる Kafka Connect コンテナーイメージを自動的にビルドすることです。

ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect Pod のデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドしたら、KafkaConnector カスタムリソースを作成し、ビルドに含まれるコネクターを起動します。

前提条件

  • クラスター Operator がインストールされている OpenShift クラスターにアクセスできる必要があります。
  • AMQ Streams Operator が稼働している必要があります。
  • Kafka クラスターは、Apache Open Shift での AMQ ストリームのデプロイとアップグレードに記載されているようにデプロイされます。
  • Red Hat Integration ライセンスがある。
  • Kafka Connect is deployed on AMQ Streams
  • OpenShift oc CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。
  • Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションが必要であるか、ImageStream リソースを作成する必要があります。

    ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存するには、以下を実行します。
    • レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション。
    ビルドイメージをネイティブ OpenShift ImageStream として保存します。
    • ImageStream リソースがクラスターにデプロイされている。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams はデフォルトでは利用できません。

手順

  1. OpenShift クラスターにログインします。
  2. コネクターの Debezium KafkaConnect カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、以下の例のように metadata.annotations および spec.build プロパティーを指定する KafkaConnect CR を作成します。dbz-connect.yaml などの名前でファイルを保存します。

    例5.1 Debezium コネクターを含む KafkaConnect カスタムリソースを定義する dbz-connect.yaml ファイル

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.00
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-mysql
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/1.7.2.Final-redhat-<build_number>/debezium-connector-mysql-1.7.2.Final-redhat-<build_number>-plugin.zip  7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.0-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.0-redhat-<build-number>.zip
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/1.7.2.Final/debezium-scripting-1.7.2.Final.zip
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093

    表5.22 Kafka Connect 設定の説明

    項目説明

    1

    strimzi.io/use-connector-resources アノテーションを true に設定して、クラスターオペレーターが KafkaConnector リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。

    2

    spec.build 設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所と共にイメージに追加するプラグインを一覧表示します。

    3

    build.output は、新たにビルドされたイメージが保存されるレジストリーを指定します。

    4

    イメージ出力の名前およびイメージ名を指定します。output.type の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合は docker、内部の OpenShift ImageStream にイメージをプッシュする場合は imagestream です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定で build.output の指定に関する詳細は、AMQ Streams Build スキーマ参照 のドキュメントを参照 してください。

    5

    plugins 設定は、Kafka Connect イメージに追加するすべてのコネクターを一覧表示します。一覧の各エントリーについて、プラグイン name と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。任意で、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。

    6

    artifacts.type の値は、artifacts.url で指定したアーティファクトのファイルタイプを指定します。有効なタイプは ziptgz、または jar です。Debezium コネクターアーカイブは、.zip ファイル形式で提供されます。JDBC ドライバーファイルは .jar 形式です。type の値は、url フィールドで参照されるファイルのタイプと一致する必要があります。

    7

    artifacts.url の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターは指定されたサーバーにアクセスできる必要があります。

  3. 以下のコマンドを入力して、KafkaConnect ビルド仕様を OpenShift クラスターに適用します。

    oc create -f dbz-connect.yaml

    Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
    ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定に一覧表示されているコネクターアーティファクトはクラスターで利用できます。

  4. KafkaConnector リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
    たとえば、以下の KafkaConnector CR を作成し、mysql-inventory-connector.yaml として保存します。

    例5.2 Debezium コネクターの KafkaConnector カスタムリソースを定義する mysql-inventory-connector.yaml ファイル

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-mysql 1
    spec:
      class: io.debezium.connector.mysql.MySqlConnector 2
      tasksMax: 1  3
      config:  4
        database.history.kafka.bootstrap.servers: 'debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092'
        database.history.kafka.topic: schema-changes.inventory
        database.hostname: mysql.debezium-mysql.svc.cluster.local 5
        database.port: 3306   6
        database.user: debezium  7
        database.password: dbz  8
        database.dbname: mydatabase 9
        database.server.name: inventory_connector_mysql 10
        database.include.list: public.inventory  11

    表5.23 コネクター設定の説明

    項目説明

    1

    Kafka Connect クラスターに登録するコネクターの名前。

    2

    コネクタークラスの名前。

    3

    同時に動作できるタスクの数。

    4

    コネクターの設定。

    5

    ホストデータベースインスタンスのアドレス。

    6

    データベースインスタンスのポート番号。

    7

    Debezium がデータベースに接続するユーザーアカウントの名前。

    8

    データベースユーザーアカウントのパスワード

    9

    変更をキャプチャーするデータベースの名前。

    10

    データベースインスタンスまたはクラスターの論理名。
    指定の名前は英数字またはアンダースコアからのみ形成する必要があります。
    論理名は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
    コネクターを Avro コネクターと統合する場合、名前空間は関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの名前空間でも使用されます。

    11

    コネクターが変更イベントをキャプチャーするテーブルの一覧。

  5. 以下のコマンドを実行してコネクターリソースを作成します。

    oc create -n <namespace> -f <kafkaConnector>.yaml

    以下に例を示します。

    oc create -n debezium -f {context}-inventory-connector.yaml

    コネクターは Kafka Connect クラスターに登録され、KafkaConnector CR の spec.config.database.dbname で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。

これで、Debezium MySQL のデプロイメントを確認 する準備が整いました。