5.2.5. サンプル KafkaConnector リソースのデプロイ

AMQ Streams には、examples/connect/source-connector.yaml に KafkaConnector の例 が含まれています。これにより、Kafka ライセンスファイルの各行(サンプルファイルソース)を 1 つの Kafka トピックに送信する基本的な FileStreamSourceConnector インスタンスが作成されます。

この手順では、以下を作成する方法を説明します。

  • Kafka ライセンスファイル(ソース)からデータを読み取り、データをメッセージとして Kafka トピックに書き込む FileStreamSourceConnector
  • Kafka トピックからメッセージを読み取り、メッセージを一時ファイル(シンク)に書き込む FileStreamSinkConnector
注記

実稼働環境で、「コネクタープラグインでの Kafka Connect の拡張」 の説明どおりに、必要な Kafka Connect コネクターが含まれるコンテナーイメージを準備します。

FileStreamSourceConnector および FileStreamSinkConnector が例として提供されています。ここで説明するように、コンテナーでこれらのコネクターを実行することは、実稼働のユースケースには適していません。

前提条件

手順

  1. examples/connect/source-connector.yaml ファイルを編集します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector 1
      labels:
        strimzi.io/cluster: my-connect-cluster 2
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector 3
      tasksMax: 2 4
      config: 5
        file: "/opt/kafka/LICENSE" 6
        topic: my-topic 7
        # ...
    1
    コネクターの名前として使用される KafkaConnector リソースの名前。OpenShift リソースで有効な名前を使用します。
    2
    コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
    3
    コネクタークラスのフルネームまたはエイリアス。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
    4
    コネクターが作成できる Kafka Connect Tasks の最大数。
    5
    キーと値のペアとしてのコネクター設定
    6
    このサンプルソースコネクター設定では、/opt/kafka/LICENSE ファイルからデータが読み取られます。
    7
    ソースデータのパブリッシュ先となる Kafka トピック。
  2. OpenShift クラスターでソース KafkaConnector を作成します。

    oc apply -f examples/connect/source-connector.yaml
  3. examples/connect/sink-connector.yaml ファイルを作成します。

    touch examples/connect/sink-connector.yaml
  4. 以下の YAML を sink-connector.yaml ファイルに貼り付けます。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-sink-connector
      labels:
        strimzi.io/cluster: my-connect
    spec:
      class: org.apache.kafka.connect.file.FileStreamSinkConnector 1
      tasksMax: 2
      config: 2
        file: "/tmp/my-file" 3
        topics: my-topic 4
    1
    コネクタークラスのフルネームまたはエイリアス。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
    2
    キーと値のペアとしてのコネクター設定
    3
    ソースデータのパブリッシュ先となる一時ファイル。
    4
    ソースデータの読み取り元となる Kafka トピック。
  5. OpenShift クラスターにシンク KafkaConnector を作成します。

    oc apply -f examples/connect/sink-connector.yaml
  6. コネクターリソースが作成されたことを確認します。

    oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name
    
    my-source-connector
    my-sink-connector

    MY-CONNECT-CLUSTER を Kafka Connect クラスターに置き換えます。

  7. コンテナーで、kafka -console-consumer.sh を実行して、ソースコネクターによってトピックに書き込まれたメッセージを読み取ります。

    oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
ソースおよびシンクコネクターの設定オプション

コネクター設定は KafkaConnector リソースの spec.config プロパティーで定義されます。

FileStreamSourceConnector クラスおよび FileStreamSinkConnector クラスは、Kafka Connect REST API と同じ設定オプションをサポートします。他のコネクターは異なる設定オプションをサポートします。

表5.2 FileStreamSource コネクタークラスの設定オプション

名前タイプデフォルト値説明

file

文字列

Null

メッセージを書き込むソースファイル。指定のない場合は、標準入力が使用されます。

topic

List

Null

データのパブリッシュ先となる Kafka トピック。

表5.3 FileStreamSinkConnector クラスの設定オプション

名前タイプデフォルト値説明

file

文字列

Null

メッセージを書き込む宛先ファイル。指定のない場合は標準出力が使用されます。

topics

List

Null

データの読み取り元となる 1 つ以上の Kafka トピック。

topics.regex

文字列

Null

データの読み取り元となる 1 つ以上の Kafka トピックと一致する正規表現。