5.2.5. サンプル KafkaConnector リソースのデプロイ
AMQ Streams には、examples/connect/source-connector.yaml に
が含まれています。これにより、Kafka ライセンスファイルの各行(サンプルファイルソース)を 1 つの Kafka トピックに送信する基本的な KafkaConnector
の例FileStreamSourceConnector
インスタンスが作成されます。
この手順では、以下を作成する方法を説明します。
-
Kafka ライセンスファイル(ソース)からデータを読み取り、データをメッセージとして Kafka トピックに書き込む
FileStreamSourceConnector
。 -
Kafka トピックからメッセージを読み取り、メッセージを一時ファイル(シンク)に書き込む
FileStreamSinkConnector
。
実稼働環境で、「コネクタープラグインでの Kafka Connect の拡張」 の説明どおりに、必要な Kafka Connect コネクターが含まれるコンテナーイメージを準備します。
FileStreamSourceConnector
および FileStreamSinkConnector
が例として提供されています。ここで説明するように、コンテナーでこれらのコネクターを実行することは、実稼働のユースケースには適していません。
前提条件
- Kafka Connect デプロイメント。
- Kafka Connect デプロイメントで KafkaConnectors が有効になっている。
- Cluster Operator が稼働している必要があります。
手順
examples/connect/source-connector.yaml
ファイルを編集します。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 config: 5 file: "/opt/kafka/LICENSE" 6 topic: my-topic 7 # ...
- 1
- コネクターの名前として使用される KafkaConnector リソースの名前。OpenShift リソースで有効な名前を使用します。
- 2
- コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
- 3
- コネクタークラスのフルネームまたはエイリアス。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
- 4
- コネクターが作成できる Kafka Connect
Tasks
の最大数。 - 5
- キーと値のペアとしてのコネクター設定。
- 6
- このサンプルソースコネクター設定では、
/opt/kafka/LICENSE
ファイルからデータが読み取られます。 - 7
- ソースデータのパブリッシュ先となる Kafka トピック。
OpenShift クラスターでソース
KafkaConnector
を作成します。oc apply -f examples/connect/source-connector.yaml
examples/connect/sink-connector.yaml
ファイルを作成します。touch examples/connect/sink-connector.yaml
以下の YAML を
sink-connector.yaml
ファイルに貼り付けます。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector labels: strimzi.io/cluster: my-connect spec: class: org.apache.kafka.connect.file.FileStreamSinkConnector 1 tasksMax: 2 config: 2 file: "/tmp/my-file" 3 topics: my-topic 4
OpenShift クラスターにシンク
KafkaConnector
を作成します。oc apply -f examples/connect/sink-connector.yaml
コネクターリソースが作成されたことを確認します。
oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name my-source-connector my-sink-connector
MY-CONNECT-CLUSTER を Kafka Connect クラスターに置き換えます。
コンテナーで、
kafka -console-consumer.sh
を実行して、ソースコネクターによってトピックに書き込まれたメッセージを読み取ります。oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
ソースおよびシンクコネクターの設定オプション
コネクター設定は KafkaConnector リソースの spec.config
プロパティーで定義されます。
FileStreamSourceConnector
クラスおよび FileStreamSinkConnector
クラスは、Kafka Connect REST API と同じ設定オプションをサポートします。他のコネクターは異なる設定オプションをサポートします。
表5.2 FileStreamSource
コネクタークラスの設定オプション
名前 | タイプ | デフォルト値 | 説明 |
---|---|---|---|
| 文字列 | Null | メッセージを書き込むソースファイル。指定のない場合は、標準入力が使用されます。 |
| List | Null | データのパブリッシュ先となる Kafka トピック。 |
表5.3 FileStreamSinkConnector
クラスの設定オプション
名前 | タイプ | デフォルト値 | 説明 |
---|---|---|---|
| 文字列 | Null | メッセージを書き込む宛先ファイル。指定のない場合は標準出力が使用されます。 |
| List | Null | データの読み取り元となる 1 つ以上の Kafka トピック。 |
| 文字列 | Null | データの読み取り元となる 1 つ以上の Kafka トピックと一致する正規表現。 |
その他のリソース