8.3.3. Debezium コンテナーの使用

お使いの環境では、提供される Debezium コンテナーを使用して、Avro serializaion を使用する Debezium コネクターをデプロイすることができます。ここの手順に従ってこれを行います。この手順では、Debezium のカスタム Kafka Connect コンテナーイメージをビルドし、Debezium コネクターが Avro コンバーターを使用するように設定します。

前提条件

  • OpenShift クラスターへのクラスター管理者アクセスがある。
  • Avro のシリアライズでデプロイする Debezium コネクタープラグインをダウンロードしている。

手順

  1. Service Registry のインスタンスをデプロイします。以下の手順は、『Getting Started with Service Registry』の「 Installing Service Registry from the OpenShift OperatorHub 」を参照してください。

    • AMQ Streams のインストール
    • AMQ Streams ストレージの設定
    • Service Registry のインストール
  2. Debezium コネクターアーカイブを展開し、コネクタープラグインのディレクトリー構造を作成します。各 Debezium コネクターのアーカイブをダウンロードして抽出した場合、構造は以下のようになります。

    tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    |   ├── ...
    ├── debezium-connector-mysql
    │   ├── ...
    ├── debezium-connector-postgres
    │   ├── ...
    └── debezium-connector-sqlserver
        ├── ...
  3. Avro コンバーターを、Avro のシリアライズを使用するように設定する Debezium コネクターが含まれるディレクトリーに追加します。

    1. Red Hat Integration のダウンロードサイト に移動し、Service Registry Kafka Connect zip ファイルをダウンロードします。
    2. アーカイブを希望の Debezium コネクターディレクトリーに展開します。

    Avro のシリアライズを使用するように複数のタイプの Debezium コネクターを設定するには、関連するコネクタータイプのディレクトリーでアーカイブを展開します。これによりファイルが複製されますが、競合する依存関係の可能性がなくなります。

  4. Avro コンバーターを使用するように設定された Debezium コネクターを実行するためにカスタムイメージを作成し、パブリッシュします。

    1. をベースイメージ registry.redhat.io/amq7/amq-streams-kafka-25:1.5.0 として使用 Dockerfile して、新規のを作成します。以下の例では、my-plugins をプラグインディレクトリーの名前に置き換えます。

      FROM registry.redhat.io/amq7/amq-streams-kafka-25:1.5.0
      USER root:root
      COPY ./my-plugins/ /opt/kafka/plugins/
      USER 1001

      Kafka Connect がコネクターの実行を開始する前に、Kafka Connect は /opt/kafka/plugins ディレクトリーにあるサードパーティープラグインをロードします。

    2. Docker コンテナーイメージをビルドします。たとえば、で作成した docker ファイルをとして保存した場合 debezium-container-with-avro、以下のコマンドを実行します。

      docker build -t debezium-container-with-avro:latest

    3. カスタムイメージをコンテナーレジストリーにプッシュします。以下に例を示します。

      docker push debezium-container-with-avro:latest

    4. 新しいコンテナーイメージを示します。次のいずれかを行います。

      • KafkaConnect カスタムリソースの KafkaConnect.spec.image プロパティーを編集します。設定された場合、このプロパティーによって Cluster Operator の STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数がオーバーライドされます。以下に例を示します。

        apiVersion: kafka.strimzi.io/v1beta1
        kind: KafkaConnect
        metadata:
          name: my-connect-cluster
        spec:
          #...
          image: debezium-container-with-avro
      • install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml ファイルで、STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数を編集して新しいコンテナーイメージを参照し、Cluster Operator を再インストールします。このファイルを編集する場合は、これを OpenShift クラスターに適用する必要があります。
  5. Avro コンバーターを使用するように設定された各 Debezium コネクターをデプロイします。Debezium コネクターごとに以下を行います。

    1. Debezium コネクターインスタンスを作成します。以下の inventory-connector.yaml ファイル例では、Avro コンバーターを使用するように設定された MySQL コネクターインスタンスを定義する KafkaConnector カスタムリソースを作成します。

      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaConnector
      metadata:
        name: inventory-connector
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mysql.MySqlConnector
        tasksMax: 1
        config:
          database.hostname: mysql
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054
          database.server.name: dbserver1
          database.whitelist: inventory
          database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
          database.history.kafka.topic: schema-changes.inventory
          key.converter: io.apicurio.registry.utils.converter.AvroConverter
          key.converter.apicurio.registry.url: http://apicurio:8080/api
          key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
          value.converter: io.apicurio.registry.utils.converter.AvroConverter
          value.converter.apicurio.registry.url: http://apicurio:8080/api
          value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
    2. コネクターインスタンスを適用します。例を以下に示します。

      oc apply -f inventory-connector.yaml

      このレジスターは inventory データベースに対して実行 inventory-connector されるようにします。

  6. コネクターが作成され、指定されたデータベースの変更を追跡していることを確認します。たとえば、を inventory-connector 起動した場合のように Kafka Connect ログ出力を監視して、コネクターインスタンスを確認できます。

    1. Kafka Connect ログ出力を表示します。

      oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
    2. ログ出力を確認し、初期スナップショットが実行されていることを確認します。以下の行のように表示されるはずです。

      ...
      2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      スナップショットには、複数のステップが必要です。

      ...
      2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL master (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,843 INFO 	 using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...
      2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      スナップショットが完了すると、Debezium は変更イベントに関する inventory データベースの変更などの変更の追跡 binlog を開始します。

      ...
      2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
      Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
      INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
      2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
      ...