11.2.3. Debezium コンテナーで Avro を使用するコネクターのデプロイ

ご使用の環境で、提供された Debezium コンテナーを使用して、Avro シリアライゼーションを使用する Debezium コネクターをデプロイしなければならない場合があります。Debezium 用のカスタム Kafka Connect コンテナーイメージをビルドし、Avro コンバーターを使用するように Debezium コネクターを設定するには、以下の手順を完了します。

前提条件

  • コンテナーを作成および管理するのに十分な権限と共に Docker をインストールしている。
  • Avro シリアライゼーションと共にデプロイする Debezium コネクタープラグインをダウンロードしている。

手順

  1. Service Registry のインスタンスをデプロイします。以下を実行する方法については、『Getting Started with Service Registry』を参照してください。

    • Service Registry のインストール
    • AMQ Streams のインストール
    • AMQ Streams ストレージのセットアップ
  2. Debezium コネクターのアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。複数の Debezium コネクターのアーカイブをダウンロードして展開した場合、作成されるディレクトリー構造は以下の例のようになります。

    tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    |   ├── ...
    ├── debezium-connector-mysql
    │   ├── ...
    ├── debezium-connector-postgres
    │   ├── ...
    └── debezium-connector-sqlserver
        ├── ...
  3. Avro シリアライゼーションを使用するように設定する Debezium コネクターが含まれるディレクトリーに Avro コンバーターを追加します。

    1. Red Hat Integration のダウンロードサイト に移動し、Service Registry Kafka Connect の zip ファイルをダウンロードします。
    2. 目的の Debezium コネクターディレクトリーにアーカイブを展開します。

    複数のタイプの Debezium コネクターを Avro シリアライゼーションを使用するように設定するには、該当するそれぞれのコネクタータイプのディレクトリーにアーカイブを展開します。それぞれのディレクトリーにアーカイブを抽出するとファイルが重複しますが、これにより依存関係の競合が生じる可能性がなくなります。

  4. Avro コンバーターを使用するように設定する Debezium コネクターを実行するためのカスタムイメージを作成して公開します。

    1. registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0 をベースイメージとして使用して、新しい Dockerfile を作成します。以下の例の my-plugins を、実際のプラグインディレクトリーの名前に置き換えてください。

      FROM registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0
      USER root:root
      COPY ./my-plugins/ /opt/kafka/plugins/
      USER 1001

      Kafka Connect は、コネクターの実行を開始する前に、/opt/kafka/plugins ディレクトリーにあるサードパーティープラグインをロードします。

    2. docker コンテナーイメージをビルドします。たとえば、前のステップで作成した docker ファイルを debezium-container-with-avro として保存した場合は、以下のコマンドを実行します。

      docker build -t debezium-container-with-avro:latest

    3. カスタムイメージをコンテナーレジストリーにプッシュします。例を以下に示します。

      docker push <myregistry.io>/debezium-container-with-avro:latest

    4. 新しいコンテナーイメージを示します。次のいずれかを行います。

      • KafkaConnect カスタムリソースの KafkaConnect.spec.image プロパティーを編集します。このプロパティーが設定されている場合、このプロパティーは Cluster Operator の STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数を上書きします。その例を以下に示します。

        apiVersion: kafka.strimzi.io/v1beta2
        kind: KafkaConnect
        metadata:
          name: my-connect-cluster
        spec:
          #...
          image: debezium-container-with-avro
      • install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml ファイルで STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数を編集して新しいコンテナーイメージを示すように、Cluster Operator を再インストールします。このファイルを編集する場合は、これを OpenShift クラスターに適用する必要があります。
  5. Avro コンバーターを使用するように設定されたそれぞれの Debezium コネクターをデプロイします。それぞれの Debezium コネクターについて、以下の設定を行います。

    1. Debezium コネクターインスタンスを作成します。以下の inventory-connector.yaml ファイルの例では、Avro コンバーターを使用するように設定された MySQL コネクターインスタンスを定義する KafkaConnector カスタムリソースを作成します。

      apiVersion: kafka.strimzi.io/kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mysql.MySqlConnector
        tasksMax: 1
        config:
          database.hostname: mysql
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054
          database.server.name: dbserver1
          database.include.list: inventory
          database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
          database.history.kafka.topic: schema-changes.inventory
          key.converter: io.apicurio.registry.utils.converter.AvroConverter
          key.converter.apicurio.registry.url: http://apicurio:8080/api
          key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
          value.converter: io.apicurio.registry.utils.converter.AvroConverter
          value.converter.apicurio.registry.url: http://apicurio:8080/api
          value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
    2. コネクターインスタンスを適用します。以下に例を示します。

      oc apply -f inventory-connector.yaml

      これにより inventory-connector が登録され、コネクターが インベントリー データベースに対して実行が開始されます。

  6. コネクターが作成され、指定されたデータベース内の変更の追跡を開始したことを確認します。コネクターインスタンスは、Kafka Connect のログ出力(例: inventory-connector の開始)を監視して確認できます。

    1. Kafka Connect のログ出力を表示します。

      oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
    2. ログの出力を確認し、初回のスナップショットが実行されたことを確認します。以下のような行が表示されるはずです。

      ...
      2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      スナップショットは、複数のステップを経て作成されます。

      ...
      2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,843 INFO 	 using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...
      2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      スナップショットの完了後、Debezium は変更イベントの インベントリー データベースの binlog の変更の追跡を開始します。

      ...
      2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
      Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
      INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
      2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
      ...