6.5.2. Debezium Oracle コネクターのデプロイ

Debezium Oracle コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、続いてこのコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。その後、以下のカスタムリソース (CR) を作成する必要があります。

  • Kafka Connect インスタンスを定義する KafkaConnect CR。CR の image プロパティーは、Debezium コネクターを実行するために作成するコンテナーイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる Operator およびイメージを提供します。
  • Debezium Oracle コネクターを定義する KafkaConnector CR。この CR を KafkaConnect CR を適用するのと同じ OpenShift インスタンスに適用します。

    .Prerequisites
  • Oracle Database が稼働し、Oracle を設定して Debezium コネクターと連携する 手順が完了済みである必要があります。
  • AMQ Streams が OpenShift にデプロイされ、Apache Kafka および Kafka Connect を実行している。詳細は、『Deploying and Upgrading AMQ Streams on OpenShift』を参照してください。
  • Podman または Docker がインストールされている。
  • Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー( quay.io または docker.ioなど)でコンテナーを作成および管理するアカウントおよびパーミッションがある。
  • Oracle JDBC ドライバーのコピーがある。ライセンス要件により、Debezium Oracle コネクターには必要な JDBC ドライバーファイルが含まれていません。

    詳細は、Obtaining the Oracle JDBC driverを参照してください。

手順

  1. Kafka Connect の Debezium Oracle コンテナーを作成します。

    1. Debezium Oracle コネクターアーカイブ をダウンロードします。
    2. Debezium Oracle コネクターアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。

      ./my-plugins/
      ├── debezium-connector-oracle
      │   ├── ...
    3. registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0 をベースイメージとして使用する Dockerfile を作成します。たとえば、ターミナルウィンドウに以下のコマンドを入力します。my -plugins はプラグインディレクトリーの名前に置き換えます。

      cat <<EOF >debezium-container-for-oracle.yaml 1
      FROM registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0
      USER root:root
      COPY ./<my-plugins>/ /opt/kafka/plugins/ 2
      USER 1001
      EOF
      1 1 1
      任意のファイル名を指定できます。
      2 2 2
      my-plugins は、プラグインディレクトリーの名前に置き換えます。

      このコマンドは、現在のディレクトリーに debezium-container-for-oracle.yaml という名前の Docker ファイルを作成します。

    4. 前の手順で作成した debezium-container-for-oracle.yaml Docker ファイルからコンテナーイメージをビルドします。ファイルを含むディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。

      podman build -t debezium-container-for-oracle:latest .
      docker build -t debezium-container-for-oracle:latest .

      上記のコマンドは、debezium-container-for-oracle という名前のコンテナーイメージを構築します。

    5. カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。

      podman push <myregistry.io>/debezium-container-for-oracle:latest
      docker push <myregistry.io>/debezium-container-for-oracle:latest
    6. 新しい Debezium Oracle KafkaConnect カスタムリソース (CR) を作成します。たとえば、以下の例のように アノテーション およびイメージ プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        #...
        image: debezium-container-for-oracle  2
      1
      metadata.annotations は、KafkaConnectors リソースがこの Kafka Connect クラスターでコネクターを設定するために使用されることを Cluster Operator に示します。
      2
      spec.image は、Debezium コネクターを実行するために作成したイメージの名前を指定します。このプロパティーは、Cluster Operator の STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数を上書きします。
    7. 以下のコマンドを入力して、KafkaConnect CR を OpenShift Kafka Connect 環境に適用します。

      oc create -f dbz-connect.yaml

      このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。

  2. Debezium Oracle コネクターインスタンスを設定する KafkaConnector カスタムリソースを作成します。

    コネクターの設定プロパティーを指定する a .yaml ファイルで Debezium Oracle コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。

    以下の例では、port 1521 で Oracle ホスト IP アドレスに接続する Debezium コネクターを設定します。このホストには ORCLCDB という名前のデータベースがあり、server1 はサーバーの論理名です。

    Oracle inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: inventory-connector 1
      labels:
        strimzi.io/cluster: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: 'true'
    spec:
      class: io.debezium.connector.oracle.OracleConnector 2
      config:
        database.hostname: <oracle_ip_address> 3
        database.port: 1521 4
        database.user: c##dbzuser 5
        database.password: dbz 6
        database.dbname: ORCLCDB 7
        database.pdb.name : ORCLPDB1, 8
        database.server.name: server1 9
        database.history.kafka.bootstrap.servers: kafka:9092 10
        database.history.kafka.topic: schema-changes.inventory 11

    表6.6 コネクター設定の説明

    項目説明

    1

    Kafka Connect サービスに登録する場合のコネクターの名前。

    2

    この Oracle コネクタークラスの名前。

    3

    Oracle インスタンスのアドレス。

    4

    Oracle インスタンスのポート番号。

    5

    コネクターのユーザーの作成 で指定した Oracle ユーザーの名前。

    6

    コネクターのユーザーの作成 で指定した Oracle ユーザーのパスワード。

    7

    変更をキャプチャーするデータベースの名前。

    8

    コネクターが変更をキャプチャーする Oracle のプラグ可能なデータベースの名前。コンテナーデータベース (CDB) のインストールでのみ使用されます。

    9

    コネクターが変更をキャプチャーする Oracle データベースサーバーの namespace を識別および提供する論理名。

    10

    DDL ステートメントをデータベース履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。

    11

    コネクターが DDL ステートメントを書き、復元するデータベース履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。

  3. Kafka Connect でコネクターインスタンスを作成します。たとえば、KafkaConnector リソースを inventory-connector.yaml ファイルに保存した場合、以下のコマンドを実行します。

    oc apply -f inventory-connector.yaml

    上記のコマンドは inventory-connector を登録し、コネクターは KafkaConnector CR に定義されている server1 データベースに対して実行を開始します。

  4. コネクターが作成され、起動されたことを確認します。

    1. Kafka Connect ログ出力を表示して、コネクターが作成され、指定データベースの変更のキャプチャーが開始されたことを確認します。

      oc logs $(oc get pods -o name -l strimzi.io/cluster=my-connect-cluster)
    2. ログの出力を確認し、Debezium が初回のスナップショットを実行することを確認します。ログには、以下のメッセージと同様の出力が表示されます。

      ... INFO Starting snapshot for ...
      ... INFO Snapshot is using user 'c##dbzuser' ...

      コネクターがエラーがなく正常に起動すると、コネクターが変更をキャプチャーする各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブできます。

    3. 以下のコマンドを実行して、コネクターによってトピックが作成されたことを検証します。

      oc get kafkatopics

Debezium Oracle コネクターに設定できる設定プロパティーの完全リストは Oracleコネクタープロパティー を参照してください。

結果

コネクターが起動すると、コネクターが設定された Oracle データベースの整合性スナップショットが実行されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。