Red Hat Training
A Red Hat training course is available for Red Hat Fuse
第11章 Kafka への接続
Apache Kafka はデータの取得およびパブリッシュに使用できる分散ストリーミングプラットフォームです。インテグレーションでは、指定した Kafka トピックからデータをサブスクライブでき、または指定した Kafka トピックにデータをパブリッシュすることもできます。これには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションフローに追加します。詳細は以下を参照してください。
11.1. Kafka ブローカーへのコネクションの作成
インテグレーションで、Kafka トピックからデータをサブスクライブしたり、Kafka トピックにデータをパブリッシュするには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションに追加します。
前提条件
接続する Kafka ブローカーの URI を知っている必要があります。
手順
- Fuse Online の左パネルで Connections をクリックし、利用可能なコネクションを表示します。
- 右上の Create Connection をクリックしてコネクターを表示します。
- Kafka コネクターをクリックします。
- Kafka bootstraps URI フィールドに、Kafka ブローカー URI のコンマ区切りリストを入力します。各 URI は host:port 形式で指定する必要があります。
- Validate をクリックします。Fuse Online は即座にコネクションを検証しようとし、検証の成功または失敗を示すメッセージを表示します。検証に失敗した場合は、入力パラメーターを訂正し、再度検証を行います。
- 検証に成功した場合は Next をクリックします。
-
Connection Name フィールドに、このコネクションを別のコネクションと区別するために使用する名前を入力します。たとえば、
Kafka Westを入力します。 - Description は任意のフィールドで、このコネクションに関する便利な情報を入力します。
- 右上にある Create をクリックし、作成したコネクションが利用可能になったことを確認します。上記の例の名前を入力した場合は、インテグレーションに追加できるコネクションとして Kafka West が表示されます。
11.2. Kafka ブローカーからデータを取得してインテグレーションの実行をトリガー
Kafka ブローカーからデータを受信したときにインテグレーションの実行をトリガーするには、Kafka コネクションを最初のコネクションとして追加します。インテグレーションの実行中、Kafka コネクションは指定した Kafka トピックでデータを継続的に監視します。コネクションが新しいデータを見つけると、そのデータをインテグレーションの次のステップに渡します。
前提条件
Kafka ブローカーへのコネクションが作成されている必要があります。
手順
- Fuse Online パネルの左側にある Integrations をクリックします。
- Create Integration をクリックします。
- Choose a Start Connection ページで、インテグレーションを開始するために使用する Kafka コネクションをクリックします。
- Choose an Action ページで、Subscribe for messages アクションをクリックし、指定したトピックからデータを受信します。
- Topic Name フィールドで、下向きの矢印をクリックしてトピックのリストを表示し、サブスクライブするトピックをクリックします。
- Next をクリックして、アクションの出力タイプを指定します。
データタイプの認識が必要がない場合は、Select Type フィールドで Type specification not required を受け入れ、下部にある Done をクリックします。残りの手順を行う必要はありません。
その他の場合は、以下の 1 つをスキーマタイプとして選択します。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
application/schema+jsonです。 -
JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは
application/jsonです。 -
XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は
.xsdです。 -
XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は
.xmlです。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
-
Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが
application/schema+jsonの JSON スキーマファイルの内容を貼り付けます。 Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、データ型名として
Vendorを指定できます。このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーション仮想化パネルとデータマッパーにタイプ名が表示されます。
- Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
- Done をクリックします。
結果
インテグレーションフローの最初にコネクションが表示されます。
11.3. Kafka ブローカーへのデータのパブリッシュ
インテグレーションでは、フローの途中で Kafka ブローカーにデータをパブリッシュしたり、シンプルなインテグレーションを終了するためにデータをパブリッシュしたりすることができます。これには、Kafka コネクションをフローの途中に追加するか、シンプルなインテグレーションの最後のコネクションとして追加します。
前提条件
- Kafka ブローカーへのコネクションが作成されている必要があります。
- フローを作成または編集することになります。Fuse Online はステップの選択を要求します。または、Fuse Online で最後のコネクションを選択するよう要求されます。
手順
- メッセージのパブリッシュに使用する Kafka コネクションをクリックします。
- Choose an Action ページで Publish をクリックします。
- Topic Name フィールドで、下向きの矢印をクリックしてトピックのリストを表示し、パブリッシュするトピックをクリックします。
- Next をクリックしてアクションの入力タイプを指定します。
データタイプの認識が必要がない場合は、Select Type フィールドで Type specification not required を受け入れ、下部にある Done をクリックします。残りの手順を行う必要はありません。
その他の場合は、以下の 1 つをスキーマタイプとして選択します。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
application/schema+jsonです。 -
JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは
application/jsonです。 -
XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は
.xsdです。 -
XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は
.xmlです。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
-
Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが
application/schema+jsonの JSON スキーマファイルの内容を貼り付けます。 Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、データ型名として
Vendorを指定できます。このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーション仮想化パネルとデータマッパーにタイプ名が表示されます。
- Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
- Done をクリックします。
結果
インテグレーションフローで追加した場所にコネクションが表示されます。