第15章 Kafka への接続

Apache Kafka はデータの取得および公開に使用できる分散ストリーミングプラットフォームです。インテグレーションでは、指定した Kafka トピックからデータをサブスクライブでき、または指定した Kafka トピックにデータを公開することもできます。これには、Fuse Online を有効にし、クラスターの Kafka ブローカーを検出します。その後、Kafka へのコネクションを作成し、そのコネクションをインテグレーションフローに追加します。詳細は以下を参照してください。

15.1. Kafka ブローカー/AMQ Streams の自動検出の有効化

インテグレーションで、AMQ Streams インスタンスである Kafka ブローカーに接続する予定である場合は、Kafka コネクションを作成する前に、このようなブローカーの自動検出を有効にします。これにより、コネクションの設定に必要な Kafka ブローカーの詳細が Fuse Online によって提供されるため、Kafka コネクションの作成が容易になります。

自動検出を有効にするには、クラスターのリソースを確認できる権限を OpenShift ユーザーアカウントにクラスターに付与します。特定の権限をアカウントに付与すると、ブローカーの異なるサブセットを検出できるようになります。

注記

OpenShift 4.5 では、Kafka ブローカーの自動検出の有効化はサポートされていません。Kafka ブローカーに接続するには、Kafka ブローカーの URI (host:port 形式) を知っている必要があります。

前提条件

  • オンサイトで Fuse Online を OCP 上で稼働している必要があります。
  • oc クライアントツールがインストール済みで、Kafka ブローカーの自動検出を有効にするクラスターに接続されている必要があります。
  • クラスターの管理者権限が必要です。
  • インテグレーションで接続する Kafka ブローカー (AMQ Streams インスタンス) が Fuse Online と同じクラスターにインストールされている必要があります。

手順

  1. クラスター管理者権限を持つアカウントで OpenShift にログインします。以下に例を示します。

    $ oc login -u admin -p admin
  2. 現在のプロジェクトが Fuse Online で稼働しているプロジェクトであることを確認します。現在のプロジェクトを表示するには、以下のコマンドを実行します。

    $ oc project
  3. クラスターの Kafka ブローカーが oc ユーザーアカウントにアクセスできるようにするための権限を付与します。クラスターの設定方法によって、付与する必要のある権限が決まります。たとえば、minishift 上の Strimzi クラスターの場合、以下を行うことができます。

    1. クラスターの Kafka/Strimzi リソースにアクセスできるクラスターロールを作成します。以下の例では、kafkas.kafka.strimzi.io-view が新しいロールの名前になります。

      oc create clusterrole kafkas.kafka.strimzi.io-view --verb=get,list --resource=kafkas --resource=crd
    2. このクラスターロールを oc ユーザーに追加します。以下の例では、syndesis-serveroc ユーザーアカウントの名前になります。

      oc adm policy add-cluster-role-to-user kafkas.kafka.strimzi.io-view -z syndesis-server

結果

Fuse Online で Kafka コネクションを作成すると、コネクション設定ページに利用できる Kafka ブローカーが表示されます。コネクションがアクセスするブローカーを選択します。

15.2. Kafka ブローカーへのコネクションの作成

インテグレーションで、Kafka トピックからデータをサブスクライブしたり、Kafka トピックにデータを公開するには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションに追加します。

前提条件

  • Kafka ブローカー/AMQ Streams の自動検出の有効化」の説明どおりに、クラスターで Kafka ブローカーの自動検出を有効化済みである必要があります。または、接続する Kafka ブローカーの URI を知っている必要があります。
  • Transport Layer Security (TLS) を使用してデータを暗号化する場合は、Kafka ブローカーの PEM 形式の証明書テキストが必要です。通常、Kafka サーバー管理者からブローカー証明書テキストを取得します。

手順

  1. Fuse Online の左パネルで Connections をクリックし、利用可能なコネクションを表示します。
  2. Create Connection をクリックしてコネクターを表示します。
  3. Kafka Message Broker コネクターをクリックします。
  4. Kafka broker URIs フィールドで、このコネクションがアクセスするブローカーを選択するか、Kafka ブローカー URI のコンマ区切りリストを入力します。各 URI は host:port 形式で指定する必要があります。
  5. Transport Protocol フィールドでは、以下のオプションのいずれかを選択します。

    • 転送中のデータを保護するためにデータを暗号化する場合は、TLS (Transport Layer Security) を選択します。
    • データを暗号化しない場合は、Plain を選択し、7. に進みます。
  6. 5. で TLS を選択した場合、Broker certificate フィールドに Kafka ブローカーの PEM 形式の証明書テキストを貼り付けます。
  7. Validate をクリックします。Fuse Online は即座にコネクションを検証しようとし、検証の成功または失敗を示すメッセージを表示します。検証に失敗した場合は、入力パラメーターを訂正し、再度検証を行います。
  8. 検証に成功した場合は Next をクリックします。
  9. Name フィールドに、このコネクションを別のコネクションと区別するために使用する名前を入力します。たとえば、Kafka West を入力します。
  10. Description は任意のフィールドで、このコネクションに関する便利な情報を入力します。
  11. Save をクリックし、作成したコネクションが利用可能になったことを確認します。上記の例の名前を入力した場合は、インテグレーションに追加できるコネクションとして Kafka West が表示されます。

15.3. Kafka ブローカーからデータを取得してインテグレーションの実行をトリガー

Kafka ブローカーからデータを受信したときにインテグレーションの実行をトリガーするには、Kafka コネクションを最初のコネクションとして追加します。インテグレーションの実行中、Kafka コネクションは指定した Kafka トピックでデータを継続的に監視します。コネクションが新しいデータを見つけると、そのデータをインテグレーションの次のステップに渡します。

前提条件

Kafka ブローカーへのコネクションが作成されている必要があります。

手順

  1. Fuse Online パネルの左側にある Integrations をクリックします。
  2. Create Integration をクリックします。
  3. Choose a connection ページで、インテグレーションを開始するために使用する Kafka コネクションをクリックします。
  4. Choose an action ページで、Subscribe アクションを選択し、指定したトピックからデータを受信します。
  5. Topic Name フィールドで、下向きの矢印をクリックしてトピックのリストを表示し、サブスクライブするトピックをクリックします。または、トピック名を入力してトピックを作成します。
  6. Next をクリックして、アクションの出力タイプを指定します。
  7. データタイプの認識が必要がない場合は、Select Type フィールドに Type specification not required を使用し、Next をクリックします。残りの手順を行う必要はありません。

    ただし、構造化データタイプが推奨されます。たとえば、Kafka コネクション出力を後続のステップにマップする場合、データタイプを指定する必要があります。データマッパーは、非構造化データを認識しません。

    データタイプを指定するには、Select Type フィールドをクリックし、以下の 1 つをスキーマタイプとして選択します。

    • JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは application/schema+json です。
    • JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは application/json です。
    • XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は .xsd です。
    • XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は .xml です。
  8. Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが application/schema+json の JSON スキーマファイルの内容を貼り付けます。
  9. Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、Vendor をデータタイプ名として指定することができます。

    このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーションビジュアライゼーションとデータマッパーにタイプ名が表示されます。

  10. Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
  11. Next をクリックします。

結果

インテグレーションに最初のコネクションが追加され、Fuse Online は最後のコネクションを選択するよう要求します。

15.4. Kafka ブローカーへのデータの公開

インテグレーションでは、フローの途中で Kafka ブローカーにデータを公開したり、シンプルなインテグレーションを終了するためにデータを公開したりすることができます。これには、Kafka コネクションをフローの途中に追加するか、シンプルなインテグレーションの最後のコネクションとして追加します。

前提条件

  • Kafka ブローカーへのコネクションが作成されている必要があります。
  • フローを作成または編集することになり、Fuse Online でインテグレーションに追加するよう要求されます。または、Fuse Online で最後のコネクションを選択するよう要求されます。

手順

  1. Add to Integration ページで、Kafka コネクションの追加先のプラス記号をクリックします。シンプルなインテグレーションの最後のコネクションを追加する場合は、このステップをスキップします。
  2. メッセージの公開に使用する Kafka コネクションをクリックします。
  3. Choose an action ページで Publish を選択します。
  4. Topic Name フィールドで、下向きの矢印をクリックしてトピックのリストを表示し、公開するトピックをクリックします。
  5. Next をクリックしてアクションの入力タイプを指定します。
  6. データタイプの認識が必要がない場合は、Select Type フィールドに Type specification not required を使用し、Next をクリックします。残りの手順を行う必要はありません。

    ただし、構造化データタイプが推奨されます。たとえば、コネクション入力をデータマッパーステップでマップする場合、データタイプを指定する必要があります。データマッパーは、非構造化データのフィールドを表示できません。

    データタイプを指定するには、Select Type フィールドをクリックし、以下の 1 つをスキーマタイプとして選択します。

    • JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは application/schema+json です。
    • JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは application/json です。
    • XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は .xsd です。
    • XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は .xml です。
  7. Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが application/schema+json の JSON スキーマファイルの内容を貼り付けます。
  8. Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、Vendor をデータタイプ名として指定することができます。

    このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーションビジュアライゼーションとデータマッパーにタイプ名が表示されます。

  9. Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
  10. Next をクリックします。

結果

インテグレーションビジュアライゼーションで追加した場所にコネクションが表示されます。