7.5. Kafka Connect の設定
AMQ Streams の KafkaConnect リソースを使用して、新しい Kafka Connect クラスターをすばやく簡単に作成します。
KafkaConnect リソースを使用して Kafka Connect をデプロイする場合は、Kafka クラスターに接続するためのブートストラップサーバーアドレスを (spec.bootstrapServers で) 指定します。サーバーがダウンした場合に備えて、複数のアドレスを指定できます。また、認証情報と TLS 暗号化証明書を指定して、安全な接続を確立します。
Kafka クラスターは、AMQ Streams で管理したり、OpenShift クラスターにデプロイしたりする必要はありません。
KafkaConnect リソースを使用して、以下を指定することもできます。
- 接続を確立するためのプラグインを含むコンテナーイメージを構築するためのプラグイン設定
- Kafka Connect クラスターに属するワーカー Pod の設定
-
KafkaConnectorリソースを使用してプラグインを管理できるようにするアノテーション
Cluster Operator は、KafkaConnect リソースを使用してデプロイされた Kafka Connect クラスターと、KafkaConnector リソースを使用して作成されたコネクターを管理します。
プラグイン設定
プラグインは、コネクターインスタンスを作成するための実装を提供します。プラグインがインスタンス化されると、特定のタイプの外部データシステムに接続するための設定が提供されます。プラグインは、特定の種類のデータソースに接続するためのコネクターとタスクの実装を定義する 1 つ以上の JAR ファイルのセットを提供します。多くの外部システム用のプラグインは、Kafka Connect で使用できます。独自のプラグインを作成することもできます。
この設定では、Kafka Connect にフィードするソース入力データおよびターゲット出力データを記述します。ソースコネクターの場合、外部ソースデータは、メッセージを格納する特定のトピックを参照する必要があります。プラグインには、データの変換に必要なライブラリーとファイルが含まれている場合もあります。
Kafka Connect デプロイメントには、1 つ以上のプラグインを含めることができますが、各プラグインのバージョンは 1 つだけです。
選択したプラグインを含むカスタム Kafka Connect イメージを作成できます。イメージは次の 2 つの方法で作成できます。
コンテナーイメージを自動的に作成するには、KafkaConnect リソースの build プロパティーを使用して、Kafka Connect クラスターに追加するプラグインを指定します。AMQ Streams は、プラグインアーティファクトを自動的にダウンロードして新しいコンテナーイメージに追加します。
プラグイン設定の例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
# ...
build: 1
output: 2
type: docker
image: my-registry.io/my-org/my-connect-cluster:latest
pushSecret: my-registry-credentials
plugins: 3
- name: debezium-postgres-connector
artifacts:
- type: tgz
url: https://ARTIFACT-ADDRESS.tgz
sha512sum: HASH-NUMBER-TO-VERIFY-ARTIFACT
# ...
# ...
- 1
- コネクタープラグインで自動的にコンテナーイメージをビルドするための ビルド設定プロパティー。
- 2
- 新しいイメージがプッシュされるコンテナーレジストリーの設定。
outputプロパティーは、イメージのタイプおよび名前を記述し、任意でコンテナーレジストリーへのアクセスに必要なクレデンシャルが含まれる Secret の名前を記述します。 - 3
- 新しいコンテナーイメージに追加するプラグインとそのアーティファクトのリスト。
pluginsプロパティーは、アーティファクトのタイプとアーティファクトのダウンロード元となる URL を記述します。各プラグインは、1 つ以上のアーティファクトで設定する必要があります。さらに、SHA-512 チェックサムを指定して、アーティファクトを展開する前に検証することもできます。
Dockerfile を使用してイメージをビルドしている場合は、AMQ Streams の最新のコンテナーイメージをベースイメージとして使用して、プラグイン設定ファイルを追加できます。
プラグイン設定の手動追加を示す例
FROM registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
ワーカー用の Kafka Connect クラスター設定
ワーカーの設定は、KafkaConnect リソースの config プロパティーで指定します。
分散型 Kafka Connect クラスターには、グループ ID と一連の内部設定トピックがあります。
-
group.id -
offset.storage.topic -
config.storage.topic -
status.storage.topic
Kafka Connect クラスターは、デフォルトでこれらのプロパティーに同じ値で設定されています。Kafka Connect クラスターは、エラーを作成するため、グループ ID またはトピック名を共有できません。複数の異なる Kafka Connect クラスターを使用する場合、これらの設定は、作成された各 Kafka Connect クラスターのワーカーに対して一意である必要があります。
各 Kafka Connect クラスターで使用されるコネクターの名前も一意である必要があります。
次のワーカー設定の例では、JSON コンバーターが指定されています。レプリケーション係数は、Kafka Connect によって使用される内部 Kafka トピックに設定されます。これは、実稼働環境では少なくとも 3 つ必要です。トピックの作成後にレプリケーション係数を変更しても効果はありません。
ワーカー設定の例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
# ...
spec:
config:
# ...
group.id: my-connect-cluster 1
offset.storage.topic: my-connect-cluster-offsets 2
config.storage.topic: my-connect-cluster-configs 3
status.storage.topic: my-connect-cluster-status 4
key.converter: org.apache.kafka.connect.json.JsonConverter 5
value.converter: org.apache.kafka.connect.json.JsonConverter 6
key.converter.schemas.enable: true 7
value.converter.schemas.enable: true 8
config.storage.replication.factor: 3 9
offset.storage.replication.factor: 3 10
status.storage.replication.factor: 3 11
# ...
- 1
- Kafka 内の Kafka Connect クラスター ID。Kafka Connect クラスターごとに一意である必要があります。
- 2
- コネクターオフセットを保存する Kafka トピック。Kafka Connect クラスターごとに一意である必要があります。
- 3
- コネクターおよびタスクステータスの設定を保存する Kafka トピック。Kafka Connect クラスターごとに一意である必要があります。
- 4
- コネクターおよびタスクステータスの更新を保存する Kafka トピック。Kafka Connect クラスターごとに一意である必要があります。
- 5
- Kafka に保存するためにメッセージキーを JSON 形式に変換するコンバーター。
- 6
- Kafka に保存するためにメッセージ値を JSON 形式に変換するコンバーター。
- 7
- メッセージキーを構造化された JSON 形式に変換できるスキーマ。
- 8
- メッセージ値を構造化された JSON 形式に変換できるスキーマ。
- 9
- コネクターオフセットを保存する Kafka トピックのレプリケーション係数。
- 10
- コネクターとタスクのステータス設定を保存する Kafka トピックのレプリケーション係数。
- 11
- コネクターとタスクのステータスの更新を保存する Kafka トピックのレプリケーション係数。
コネクターの Kafka Connector 管理
デプロイメントでワーカー Pod に使用されるコンテナーイメージにプラグインが追加されたら、AMQ Streams のKafkaConnector カスタムリソースまたは Kafka Connect API を使用してコネクターインスタンスを管理できます。これらのオプションを使用して、新しいコネクターインスタンスを作成することもできます。
KafkaConnector リソースは、Cluster Operator によるコネクターの管理に OpenShift ネイティブのアプローチを提供します。KafkaConnector リソースを使用してコネクターを管理するには、KafkaConnect カスタムリソースでアノテーションを指定する必要があります。
KafkaConnectors を有効にするためのアノテーション
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
# ...
use-connector-resources を true に設定すると、KafkaConnectors はコネクターを作成、削除、および再設定できます。
KafkaConnect 設定で use-connector-resources が有効になっている場合は、KafkaConnector リソースを使用してコネクターを定義および管理する必要があります。KafkaConnector リソースは、外部システムに接続するように設定されています。これらは、外部データシステムと相互作用する Kafka Connect クラスターおよび Kafka クラスターと同じ OpenShift クラスターにデプロイされます。
同じ OpenShift クラスター内に含まれる Kafka コンポーネント
設定は、認証を含め、コネクターインスタンスが外部データシステムに接続する方法を指定します。また、監視するデータを指定する必要があります。ソースコネクターの場合は、設定でデータベース名を指定できます。ターゲットトピック名を指定することで、Kafka のどこにデータを配置するかを指定することもできます。
タスクの最大数を指定するには、tasksMax を使用します。たとえば、tasksMax: 2 のソースコネクターは、ソースデータのインポートを 2 つのタスクに分割する場合があります。
KafkaConnector ソースコネクター設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 config: 5 file: "/opt/kafka/LICENSE" 6 topic: my-topic 7 # ...
- 1
- コネクターの名前として使用される
KafkaConnectorリソースの名前。OpenShift リソースで有効な名前を使用します。 - 2
- コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
- 3
- コネクタークラスのフルネーム。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
- 4
- コネクターが作成できる Kafka Connect タスクの最大数。
- 5
- キーと値のペアとしての コネクター設定。
- 6
- 外部データファイルの場所。この例では、
/opt/kafka/LICENSEファイルから読み取るようにFileStreamSourceConnectorを設定しています。 - 7
- ソースデータのパブリッシュ先となる Kafka トピック。
OpenShift Secrets または ConfigMaps から コネクターの機密設定値をロード できます。
Kafka Connect API
KafkaConnector リソースを使用してコネクターを管理する代わりに、Kafka Connect REST API を使用します。Kafka Connect REST API は、<connect_cluster_name>-connect-api:8083 で実行しているサービスとして利用できます。ここで、<connect_cluster_name> は、お使いの Kafka Connect クラスターの名前になります。
コネクター設定を JSON オブジェクトとして追加します。
コネクター設定を追加するための curl 要求の例
curl -X POST \
http://my-connect-cluster-connect-api:8083/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "my-source-connector",
"config":
{
"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/opt/kafka/LICENSE",
"topic":"my-topic",
"tasksMax": "4",
"type": "source"
}
}'
KafkaConnectors が有効になっている場合、Kafka Connect REST API に直接手作業で追加された変更は Cluster Operator によって元に戻されます。
REST API でサポートされる操作は、Apache Kafka のドキュメント を参照してください。
Kafka Connect API サービスを OpenShift の外部に公開できます。これを行うには、入力やルートなどのアクセスを提供する接続メカニズムを使用するサービスを作成します。接続のセキュリティーが低いので慎重に使用してください。