4.5.3. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium MongoDB コネクターのデプロイ

Debezium MongoDB コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、2 つのカスタムリソース (CR) を作成します。

  • Kafka Connect インスタンスを定義する KafkaConnect CR。image は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる operator およびイメージを提供します。
  • Debezium MongoDB コネクターを定義する KafkaConnector CR。この CR を KafkaConnect CR を適用するのと同じ OpenShift インスタンスに適用します。

前提条件

  • MongoDB が稼働し、MongoDB を設定して Debezium コネクターと連携する 手順が完了済みである必要があります。
  • AMQ Streams は OpenShift にデプロイされ、Apache Kafka および Kafka Connect が稼働している必要があります。詳細は、Deploying and Upgrading AMQ Streams on OpenShift を参照してください。
  • Podman または Docker がインストールされている。
  • Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (quay.iodocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。

手順

  1. Kafka Connect の Debezium MongoDB コンテナーを作成します。

    1. Debezium MongoDB コネクターアーカイブをダウンロードします。
    2. Debezium MongoDB コネクターアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。

      ./my-plugins/
      ├── debezium-connector-mongodb
      │   ├── ...
    3. registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0 をベースイメージとして使用して、新規の Dockerfile を作成します。たとえば、ターミナルウィンドウから以下のコマンドを入力します。my-plugins はプラグインディレクトリーの名前に置き換えます。

      cat <<EOF >debezium-container-for-mongodb.yaml 1
      FROM registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0
      USER root:root
      COPY ./<my-plugins>/ /opt/kafka/plugins/ 2
      USER 1001
      EOF
      1 1 1 1 1 1 1 1 1
      任意のファイル名を指定できます。
      2 2 2 2 2 2 2 2 2
      my-plugins は、プラグインディレクトリーの名前に置き換えます。

      このコマンドは、現在のディレクトリーに debezium-container-for-mongodb.yaml という名前の Dockerfile を作成します。

    4. 前のステップで作成した debezium-container-for-mongodb.yaml Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。

      podman build -t debezium-container-for-mongodb:latest .
      docker build -t debezium-container-for-mongodb:latest .

      上記のコマンドは、debezium-container-for-mongodb という名前のコンテナーイメージを構築します。

    5. カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。

      podman push <myregistry.io>/debezium-container-for-mongodb:latest
      docker push <myregistry.io>/debezium-container-for-mongodb:latest
    6. 新しい Debezium MongoDB KafkaConnect カスタムリソース (CR) を作成します。たとえば、以下の例のように annotations および image プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        #...
        image: debezium-container-for-mongodb  2
      1
      KafkaConnector リソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotations は Cluster Operator に示します。
      2
      spec.image は Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator の STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数がオーバーライドされます。
    7. 以下のコマンドを入力して、KafkaConnect CR を OpenShift Kafka Connect 環境に適用します。

      oc create -f dbz-connect.yaml

      このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。

  2. Debezium PostgreSQL コネクターインスタンスを設定する KafkaConnector カスタムリソースを作成します。

    通常、コネクターに使用できる設定プロパティーを使用して、.yaml ファイルに Debezium MongoDB コネクターを設定します。コネクター設定で、Debezium に指示を出して MongoDB レプリカセットまたはシャードクラスターのサブセットの変更イベントを生成する場合があります。任意で、不必要なコレクションを除外するプロパティーを設定できます。

    以下の例では、MongoDB レプリカセット rs0192.168.99.100 のポート 27017 に接続し、インベントリー コレクションで発生した変更をキャプチャーする Debezium コネクターを設定します。fullfillment はレプリカセットの論理名です。

    MongoDB inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector 1
        labels: strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mongodb.MongoDbConnector 2
        config:
         mongodb.hosts: rs0/192.168.99.100:27017 3
         mongodb.name: fulfillment 4
         collection.include.list: inventory[.]* 5

    1
    コネクターを Kafka Connect に登録するために使用される名前。
    2
    MongoDB コネクタークラスの名前。
    3
    MongoDB レプリカセットへの接続に使用するホストアドレス。
    4
    生成されたイベントの namespace を形成する MongoDB レプリカセットの論理名。コネクターが書き込む Kafka トピックの名前、Kafka Connect スキーマ名、および Arvo コンバーターが使用される場合に対応する Avro スキーマの namespace のすべてに使用されます。
    5
    監視するすべてのコレクションのコレクション namespace (例: <dbName>.<collectionName>) と一致する正規表現の任意リスト。
  3. Kafka Connect でコネクターインスタンスを作成します。たとえば、KafkaConnector リソースを inventory-connector.yaml ファイルに保存した場合は、以下のコマンドを実行します。

    oc apply -f inventory-connector.yaml

    上記のコマンドは inventory-connector を登録し、コネクターは KafkaConnector CR に定義されている inventory コレクションに対して実行を開始します。

Debezium MongoDB コネクターに設定できる設定プロパティーの完全リストは、MongoDB コネクター設定プロパティーを参照してください。

結果

コネクターが起動したら、以下のアクションを完了します。

  • MongoDB レプリカセットでコレクションの スナップショット 一貫性をもたせて実行する。
  • レプリカセットの oplogs を読み取る。
  • 挿入、更新、削除されたすべてのドキュメントの変更イベントを生成する。
  • Kafka トピックに変更イベントレコードをストリーミングする。