第7章 Kafka Bridge

本章では、AMQ Streams Kafka Bridge について概説し、その REST API を使用して AMQ Streams と対話するために役立つ情報を提供します。ローカル環境で Kafka Bridge を試すには、本章で後述する 「Kafka Bridge クイックスタート」 を参照してください。

7.1. Kafka Bridge の概要

Kafka Bridge をインターフェイスとして使用し、Kafka クラスターに対して特定タイプのリクエストを行うことができます。

7.1.1. Kafka Bridge インターフェイス

AMQ Streams Kafka Bridge では、HTTP ベースのクライアントと Kafka クラスターとの対話を可能にする RESTful インターフェイスが提供されます。  Kafka Bridge では、クライアントアプリケーションによる Kafka プロトコルの変換は必要なく、Web API コネクションの利点が AMQ Streams に提供されます。

API には consumerstopics の 2 つの主なリソースがあります。これらのリソースは、Kafka クラスターでコンシューマーおよびプロデューサーと対話するためにエンドポイント経由で公開され、アクセスが可能になります。リソースと関係があるのは Kafka ブリッジのみで、Kafka に直接接続されたコンシューマーやプロデューサーとは関係はありません。

7.1.1.1. HTTP 要求

Kafka Bridge は、以下の方法で Kafka クラスターへの HTTP 要求をサポートします。

  • トピックにメッセージを送信する。
  • トピックからメッセージを取得する。
  • コンシューマーを作成および削除する。
  • コンシューマーをトピックにサブスクライブし、このようなトピックからメッセージを受信できるようにする。
  • コンシューマーがサブスクライブしているトピックの一覧を取得する。
  • トピックからコンシューマーのサブスクライブを解除する。
  • パーティションをコンシューマーに割り当てる。
  • コンシューマーオフセットの一覧をコミットする。
  • パーティションで検索して、コンシューマーが最初または最後のオフセットの位置、または指定のオフセットの位置からメッセージを受信できるようにする。

上記の方法で、JSON 応答と HTTP 応答コードのエラー処理を行います。メッセージは JSON またはバイナリー形式で送信できます。

クライアントは、ネイティブの Kafka プロトコルを使用する必要なくメッセージを生成して使用できます。

その他のリソース

  • リクエストおよび応答の例など、API ドキュメントを確認するには、AMQ Streams Web サイトの Strimzi Kafka Bridge Documentation を参照してください。

7.1.2. Kafka Bridge でサポートされるクライアント

Kafka Bridge を使用して、内部および外部の HTTP クライアントアプリケーションの両方を Kafka クラスターに統合できます。

内部クライアント
内部クライアントとは、Kafka Bridge 自体と同じ OpenShift クラスターで実行されるコンテナーベースの HTTP クライアントのことです。内部クライアントは、ホストの Kafka Bridge および KafkaBridge のカスタムリソースで定義されたポートにアクセスできます。
外部クライアント
外部クライアントとは、Kafka Bridge がデプロイおよび実行される OpenShift クラスター外部で実行される HTTP クライアントのことです。外部クライアントは、OpenShift Route、ロードバランサーサービス、または Ingress を使用して Kafka Bridge にアクセスできます。

HTTP 内部および外部クライアントの統合

Kafka Bridge

7.1.3. Kafka Bridge のセキュリティー保護

AMQ Streams には、現在 Kafka Bridge の暗号化、認証、または承認は含まれていません。そのため、外部クライアントから Kafka Bridge に送信されるリクエストは以下のようになります。

  • 暗号化されず、HTTPS ではなく HTTP を使用する必要がある。
  • 認証なしで送信される。

ただし、以下のような他の方法で Kafka Bridge をセキュアにできます。

  • Kafka Bridge にアクセスできる Pod を定義する OpenShift ネットワークポリシー。
  • 認証または承認によるリバースプロキシー (例: OAuth2 プロキシー)。
  • API ゲートウェイ。
  • TLS 終端をともなう Ingress または OpenShift ルート。

Kafka Bridge では、Kafka Broker への接続時に TLS 暗号化と、TLS および SASL 認証がサポートされます。OpenShift クラスター内で以下を設定できます。

  • Kafka Bridge と Kafka クラスター間の TLS または SASL ベースの認証。
  • Kafka Bridge と Kafka クラスター間の TLS 暗号化接続。

詳細は、「Kafka Bridge での認証サポート」 を参照してください。

Kafka ブローカーで ACL を使用することで、Kafka Bridge を使用して消費および生成できるトピックを制限することができます。

7.1.4. OpenShift 外部の Kafka Bridge へのアクセス

デプロイメント後、AMQ Streams Kafka Bridge には同じ OpenShift クラスターで実行しているアプリケーションのみがアクセスできます。これらのアプリケーションは、kafka-bridge-name-bridge-service サービスを使用して API にアクセスします。

OpenShift クラスター外部で実行しているアプリケーションに Kafka Bridge がアクセスできるようにする場合は、以下の機能のいずれかを使用して Kafka Bridge を手動で公開できます。

  • LoadBalancer または NodePort タイプのサービス
  • Ingress リソース
  • OpenShift ルート

サービスを作成する場合には、selector で以下のラベルを使用して、サービスがトラフィックをルーティングする Pod を設定します。

  # ...
  selector:
    strimzi.io/cluster: kafka-bridge-name 1
    strimzi.io/kind: KafkaBridge
  #...
1
OpenShift クラスターでの Kafka Bridge カスタムリソースの名前。

7.1.5. Kafka Bridge へのリクエスト

データ形式と HTTP ヘッダーを指定し、有効な要求が Kafka Bridge に送信されるようにします。

7.1.5.1. コンテンツタイプヘッダー

API 要求および応答本文は、常に JSON としてエンコードされます。

  • コンシューマー操作の実行時に、POST 要求の本文が空でない場合は、以下の Content-Type ヘッダーが含まれている必要があります。

    Content-Type: application/vnd.kafka.v2+json
  • プロデューサー操作の実行時に、POST リクエストは、以下の表のように、json または binary のいずれかの 埋め込みデータ形式 を指定する Content-Type ヘッダーが含まれている必要があります。

    埋め込みデータ形式Content-Type ヘッダー

    JSON

    Content-Type: application/vnd.kafka.json.v2+json

    バイナリー

    Content-Type: application/vnd.kafka.binary.v2+json

consumer/groupid エンドポイントを使用してコンシューマーを作成するときに、埋め込みデータ形式を設定します。詳細については、次のセクションを参照してください。

POST リクエストに空の本文がある場合は、Content-Type を設定しないでください。空の本文を使用して、デフォルト値のコンシューマーを作成できます。

7.1.5.2. 埋め込みデータ形式

埋め込みデータ形式は、Kafka メッセージが Kafka Bridge によりプロデューサーからコンシューマーに HTTP で送信される際の形式です。サポート対象の埋め込みデータ形式には、JSON とバイナリーの 2 種類があります。

/consumers/groupid エンドポイントを使用してコンシューマーを作成する場合、POST 要求本文で JSON またはバイナリーいずれかの埋め込みデータ形式を指定する必要があります。これは、以下の例のように format フィールドで指定します。

{
  "name": "my-consumer",
  "format": "binary", 1
...
}
1
バイナリー埋め込みデータ形式。

コンシューマーの作成時に指定する埋め込みデータ形式は、コンシューマーが消費する Kafka メッセージのデータ形式と一致する必要があります。

バイナリー埋め込みデータ形式を指定する場合は、以降のプロデューサー要求で、要求本文にバイナリーデータが Base64 でエンコードされた文字列として含まれる必要があります。たとえば、/topics/topicname エンドポイントを使用してメッセージを送信する場合は、records.value を Base64 でエンコードする必要があります。

{
  "records": [
    {
      "key": "my-key",
      "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ="
    },
  ]
}

プロデューサー要求には、埋め込みデータ形式に対応する Content-Type ヘッダーも含まれる必要があります (例: Content-Type: application/vnd.kafka.binary.v2+json)。

7.1.5.3. Accept ヘッダー

コンシューマーを作成したら、以降のすべての GET 要求には Accept ヘッダーが以下のような形式で含まれる必要があります。

Accept: application/vnd.kafka.embedded-data-format.v2+json

embedded-data-formatjson または binary のいずれかです。

たとえば、サブスクライブされたコンシューマーのレコードを JSON 埋め込みデータ形式で取得する場合、この Accept ヘッダーが含まれるようにします。

Accept: application/vnd.kafka.json.v2+json

7.1.6. Kafka Bridge API リソース

リクエストやレスポンスの例などを含む REST API エンドポイントおよび説明の完全リストは、Kafka Bridge の API リファレンス を参照してください。

7.1.7. Kafka Bridge デプロイメント

Cluster Operator を使用して、Kafka Bridge を OpenShift クラスターにデプロイします。

Kafka Bridge をデプロイすると、Cluster Operator により OpenShift クラスターに Kafka Bridge オブジェクトが作成されます。オブジェクトには、デプロイメントサービス、および Pod が含まれ、それぞれ Kafka Bridge のカスタムリソースに付与された名前が付けられます。

関連情報