第2章 Kamelets を使用した Kafka への接続

Apache Kafka は、耐障害性のあるリアルタイムデータフィードを作成する、オープンソースの分散型 publish/subscribe メッセージングシステムです。Kafka は多数のコンシューマー (外部コネクション) 用にデータを素早く保存およびレプリケートします。

Kafka は、ストリーミングイベントを処理するソリューションの構築に役立ちます。分散されたイベント駆動型のアーキテクチャーでは、イベントをキャプチャーし、通信し、処理するのに役立つバックボーンが必要です。Kafka は、データソースとイベントをアプリケーションに接続する通信バックボーンとして機能します。

Kamelets を使用して、Kafka と外部リソース間の通信を設定できます。Kamelets を使用すると、コードを作成せずに、Kafka stream-processing フレームワークでデータをあるエンドポイントから別のエンドポイントに移動する方法を設定できます。Kamelets は、パラメーター値を指定して設定するルートテンプレートです。

たとえば、Kafka はデータをバイナリー形式で保存します。Kamelets を使用して、外部接続との間で送受信するデータをシリアライズおよびデシリアライズできます。Kamelets を使用すると、スキーマを検証し、データに追加、フィルターリング、マスクなどの変更を加えることができます。Kamelet はエラーを対処および処理できます。

2.1. Kamelets を使用した Kafka への接続の概要

Apache Kafka のストリーム処理フレームワークを使用する場合は、Kamelets を使用してサービスおよびアプリケーションを Kafka トピックに接続できます。Kamelet Catalog は、特に Kafka トピックへの接続のために以下の Kamelets を提供します。

  • kafka-sink: データプロデューサーから Kafka トピックにイベントを移動します。Kamelet Binding では、kafka-sink Kamelet をシンクとして指定します。
  • kafka-source: Kafka トピックからデータコンシューマーにイベントを移動します。Kamelet Binding では、kafka-source Kamelet をソースとして指定します。

図 2.1 は、ソースおよびシンク Kamelets を Kafka トピックに接続するフローを示しています。

kafkafow generic

図 2.1: Kamelets と Kafka トピックのデータフロー

以下は、Kamelets および Kamelet Bindings を使用してアプリケーションとサービスを Kafka トピックに接続する基本的な手順の概要です。

  1. Kafka を設定します。

    1. 必要な OpenShift Operator をインストールします。

      • OpenShift Streams for Apache Kafka の場合は、Camel K Operator、Camel K CLI、および Red Hat OpenShift Application Services (RHOAS) CLI をインストールします。
      • AMQ Streams の場合は、Camel K および AMQ Streams Operator ならびに Camel K CLI をインストールします。
    2. Kafka インスタンスを作成します。Kafka インスタンスはメッセージブローカーとして動作します。ブローカーにはトピックが含まれ、ストレージとメッセージの渡しをオーケストレーションします。
    3. Kafka トピックを作成します。トピックは、データの保存先を提供します。
    4. Kafka 認証クレデンシャルを取得します。
  2. Kafka トピックに接続するサービスまたはアプリケーションを決定します。
  3. Kamelet Catalog を表示して、インテグレーションに追加するソースおよびシンクコンポーネントの Kamelets を検索します。また、使用する各 Kamelet に必要な設定パラメーターを決定します。
  4. Kamelet Binding を作成します。

    • データソース (データを生成するコンポーネント) を Kafka トピックに接続する Kamelet Binding を作成します (kafka-sink Kamelet を使用します)。
    • (kafka-source Kamelet を使用して)kafka トピックをデータシンク (データを使用するコンポーネント) に接続する Kamelet Binding を作成します。
  5. 必要に応じて、Kamelet Binding 内で 1 つ以上のアクション Kamelets を中間ステップとして追加して、Kafka トピックとデータソースまたはシンク間で渡すデータを操作します。
  6. 必要に応じて、Kamelet Binding 内でエラーを処理する方法を定義します。
  7. Kamelet Binding をリソースとしてプロジェクトに適用します。

    Camel K Operator は、Kamelet Binding ごとに個別の Camel K インテグレーションを生成します。