15.2. Kafka ブローカーからデータを取得してインテグレーションの実行をトリガー

Kafka ブローカーからデータを受信したときにインテグレーションの実行をトリガーするには、Kafka コネクションを最初のコネクションとして追加します。インテグレーションの実行中、Kafka コネクションは指定した Kafka トピックでデータを継続的に監視します。コネクションが新しいデータを見つけると、そのデータをインテグレーションの次のステップに渡します。

前提条件

Kafka ブローカーへのコネクションが作成されている必要があります。

手順

  1. Fuse Online パネルの左側にある Integrations をクリックします。
  2. Create Integration をクリックします。
  3. Choose a connection ページで、インテグレーションを開始するために使用する Kafka コネクションをクリックします。
  4. Choose an action ページで、Subscribe アクションを選択し、指定したトピックからデータを受信します。
  5. Topic Name フィールドで、下向きの矢印をクリックしてトピックのリストを表示し、サブスクライブするトピックをクリックします。または、トピック名を入力してトピックを作成します。
  6. Next をクリックして、アクションの出力タイプを指定します。
  7. データタイプの認識が必要がない場合は、Select Type フィールドに Type specification not required を使用し、Next をクリックします。残りの手順を行う必要はありません。

    ただし、構造化データタイプが推奨されます。たとえば、Kafka コネクション出力を後続のステップにマップする場合、データタイプを指定する必要があります。データマッパーは、非構造化データを認識しません。

    データタイプを指定するには、Select Type フィールドをクリックし、以下の 1 つをスキーマタイプとして選択します。

    • JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは application/schema+json です。
    • JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは application/json です。
    • XML schema は XML データの構造を記述するドキュメントです。ドキュメントのファイル拡張子は .xsd です。
    • XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は .xml です。
  8. Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが application/schema+json の JSON スキーマファイルの内容を貼り付けます。
  9. Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、Vendor をデータタイプ名として指定することができます。

    このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーションビジュアライゼーションとデータマッパーにタイプ名が表示されます。

  10. Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
  11. Next をクリックします。

結果

インテグレーションに最初のコネクションが追加され、Fuse Online は最後のコネクションを選択するよう要求します。