7.5. Kafka Connect の設定

AMQ Streams の KafkaConnect リソースを使用して、新しい Kafka Connect クラスターをすばやく簡単に作成します。

KafkaConnect リソースを使用して Kafka Connect をデプロイする場合は、Kafka クラスターに接続するためのブートストラップサーバーアドレスを (spec.bootstrapServers で) 指定します。サーバーがダウンした場合に備えて、複数のアドレスを指定できます。また、認証情報と TLS 暗号化証明書を指定して、安全な接続を確立します。

注記

Kafka クラスターは、AMQ Streams で管理したり、OpenShift クラスターにデプロイしたりする必要はありません。

KafkaConnect リソースを使用して、以下を指定することもできます。

  • 接続を確立するためのプラグインを含むコンテナーイメージを構築するためのプラグイン設定
  • Kafka Connect クラスターに属するワーカー Pod の設定
  • KafkaConnector リソースを使用してプラグインを管理できるようにするアノテーション

Cluster Operator は、KafkaConnect リソースを使用してデプロイされた Kafka Connect クラスターと、KafkaConnector リソースを使用して作成されたコネクターを管理します。

プラグイン設定

プラグインは、コネクターインスタンスを作成するための実装を提供します。プラグインがインスタンス化されると、特定のタイプの外部データシステムに接続するための設定が提供されます。プラグインは、特定の種類のデータソースに接続するためのコネクターとタスクの実装を定義する 1 つ以上の JAR ファイルのセットを提供します。多くの外部システム用のプラグインは、Kafka Connect で使用できます。独自のプラグインを作成することもできます。

この設定では、Kafka Connect にフィードするソース入力データおよびターゲット出力データを記述します。ソースコネクターの場合、外部ソースデータは、メッセージを格納する特定のトピックを参照する必要があります。プラグインには、データの変換に必要なライブラリーとファイルが含まれている場合もあります。

Kafka Connect デプロイメントには、1 つ以上のプラグインを含めることができますが、各プラグインのバージョンは 1 つだけです。

選択したプラグインを含むカスタム Kafka Connect イメージを作成できます。イメージは次の 2 つの方法で作成できます。

コンテナーイメージを自動的に作成するには、KafkaConnect リソースの build プロパティーを使用して、Kafka Connect クラスターに追加するプラグインを指定します。AMQ Streams は、プラグインアーティファクトを自動的にダウンロードして新しいコンテナーイメージに追加します。

プラグイン設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  build: 1
    output: 2
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: 3
      - name: debezium-postgres-connector
        artifacts:
          - type: tgz
            url: https://ARTIFACT-ADDRESS.tgz
            sha512sum: HASH-NUMBER-TO-VERIFY-ARTIFACT
      # ...
  # ...

1
コネクタープラグインで自動的にコンテナーイメージをビルドするための ビルド設定プロパティー
2
新しいイメージがプッシュされるコンテナーレジストリーの設定。output プロパティーは、イメージのタイプおよび名前を記述し、任意でコンテナーレジストリーへのアクセスに必要なクレデンシャルが含まれる Secret の名前を記述します。
3
新しいコンテナーイメージに追加するプラグインとそのアーティファクトのリスト。plugins プロパティーは、アーティファクトのタイプとアーティファクトのダウンロード元となる URL を記述します。各プラグインは、1 つ以上のアーティファクトで設定する必要があります。さらに、SHA-512 チェックサムを指定して、アーティファクトを展開する前に検証することもできます。

Dockerfile を使用してイメージをビルドしている場合は、AMQ Streams の最新のコンテナーイメージをベースイメージとして使用して、プラグイン設定ファイルを追加できます。

プラグイン設定の手動追加を示す例

FROM registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

ワーカー用の Kafka Connect クラスター設定

ワーカーの設定は、KafkaConnect リソースの config プロパティーで指定します。

分散型 Kafka Connect クラスターには、グループ ID と一連の内部設定トピックがあります。

  • group.id
  • offset.storage.topic
  • config.storage.topic
  • status.storage.topic

Kafka Connect クラスターは、デフォルトでこれらのプロパティーに同じ値で設定されています。Kafka Connect クラスターは、エラーを作成するため、グループ ID またはトピック名を共有できません。複数の異なる Kafka Connect クラスターを使用する場合、これらの設定は、作成された各 Kafka Connect クラスターのワーカーに対して一意である必要があります。

各 Kafka Connect クラスターで使用されるコネクターの名前も一意である必要があります。

次のワーカー設定の例では、JSON コンバーターが指定されています。レプリケーション係数は、Kafka Connect によって使用される内部 Kafka トピックに設定されます。これは、実稼働環境では少なくとも 3 つ必要です。トピックの作成後にレプリケーション係数を変更しても効果はありません。

ワーカー設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
# ...
spec:
  config:
    # ...
    group.id: my-connect-cluster 1
    offset.storage.topic: my-connect-cluster-offsets 2
    config.storage.topic: my-connect-cluster-configs 3
    status.storage.topic: my-connect-cluster-status 4
    key.converter: org.apache.kafka.connect.json.JsonConverter 5
    value.converter: org.apache.kafka.connect.json.JsonConverter 6
    key.converter.schemas.enable: true 7
    value.converter.schemas.enable: true 8
    config.storage.replication.factor: 3 9
    offset.storage.replication.factor: 3 10
    status.storage.replication.factor: 3 11
  # ...

1
Kafka 内の Kafka Connect クラスター ID。Kafka Connect クラスターごとに一意である必要があります。
2
コネクターオフセットを保存する Kafka トピック。Kafka Connect クラスターごとに一意である必要があります。
3
コネクターおよびタスクステータスの設定を保存する Kafka トピック。Kafka Connect クラスターごとに一意である必要があります。
4
コネクターおよびタスクステータスの更新を保存する Kafka トピック。Kafka Connect クラスターごとに一意である必要があります。
5
Kafka に保存するためにメッセージキーを JSON 形式に変換するコンバーター。
6
Kafka に保存するためにメッセージ値を JSON 形式に変換するコンバーター。
7
メッセージキーを構造化された JSON 形式に変換できるスキーマ。
8
メッセージ値を構造化された JSON 形式に変換できるスキーマ。
9
コネクターオフセットを保存する Kafka トピックのレプリケーション係数。
10
コネクターとタスクのステータス設定を保存する Kafka トピックのレプリケーション係数。
11
コネクターとタスクのステータスの更新を保存する Kafka トピックのレプリケーション係数。

コネクターの Kafka Connector 管理

デプロイメントでワーカー Pod に使用されるコンテナーイメージにプラグインが追加されたら、AMQ Streams のKafkaConnector カスタムリソースまたは Kafka Connect API を使用してコネクターインスタンスを管理できます。これらのオプションを使用して、新しいコネクターインスタンスを作成することもできます。

KafkaConnector リソースは、Cluster Operator によるコネクターの管理に OpenShift ネイティブのアプローチを提供します。KafkaConnector リソースを使用してコネクターを管理するには、KafkaConnect カスタムリソースでアノテーションを指定する必要があります。

KafkaConnectors を有効にするためのアノテーション

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...

use-connector-resourcestrue に設定すると、KafkaConnectors はコネクターを作成、削除、および再設定できます。

KafkaConnect 設定で use-connector-resources が有効になっている場合は、KafkaConnector リソースを使用してコネクターを定義および管理する必要があります。KafkaConnector リソースは、外部システムに接続するように設定されています。これらは、外部データシステムと相互作用する Kafka Connect クラスターおよび Kafka クラスターと同じ OpenShift クラスターにデプロイされます。

同じ OpenShift クラスター内に含まれる Kafka コンポーネント

Kafka and Kafka Connect clusters

設定は、認証を含め、コネクターインスタンスが外部データシステムに接続する方法を指定します。また、監視するデータを指定する必要があります。ソースコネクターの場合は、設定でデータベース名を指定できます。ターゲットトピック名を指定することで、Kafka のどこにデータを配置するかを指定することもできます。

タスクの最大数を指定するには、tasksMax を使用します。たとえば、tasksMax: 2 のソースコネクターは、ソースデータのインポートを 2 つのタスクに分割する場合があります。

KafkaConnector ソースコネクター設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector  1
  labels:
    strimzi.io/cluster: my-connect-cluster 2
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector 3
  tasksMax: 2 4
  config: 5
    file: "/opt/kafka/LICENSE" 6
    topic: my-topic 7
    # ...

1
コネクターの名前として使用される KafkaConnector リソースの名前。OpenShift リソースで有効な名前を使用します。
2
コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
3
コネクタークラスのフルネーム。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
4
コネクターが作成できる Kafka Connect タスクの最大数。
5
キーと値のペアとしての コネクター設定
6
外部データファイルの場所。この例では、/opt/kafka/LICENSE ファイルから読み取るように FileStreamSourceConnector を設定しています。
7
ソースデータのパブリッシュ先となる Kafka トピック。
注記

OpenShift Secrets または ConfigMaps から コネクターの機密設定値をロード できます。

Kafka Connect API

KafkaConnector リソースを使用してコネクターを管理する代わりに、Kafka Connect REST API を使用します。Kafka Connect REST API は、<connect_cluster_name>-connect-api:8083 で実行しているサービスとして利用できます。ここで、<connect_cluster_name> は、お使いの Kafka Connect クラスターの名前になります。

コネクター設定を JSON オブジェクトとして追加します。

コネクター設定を追加するための curl 要求の例

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
    }
}'

KafkaConnectors が有効になっている場合、Kafka Connect REST API に直接手作業で追加された変更は Cluster Operator によって元に戻されます。

REST API でサポートされる操作は、Apache Kafka のドキュメント を参照してください。

注記

Kafka Connect API サービスを OpenShift の外部に公開できます。これを行うには、入力やルートなどのアクセスを提供する接続メカニズムを使用するサービスを作成します。接続のセキュリティーが低いので慎重に使用してください。