8.3.3. Debezium コンテナーの使用
お使いの環境では、提供される Debezium コンテナーを使用して、Avro serializaion を使用する Debezium コネクターをデプロイすることができます。ここの手順に従ってこれを行います。この手順では、Debezium のカスタム Kafka Connect コンテナーイメージをビルドし、Debezium コネクターが Avro コンバーターを使用するように設定します。
前提条件
- OpenShift クラスターへのクラスター管理者アクセスがある。
- Avro のシリアライズでデプロイする Debezium コネクタープラグインをダウンロードしている。
手順
Service Registry のインスタンスをデプロイします。以下の手順は、『Getting Started with Service Registry』の「 Installing Service Registry from the OpenShift OperatorHub 」を参照してください。
- AMQ Streams のインストール
- AMQ Streams ストレージの設定
- Service Registry のインストール
Debezium コネクターアーカイブを展開し、コネクタープラグインのディレクトリー構造を作成します。各 Debezium コネクターのアーカイブをダウンロードして抽出した場合、構造は以下のようになります。
tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb | ├── ... ├── debezium-connector-mysql │ ├── ... ├── debezium-connector-postgres │ ├── ... └── debezium-connector-sqlserver ├── ...
Avro コンバーターを、Avro のシリアライズを使用するように設定する Debezium コネクターが含まれるディレクトリーに追加します。
- Red Hat Integration のダウンロードサイト に移動し、Service Registry Kafka Connect zip ファイルをダウンロードします。
- アーカイブを希望の Debezium コネクターディレクトリーに展開します。
Avro のシリアライズを使用するように複数のタイプの Debezium コネクターを設定するには、関連するコネクタータイプのディレクトリーでアーカイブを展開します。これによりファイルが複製されますが、競合する依存関係の可能性がなくなります。
Avro コンバーターを使用するように設定された Debezium コネクターを実行するためにカスタムイメージを作成し、パブリッシュします。
をベースイメージ
registry.redhat.io/amq7/amq-streams-kafka-25:1.5.0
として使用Dockerfile
して、新規のを作成します。以下の例では、my-plugins をプラグインディレクトリーの名前に置き換えます。FROM registry.redhat.io/amq7/amq-streams-kafka-25:1.5.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
Kafka Connect がコネクターの実行を開始する前に、Kafka Connect は
/opt/kafka/plugins
ディレクトリーにあるサードパーティープラグインをロードします。Docker コンテナーイメージをビルドします。たとえば、で作成した docker ファイルをとして保存した場合
debezium-container-with-avro
、以下のコマンドを実行します。docker build -t debezium-container-with-avro:latest
カスタムイメージをコンテナーレジストリーにプッシュします。以下に例を示します。
docker push debezium-container-with-avro:latest
新しいコンテナーイメージを示します。次のいずれかを行います。
KafkaConnect
カスタムリソースのKafkaConnect.spec.image
プロパティーを編集します。設定された場合、このプロパティーによって Cluster Operator のSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
変数がオーバーライドされます。以下に例を示します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster spec: #... image: debezium-container-with-avro
-
install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml
ファイルで、STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
変数を編集して新しいコンテナーイメージを参照し、Cluster Operator を再インストールします。このファイルを編集する場合は、これを OpenShift クラスターに適用する必要があります。
Avro コンバーターを使用するように設定された各 Debezium コネクターをデプロイします。Debezium コネクターごとに以下を行います。
Debezium コネクターインスタンスを作成します。以下の
inventory-connector.yaml
ファイル例では、Avro コンバーターを使用するように設定された MySQL コネクターインスタンスを定義するKafkaConnector
カスタムリソースを作成します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config: database.hostname: mysql database.port: 3306 database.user: debezium database.password: dbz database.server.id: 184054 database.server.name: dbserver1 database.whitelist: inventory database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 database.history.kafka.topic: schema-changes.inventory key.converter: io.apicurio.registry.utils.converter.AvroConverter key.converter.apicurio.registry.url: http://apicurio:8080/api key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy value.converter: io.apicurio.registry.utils.converter.AvroConverter value.converter.apicurio.registry.url: http://apicurio:8080/api value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
コネクターインスタンスを適用します。例を以下に示します。
oc apply -f inventory-connector.yaml
このレジスターは
inventory
データベースに対して実行inventory-connector
されるようにします。
コネクターが作成され、指定されたデータベースの変更を追跡していることを確認します。たとえば、を
inventory-connector
起動した場合のように Kafka Connect ログ出力を監視して、コネクターインスタンスを確認できます。Kafka Connect ログ出力を表示します。
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
ログ出力を確認し、初期スナップショットが実行されていることを確認します。以下の行のように表示されるはずです。
... 2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
スナップショットには、複数のステップが必要です。
... 2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL master (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ... 2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
スナップショットが完了すると、Debezium は変更イベントに関する
inventory
データベースの変更などの変更の追跡binlog
を開始します。... 2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306] Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5) 2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306] ...