第4章 Kafka へのセキュアなアクセスの管理

各クライアントの Kafka ブローカーへのアクセスを管理することで、Kafka クラスターを保護できます。

Kafka ブローカーとクライアント間のセキュアな接続には、以下が含まれます。

  • データ交換の暗号化
  • アイデンティティー証明に使用する認証
  • ユーザーが実行するアクションを許可または拒否する承認

本章では、以下を取り上げ、Kafka ブローカーとクライアント間でセキュアな接続を設定する方法を説明します。

  • Kafka クラスターおよびクライアントのセキュリティーオプション
  • Kafka ブローカーをセキュアにする方法
  • OAuth 2.0 トークンベースの認証および承認に承認サーバーを使用する方法

4.1. Kafka のセキュリティーオプション

Kafka リソースを使用して、Kafka の認証および承認に使用されるメカニズムを設定します。

4.1.1. リスナー認証

OpenShift クラスター内のクライアントの場合は、plain (暗号化なし) または tls 内部 リスナーを作成できます。

OpenShift クラスター外のクライアントの場合は、外部 リスナーを作成して接続メカニズムを指定します。接続メカニズムは nodeportloadbalanceringress、または route (OpenShift の場合) のいずれかです。

外部クライアントを接続するための設定オプションの詳細は、「外部リスナーの設定」を参照してください。

サポートされる認証オプションは次のとおりです。

  1. 相互 TLS 認証 (TLS による暗号化が有効なリスナーのみ)
  2. SCRAM-SHA-512 認証
  3. OAuth 2.0 のトークンベースの認証

選択する認証オプションは、Kafka ブローカーへのクライアントアクセスを認証する方法によって異なります。

図4.1 Kafka リスナーの認証オプション

options for listener authentication configuration

リスナーの authentication プロパティーは、そのリスナーに固有の認証メカニズムを指定するために使用されます。

authentication プロパティーが指定されていない場合、リスナーはそのリスナー経由で接続するクライアントを認証しません。認証がないと、リスナーではすべての接続が許可されます。

認証は、User Operator を使用して KafkaUsers を管理する場合に設定する必要があります。

以下の例で指定されるものは次のとおりです。

  • SCRAM-SHA-512 認証に設定された plain リスナー
  • 相互 TLS 認証を使用する tls リスナー
  • 相互 TLS 認証を使用する external リスナー

各リスナーは、Kafka クラスター内で一意の名前およびポートで設定されます。

注記

ブローカー間通信 (9091) およびメトリクス (9404) 用に確保されたポートを使用するようにリスナーを設定することはできません。

リスナー認証設定の例

# ...
listeners:
  - name: plain
    port: 9092
    type: internal
    tls: true
    authentication:
      type: scram-sha-512
  - name: tls
    port: 9093
    type: internal
    tls: true
    authentication:
      type: tls
  - name: external
    port: 9094
    type: loadbalancer
    tls: true
    authentication:
      type: tls
# ...

4.1.1.1. 相互 TLS 認証

相互 TLS 認証は、Kafka ブローカーと ZooKeeper Pod 間の通信で常に使用されます。

AMQ Streams では、Kafka が TLS (Transport Layer Security) を使用して、相互認証の有無を問わず、Kafka ブローカーとクライアントとの間で暗号化された通信が行われるよう設定できます。相互 (双方向) 認証の場合、サーバーとクライアントの両方が証明書を提示します。相互認証を設定すると、ブローカーはクライアントを認証し (クライアント認証)、クライアントはブローカーを認証します (サーバー認証)。

注記

TLS 認証は一般的には一方向で、一方が他方のアイデンティティーを認証します。たとえば、Web ブラウザーと Web サーバーの間で HTTPS が使用される場合、ブラウザーは Web サーバーのアイデンティティーの証明を取得します。

4.1.1.2. SCRAM-SHA-512 認証

SCRAM (Salted Challenge Response Authentication Mechanism) は、パスワードを使用して相互認証を確立できる認証プロトコルです。AMQ Streams では、Kafka が SASL (Simple Authentication and Security Layer) SCRAM-SHA-512 を使用するよう設定し、暗号化されていないクライアントの接続と暗号化されたクライアントの接続の両方で認証を提供できます。

TLS クライアント接続で SCRAM-SHA-512 認証が使用される場合、TLS プロトコルは暗号化を提供しますが、認証には使用されません。

SCRAM の以下のプロパティーは、暗号化されていない接続でも SCRAM-SHA-512 を安全に使用できるようにします。

  • 通信チャネル上では、パスワードはクリアテキストで送信されません。代わりに、クライアントとサーバーはお互いにチャレンジを生成し、認証するユーザーのパスワードを認識していることを証明します。
  • サーバーとクライアントは、認証を交換するたびに新しいチャレンジを生成します。よって、この交換はリレー攻撃に対する回復性を備えています。

KafkaUser.spec.authentication.typescram-sha-512 に設定すると、User Operator によって、大文字と小文字の ASCII 文字と数字で構成された無作為の 12 文字のパスワードが生成されます。

4.1.1.3. ネットワークポリシー

AMQ Streams では、Kafka ブローカーで有効になっているリスナーごとに NetworkPolicy リソースが自動的に作成されます。デフォルトでは、すべてのアプリケーションと namespace にアクセスする権限が NetworkPolicy によってリスナーに付与されます。

ネットワークレベルでのリスナーへのアクセスを指定のアプリケーションまたは namespace のみに制限するには、networkPolicyPeers プロパティーを使用します。

リスナーの認証設定の一部としてネットワークポリシーを使用します。リスナーごとに、異なる networkPolicyPeers 設定を指定できます。

詳細は、「リスナーネットワークポリシー」のセクションおよび 「NetworkPolicyPeer API reference」を参照してください。

注記

AMQ Streams でネットワークポリシーを使用するには、ingress NetworkPolicies が OpenShift の設定でサポートされる必要があります。

4.1.1.4. 追加のリスナー設定オプション

GenericKafkaListenerConfiguration スキーマのプロパティーを使用して、設定をリスナーに追加できます。

4.1.2. Kafka の承認

Kafka.spec.kafka リソースの authorization プロパティーを使用して Kafka ブローカーの承認を設定できます。authorization プロパティーがないと、承認が有効にならず、クライアントの制限はありません。承認を有効にすると、承認は有効なすべてのリスナーに適用されます。承認方法は type フィールドで定義されます。

サポートされる承認オプションは次のとおりです。

図4.2 Kafka クラスター承認オプション

options for kafks authorization configuration

4.1.2.1. スーパーユーザー

スーパーユーザーは、アクセスの制限に関係なく Kafka クラスターのすべてのリソースにアクセスでき、すべての承認メカニズムでサポートされます。

Kafka クラスターのスーパーユーザーを指定するには、superUsers プロパティーにユーザープリンシパルのリストを追加します。ユーザーが TLS クライアント認証を使用する場合、ユーザー名は CN= で始まる証明書のサブジェクトの共通名になります。

スーパーユーザーを使用した設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: myproject
spec:
  kafka:
    # ...
    authorization:
      type: simple
      superUsers:
        - CN=client_1
        - user_2
        - CN=client_3
    # ...

4.2. Kafka クライアントのセキュリティーオプション

KafkaUser リソースを使用して、Kafka クライアントの認証メカニズム、承認メカニズム、およびアクセス権限を設定します。セキュリティーの設定では、クライアントはユーザーとして表されます。

Kafka ブローカーへのユーザーアクセスを認証および承認できます。認証によってアクセスが許可され、承認によって許容されるアクションへのアクセスが制限されます。

Kafka ブローカーへのアクセスが制限されない スーパーユーザー を作成することもできます。

認証および承認メカニズムは、Kafka ブローカーへのアクセスに使用されるリスナーの仕様 と一致する必要があります。

4.2.1. ユーザー処理用の Kafka クラスターの特定

KafkaUser リソースには、このリソースが属する Kafka クラスターに適した名前 (Kafka リソースの名前から派生) を定義するラベルが含まれています。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster

このラベルは、KafkaUser リソースを特定し、新しいユーザーを作成するために、User Operator によって使用されます。また、以降のユーザーの処理でも使用されます。

ラベルが Kafka クラスターと一致しない場合、User Operator は KafkaUser を識別できず、ユーザーは作成されません。

KafkaUser リソースのステータスが空のままであれば、ラベルを確認してください。

4.2.2. ユーザー認証

ユーザー認証は、KafkaUser.specauthentication プロパティーを使用して設定されます。ユーザーに有効な認証メカニズムは、type フィールドを使用して指定されます。

サポートされる認証メカニズム

  • TLS クライアント認証
  • SCRAM-SHA-512 認証

認証メカニズムを指定しないと、User Operator によってユーザーまたはそのクレデンシャルが作成されません。

4.2.2.1. TLS クライアント認証

TLS クライアント認証を使用するには、type フィールドを tls に設定します。

TLS クライアント認証が有効になっている KafkaUser の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  # ...

ユーザーが User Operator によって作成されると、KafkaUser リソースと同じ名前で新しいシークレットが作成されます。Secret には、TLS クライアント認証の秘密鍵と公開鍵が含まれます。公開鍵は、クライアント認証局 (CA) によって署名されたユーザー証明書に含まれます。

すべての鍵は X.509 形式です。

Secret には、PEM 形式および PKCS #12 形式の秘密鍵と証明書が含まれます。

Kafka と Secret との通信をセキュアにする方法については、11章TLS 証明書の管理 を参照してください。

ユーザークレデンシャルのある Secret の例

apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  ca.crt: # Public key of the client CA
  user.crt: # User certificate that contains the public key of the user
  user.key: # Private key of the user
  user.p12: # PKCS #12 archive file for storing certificates and keys
  user.password: # Password for protecting the PKCS #12 archive file

4.2.2.2. SCRAM-SHA-512 認証

SCRAM-SHA-512 認証メカニズムを使用するには、type フィールドを scram-sha-512 に設定します。

SCRAM-SHA-512 認証が有効になっている KafkaUser の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
  # ...

ユーザーが User Operator によって作成されると、KafkaUser リソースと同じ名前で新しいシークレットが作成されます。シークレットの password キーには、生成されたパスワードが含まれ、base64 でエンコードされます。パスワードを使用するにはデコードする必要があります。

ユーザークレデンシャルのある Secret の例

apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  password: Z2VuZXJhdGVkcGFzc3dvcmQ= 1
  sasl.jaas.config: b3JnLmFwYWNoZS5rYWZrYS5jb21tb24uc2VjdXJpdHkuc2NyYW0uU2NyYW1Mb2dpbk1vZHVsZSByZXF1aXJlZCB1c2VybmFtZT0ibXktdXNlciIgcGFzc3dvcmQ9ImdlbmVyYXRlZHBhc3N3b3JkIjsK 2

1
base64 でエンコードされた生成されたパスワード。
2
base64 でエンコードされた SASL SCRAM-SHA-512 認証の JAAS 設定文字列。

生成されたパスワードをデコードします。

echo "Z2VuZXJhdGVkcGFzc3dvcmQ=" | base64 --decode

4.2.3. ユーザーの承認

ユーザーの承認は、KafkaUser.specauthorization プロパティーを使用して設定されます。ユーザーに有効な承認タイプは、type フィールドを使用して指定します。

簡易承認を使用するには、KafkaUser.spec.authorizationtype プロパティーを simple に設定します。簡易承認では、デフォルトの Kafka 承認プラグインである AclAuthorizer が使用されます。

代わりに、OPA 承認 を使用することもできます。OAuth 2.0 トークンベースの認証をすでに使用している場合は、OAuth 2.0 承認 を使用することもできます。

承認が指定されていない場合は、User Operator によるユーザーのアクセス権限のプロビジョニングは行われません。KafkaUser がリソースにアクセスできるかどうかは、使用されているオーソライザーによって異なります。たとえば、AclAuthorizer の場合は、allow.everyone.if.no.acl.found 設定によって決定されます。

4.2.3.1. ACL ルール

AclAuthorizer は、ACL ルールを使用して Kafka ブローカーへのアクセスを管理します。

ACL ルールによって、acls プロパティーで指定したユーザーにアクセス権限が付与されます。

AclRule オブジェクトの詳細は、「AclRule スキーマ参照」を参照してください。

4.2.3.2. Kafka ブローカーへのスーパーユーザーアクセス

ユーザーを Kafka ブローカー設定のスーパーユーザーのリストに追加すると、KafkaUser の ACL で定義された承認制約に関係なく、そのユーザーにはクラスターへのアクセスが無制限に許可されます。

ブローカーへのスーパーユーザーアクセスの設定に関する詳細は「Kafka の承認」を参照してください。

4.2.3.3. ユーザークォータ

KafkaUser リソースの spec を設定してクォータを強制し、バイトしきい値または CPU 使用の時間制限に基づいてユーザーが Kafka ブローカーへのアクセスを超過しないようにすることができます。

ユーザークォータをともなう KafkaUser の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # ...
  quotas:
    producerByteRate: 1048576 1
    consumerByteRate: 2097152 2
    requestPercentage: 55 3

1
ユーザーが Kafka ブローカーにプッシュできるデータ量の、秒あたりのバイトクォータ。
2
ユーザーが Kafka ブローカーからフェッチできるデータ量の、秒あたりのバイトクォータ。
3
クライアントグループあたりの時間割合で示される、CPU 使用制限。

これらのプロパティーの詳細は、KafkaUserQuotas スキーマ参照」を参照してください。

4.3. Kafka ブローカーへのアクセスのセキュア化

Kafka ブローカーへのセキュアなアクセスを確立するには、以下を設定し、適用します。

  • Kafka リソースが以下を行うようにします。

    • 指定された認証タイプでリスナーを作成します。
    • Kafka クラスター全体の承認を設定します。
  • リスナーを使用して Kafka ブローカーにセキュアにアクセスするための KafkaUser リソース。

Kafka リソースで以下を設定します。

  • リスナー認証
  • Kafka リスナーへのアクセスを制限するネットワークポリシー
  • Kafka の承認
  • ブローカーへのアクセスが制限されないスーパーユーザー

認証は、リスナーごとに独立して設定されます。承認は、常に Kafka クラスター全体に対して設定されます。

Cluster Operator はリスナーを作成し、クラスターおよびクライアント認証局 (CA) 証明書を設定して Kafka クラスター内で認証を有効にします。

独自の証明書をインストールして、Cluster Operator によって生成された証明書を置き換えることができます。外部認証局によって管理される Kafka リスナー証明書を使用するようにリスナーを設定することもできます。PKCS #12 形式 (.p12) および PEM 形式 (.crt) の証明書を利用できます。

KafkaUser を使用して、Kafka にアクセスするために特定のクライアントが使用する認証および承認メカニズムを有効にします。

KafkaUser リソースで以下を設定します。

  • 有効なリスナー認証と一致する認証
  • 有効な Kafka 承認と一致する承認
  • クライアントによるリソースの使用を制御するクォータ

User Operator はクライアントに対応するユーザーを作成すると共に、選択した認証タイプに基づいて、クライアント認証に使用されるセキュリティークレデンシャルを作成します。

その他のリソース

スキーマの詳細は、以下を参照してください。

4.3.1. Kafka ブローカーのセキュア化

この手順では、AMQ Streams の実行時に Kafka ブローカーをセキュアにするためのステップを説明します。

Kafka ブローカーに実装されたセキュリティーは、アクセスを必要とするクライアントに実装されたセキュリティーとの互換性を維持する必要があります。

  • Kafka.spec.kafka.listeners[*].authenticationKafkaUser.spec.authentication と一致します。
  • Kafka.spec.kafka.authorizationKafkaUser.spec.authorization と一致します。

この手順では、TLS 認証を使用した簡易承認とリスナーの設定を説明します。リスナーの設定に関する詳細は、「GenericKafkaListener スキーマ参照」を参照してください。

代わりに、リスナー認証 には SCRAM-SHA または OAuth 2.0、Kafka 承認 には OAuth 2.0 または OPA を使用することができます。

手順

  1. Kafka リソースを設定します。

    1. 承認には authorization プロパティーを設定します。
    2. listeners プロパティーを設定し、認証のあるリスナーを作成します。

      以下に例を示します。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      spec:
        kafka:
          # ...
          authorization: 1
            type: simple
            superUsers: 2
              - CN=client_1
              - user_2
              - CN=client_3
          listeners:
            - name: tls
              port: 9093
              type: internal
              tls: true
              authentication:
                type: tls 3
          # ...
        zookeeper:
          # ...
      1
      2
      Kafka へのアクセスを制限されないユーザープリンシパルのリスト。CN は、TLS による認証が使用される場合のクライアント証明書の共通名です。
      3
      リスナーの認証メカニズムは各リスナーに対して設定でき、相互 TLS、SCRAM-SHA-512、またはトークンベース OAuth 2.0 として指定 できます。

      外部リスナーを設定している場合、設定は選択した接続のメカニズムによって異なります。

  2. Kafka リソースを作成または更新します。

    oc apply -f KAFKA-CONFIG-FILE

    Kafka クラスターは、TLS 認証を使用する Kafka ブローカーリスナーと共に設定されます。

    Kafka ブローカー Pod ごとにサービスが作成されます。

    サービスが作成され、Kafka クラスターに接続するための ブートストラップアドレス として機能します。

    kafka ブローカーの ID を検証するためのクラスター CA 証明書も、Kafka リソースと同じ名前で作成されます。

4.3.2. Kafka へのユーザーアクセスのセキュア化

KafkaUser リソースのプロパティーを使用して Kafka ユーザーを設定します。

oc apply を使用すると、ユーザーを作成または編集できます。oc delete を使用すると、既存のユーザーを削除できます。

以下に例を示します。

  • oc apply -f USER-CONFIG-FILE
  • oc delete KafkaUser USER-NAME

KafkaUser 認証および承認メカニズムを設定する場合、必ず同等の Kafka 設定と一致するようにしてください。

  • KafkaUser.spec.authenticationKafka.spec.kafka.listeners[*].authentication と一致
  • KafkaUser.spec.authorizationKafka.spec.kafka.authorization と一致

この手順では、TLS 認証でユーザーを作成する方法を説明します。SCRAM-SHA 認証でユーザーを作成することも可能です。

必要な認証は、Kafka ブローカーリスナーに設定された認証のタイプ によって異なります。

注記

Kafka ユーザーと Kafka ブローカー間の認証は、それぞれの認証設定によって異なります。たとえば、TLS が Kafka 設定で有効になっていない場合は、TLS でユーザーを認証できません。

前提条件

KafkaUser の認証タイプは、Kafka ブローカーで設定された認証と一致する必要があります。

手順

  1. KafkaUser リソースを設定します。

    以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      authentication: 1
        type: tls
      authorization:
        type: simple 2
        acls:
          - resource:
              type: topic
              name: my-topic
              patternType: literal
            operation: Read
          - resource:
              type: topic
              name: my-topic
              patternType: literal
            operation: Describe
          - resource:
              type: group
              name: my-group
              patternType: literal
            operation: Read
    1
    相互 tls または scram-sha-512 として定義されたユーザー認証メカニズム。
    2
    ACL ルールのリストが必要な簡易承認。
  2. KafkaUser リソースを作成または更新します。

    oc apply -f USER-CONFIG-FILE

    KafkaUser リソースと同じ名前の Secret と共に、ユーザーが作成されます。Secret には、TLS クライアント認証の秘密鍵と公開鍵が含まれます。

Kafka ブローカーへの接続をセキュアにするために Kafka クライアントをプロパティーで設定する詳細は『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「OpenShift 外クライアントのアクセスの設定」を参照してください。

4.3.3. ネットワークポリシーを使用した Kafka リスナーへのアクセス制限

networkPolicyPeers プロパティーを使用すると、リスナーへのアクセスを指定のアプリケーションのみに制限できます。

前提条件

  • Ingress NetworkPolicies をサポートする OpenShift クラスター。
  • Cluster Operator が稼働している必要があります。

手順

  1. Kafka リソースを開きます。
  2. networkPolicyPeers プロパティーで、Kafka クラスターへのアクセスが許可されるアプリケーション Pod または namespace を定義します。

    以下は、ラベル appkafka-client に設定されているアプリケーションからの接続のみを許可するよう tls リスナーを設定する例になります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
            networkPolicyPeers:
              - podSelector:
                  matchLabels:
                    app: kafka-client
        # ...
      zookeeper:
        # ...
  3. リソースを作成または更新します。

    oc apply を使用します。

    oc apply -f your-file

その他のリソース

4.4. OAuth 2.0 トークンベース認証の使用

AMQ Streams は、OAUTHBEARER および PLAIN メカニズムを使用して OAuth 2.0 認証の使用をサポートします。

OAuth 2.0 は、アプリケーション間で標準的なトークンベースの認証および承認を有効にし、中央の承認サーバーを使用してリソースに制限されたアクセス権限を付与するトークンを発行します。

Kafka ブローカーおよびクライアントの両方が OAuth 2.0 を使用するように設定する必要があります。OAuth 2.0 認証を設定した後に OAuth 2.0 承認 を設定できます。

注記

OAuth 2.0 認証は、Kafka 承認 と併用できます。

OAuth 2.0 認証を使用すると、アプリケーションクライアントはアカウントのクレデンシャルを公開せずにアプリケーションサーバー (リソースサーバー と呼ばれる) のリソースにアクセスできます。

アプリケーションクライアントは、アクセストークンを認証の手段として渡します。アプリケーションサーバーはこれを使用して、付与するアクセス権限のレベルを決定することもできます。承認サーバーは、アクセスの付与とアクセスに関する問い合わせを処理します。

AMQ Streams のコンテキストでは以下が行われます。

  • Kafka ブローカーは OAuth 2.0 リソースサーバーとして動作します。
  • Kafka クライアントは OAuth 2.0 アプリケーションクライアントとして動作します。

Kafka クライアントは Kafka ブローカーに対して認証を行います。ブローカーおよびクライアントは、必要に応じて OAuth 2.0 承認サーバーと通信し、アクセストークンを取得または検証します。

AMQ Streams のデプロイメントでは、OAuth 2.0 インテグレーションは以下を提供します。

  • Kafka ブローカーのサーバー側 OAuth 2.0 サポート。
  • Kafka MirrorMaker、Kafka Connect、および Kafka Bridge のクライアント側 OAuth 2.0 サポート。

その他のリソース

4.4.1. OAuth 2.0 認証メカニズム

AMQ Streams は、OAuth 2.0 認証で OAUTHBEARER および PLAIN メカニズムをサポートします。どちらのメカニズムも、Kafka クライアントが Kafka ブローカーで認証されたセッションを確立できるようにします。クライアント、承認サーバー、および Kafka ブローカー間の認証フローは、メカニズムごとに異なります。

可能な限り、OAUTHBEARER を使用するようにクライアントを設定することが推奨されます。OAUTHBEARER では、クライアントクレデンシャルは Kafka ブローカーと共有されることがないため、PLAIN よりも高レベルのセキュリティーが提供されます。OAUTHBEARER をサポートしない Kafka クライアントの場合のみ、PLAIN の使用を検討してください。

必要であれば、同じ oauth リスナーで OAUTHBEARER と PLAIN を両方有効にすることができます。

OAUTHBEARER の概要

Kafka は OAUTHBEARER 認証メカニズムをサポートしますが、明示的に設定する必要があります。また、多くの Kafka クライアントツールでは、プロトコルレベルで OAUTHBEARER の基本サポートを提供するライブラリーを使用します。

AMQ Streams では、アプリケーションの開発を容易にするため、アップストリームの Kafka Client Java ライブラリー用の OAuth コールバックハンドラーが提供されます (ただし、他のライブラリーには提供されません)。そのため、このようなクライアントには独自のコールバックハンドラーを作成する必要はありません。アプリケーションクライアントはコールバックハンドラーを使用してアクセストークンを提供できます。Go などの他言語で書かれたクライアントは、カスタムコードを使用して承認サーバーに接続し、アクセストークンを取得する必要があります。

OAUTHBEARER を使用する場合、クライアントはクレデンシャルを交換するために Kafka ブローカーでセッションを開始します。ここで、クレデンシャルはコールバックハンドラーによって提供されるベアラートークンの形式を取ります。コールバックを使用して、以下の 3 つの方法のいずれかでトークンの提供を設定できます。

  • クライアント ID および Secret (OAuth 2.0 クライアントクレデンシャルメカニズムを使用)
  • 設定時に手動で取得された有効期限の長いアクセストークン
  • 設定時に手動で取得された有効期限の長い更新トークン

OAUTHBEARER は、Kafka ブローカーの oauth リスナー設定で自動的に有効になります。enableOauthBearer プロパティーを true に設定できますが、これは必須ではありません。

  # ...
  authentication:
    type: oauth
    # ...
    enableOauthBearer: true
注記

OAUTHBEARER 認証は、プロトコルレベルで OAUTHBEARER メカニズムをサポートする Kafka クライアントでのみ使用できます。

PLAIN の概要

PLAIN は、すべての Kafka クライアントツール (kafkacat などの開発者ツールを含む) によって使用される簡易認証メカニズムです。PLAIN を OAuth 2.0 認証とともに使用できるようにするため、AMQ Streams にはサーバー側のコールバックが含まれ、この OAuth 2.0 over PLAIN を呼び出します。

PLAIN の AMQ Streams 実装では、クライアントのクレデンシャルは ZooKeeper に保存されません。代わりに、OAUTHBEARER 認証が使用される場合と同様に、クライアントのクレデンシャルは準拠した承認サーバーの背後で一元的に処理されます。

OAuth 2.0 over PLAIN コールバックを併用する場合、以下のいずれかの方法を使用して Kafka クライアントは Kafka ブローカーで認証されます。

  • クライアント ID およびシークレット (OAuth 2.0 クライアントクレデンシャルメカニズムを使用)
  • 設定時に手動で取得された有効期限の長いアクセストークン

PLAIN 認証を使用し、username および password を提供するように、クライアントを有効にする必要があります。パスワードの最初に $accessToken: が付けられ、その後にアクセストークンの値が続く場合は、Kafka ブローカーはパスワードをアクセストークンとして解釈します。それ以外の場合は、Kafka ブローカーは username をクライアント ID として解釈し、password をクライアントシークレットとして解釈します。

password がアクセストークンとして設定されている場合、username は Kafka ブローカーによってアクセストークンから取得されるプリンシパル名と同じになるように設定される必要があります。このプロセスは、userNameClaimfallbackUserNameClaimfallbackUsernamePrefix、または userInfoEndpointUri を使用してユーザー名の抽出を設定する方法によって異なります。また、承認サーバーによっても異なり、特にクライアント ID をアカウント名にマッピングする方法によります。

PLAIN を使用するには、Kafka ブローカーの oauth リスナー設定で有効にする必要があります。

以下の例では、デフォルトで有効になっている OAUTHBEARER に加え、PLAIN も有効になっています。PLAIN のみを使用する場合は、enableOauthBearerfalse に設定して OAUTHBEARER を無効にすることができます。

  # ...
  authentication:
    type: oauth
    # ...
    enablePlain: true
    tokenEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/token

4.4.2. OAuth 2.0 Kafka ブローカーの設定

OAuth 2.0 の Kafka ブローカー設定には、以下が関係します。

  • 承認サーバーでの OAuth 2.0 クライアントの作成
  • Kafka カスタムリソースでの OAuth 2.0 認証の設定
注記

承認サーバーに関連する Kafka ブローカーおよび Kafka クライアントはどちらも OAuth 2.0 クライアントと見なされます。

4.4.2.1. 承認サーバーの OAuth 2.0 クライアント設定

セッションの開始中に受信されたトークンを検証するように Kafka ブローカーを設定するには、承認サーバーで OAuth 2.0 の クライアント 定義を作成し、以下のクライアントクレデンシャルが有効な状態で 機密情報 として設定することが推奨されます。

  • kafka のクライアント ID (例)
  • 認証メカニズムとしてのクライアント ID およびシークレット
注記

承認サーバーのパブリックでないイントロスペクションエンドポイントを使用する場合のみ、クライアント ID およびシークレットを使用する必要があります。高速のローカル JWT トークンの検証と同様に、パブリック承認サーバーのエンドポイントを使用する場合は、通常クレデンシャルは必要ありません。

4.4.2.2. Kafka クラスターでの OAuth 2.0 認証設定

Kafka クラスターで OAuth 2.0 認証を使用するには、たとえば、認証方法が oauth の Kafka クラスターカスタムリソースの TLS リスナー設定を指定します。

OAuth 2.0 の認証方法タイプの割り当て

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
      #...

plaintls、および external リスナーを設定できますが、OAuth 2.0 では TLS による暗号化が無効になっている plain リスナーまたは external リスナーを使用しないことが推奨されます。これは、ネットワークでのデータ漏えいの脆弱性や、トークンの盗難による不正アクセスへの脆弱性が発生するためです。

external リスナーを type: oauth で設定し、セキュアなトランスポート層がクライアントと通信するようにします。

OAuth 2.0 の外部リスナーとの使用

# ...
listeners:
  - name: external
    port: 9094
    type: loadbalancer
    tls: true
    authentication:
      type: oauth
    #...

tls プロパティーはデフォルトで false に設定されているため、有効にする必要があります。

認証のタイプを OAuth 2.0 として定義した場合、検証のタイプに基づいて、 高速のローカル JWT 検証 または イントロスペクションエンドポイントを使用したトークンの検証 のいずれかとして、設定を追加します。

説明や例を用いてリスナー向けに OAuth 2.0 を設定する手順は、「Kafka ブローカーの OAuth 2.0 サポートの設定」を参照してください。

4.4.2.3. 高速なローカル JWT トークン検証の設定

高速なローカル JWT トークンの検証では、JWTトークンの署名がローカルでチェックされます。

ローカルチェックでは、トークンに対して以下が確認されます。

  • アクセストークンに Bearer の (typ) 要求値が含まれ、トークンがタイプに準拠することを確認します。
  • 有効であるか (期限切れでない) を確認します。
  • トークンに validIssuerURI と一致する発行元があることを確認します。

承認サーバーによって発行されなかったすべてのトークンが拒否されるよう、リスナーの設定時に validIssuerURI 属性を指定します。

高速のローカル JWT トークン検証の実行中に、承認サーバーの通信は必要はありません。OAuth 2.0 の承認サーバーによって公開されるエンドポイントの jwksEndpointUri 属性を指定して、高速のローカル JWT トークン検証をアクティベートします。エンドポイントには、署名済み JWT トークンの検証に使用される公開鍵が含まれます。これらは、Kafka クライアントによってクレデンシャルとして送信されます。

注記

承認サーバーとの通信はすべて TLS による暗号化を使用して実行する必要があります。

証明書トラストストアを AMQ Streams プロジェクト namespace の OpenShift シークレットとして設定し、tlsTrustedCertificates 属性を使用してトラストストアファイルが含まれる OpenShift シークレットを示すことができます。

JWT トークンからユーザー名を適切に取得するため、userNameClaim の設定を検討してください。Kafka ACL 承認を使用する場合は、認証中にユーザー名でユーザーを特定する必要があります。JWT トークンの sub 要求は、通常は一意な ID でユーザー名ではありません。

高速なローカル JWT トークン検証の設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    #...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
          validIssuerUri: <https://<auth-server-address>/auth/realms/tls>
          jwksEndpointUri: <https://<auth-server-address>/auth/realms/tls/protocol/openid-connect/certs>
          userNameClaim: preferred_username
          maxSecondsWithoutReauthentication: 3600
          tlsTrustedCertificates:
          - secretName: oauth-server-cert
            certificate: ca.crt
    #...

4.4.2.4. OAuth 2.0 イントロスペクションエンドポイントの設定

OAuth 2.0 のイントロスペクションエンドポイントを使用したトークンの検証では、受信したアクセストークンは不透明として対処されます。Kafka ブローカーは、アクセストークンをイントロスペクションエンドポイントに送信します。このエンドポイントは、検証に必要なトークン情報を応答として返します。ここで重要なのは、特定のアクセストークンが有効である場合は最新情報を返すことで、トークンの有効期限に関する情報も返します。

OAuth 2.0 のイントロスペクションベースの検証を設定するには、高速のローカル JWT トークン検証に指定された jwksEndpointUri 属性ではなく、introspectionEndpointUri 属性を指定します。通常、イントロスペクションエンドポイントは保護されているため、承認サーバーに応じて clientId および clientSecret を指定する必要があります。

イントロスペクションエンドポイントの設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
          clientId: kafka-broker
          clientSecret:
            secretName: my-cluster-oauth
            key: clientSecret
          validIssuerUri: <https://<auth-server-address>/auth/realms/tls>
          introspectionEndpointUri: <https://<auth-server-address>/auth/realms/tls/protocol/openid-connect/token/introspect>
          userNameClaim: preferred_username
          maxSecondsWithoutReauthentication: 3600
          tlsTrustedCertificates:
          - secretName: oauth-server-cert
            certificate: ca.crt

4.4.3. Kafka ブローカーの再認証の設定

Kafka クライアントと Kafka ブローカー間の OAuth 2.0 セッションに Kafka セッション再認証 を使用するように、oauth リスナーを設定できます。このメカニズムは、定義された期間後に、クライアントとブローカー間の認証されたセッションを期限切れにします。セッションの有効期限が切れると、クライアントは既存のコネクションを破棄せずに再使用して、新しいセッションを即座に開始します。

セッションの再認証はデフォルトで無効になっています。これを有効にするには、oauth リスナー設定で maxSecondsWithoutReauthentication に時間の値を設定します。OAUTHBEARER および PLAIN 認証では、同じプロパティーを使用してセッションの再認証が設定されます。設定例については、「Kafka ブローカーの OAuth 2.0 サポートの設定」 を参照してください。

セッションの再認証は、クライアントによって使用される Kafka クライアントライブラリーによってサポートされる必要があります。

セッションの再認証は、高速ローカル JWT またはイントロスペクションエンドポイントのトークン検証と使用できます。

クライアントの再認証

ブローカーの認証されたセッションが期限切れになると、クライアントは接続を切断せずに新しい有効なアクセストークンをブローカーに送信し、既存のセッションを再認証する必要があります。

トークンの検証に成功すると、既存の接続を使用して新しいクライアントセッションが開始されます。クライアントが再認証に失敗した場合、さらにメッセージを送受信しようとすると、ブローカーは接続を閉じます。ブローカーで再認証メカニズムが有効になっていると、Kafka クライアントライブラリー 2.2 以降を使用する Java クライアントが自動的に再認証されます。

更新トークンが使用される場合、セッションの再認証は更新トークンにも適用されます。セッションが期限切れになると、クライアントは更新トークンを使用してアクセストークンを更新します。その後、クライアントは新しいアクセストークンを使用して既存のセッションに再認証されます。

OAUTHBEARER および PLAIN のセッションの有効期限

セッションの再認証が設定されている場合、OAUTHBEARER と PLAIN 認証ではセッションの有効期限は異なります。

クライアント ID とシークレットによる方法を使用する OAUTHBEARER および PLAIN の場合:

  • ブローカーの認証されたセッションは、設定された maxSecondsWithoutReauthentication で期限切れになります。
  • アクセストークンが設定期間前に期限切れになると、セッションは設定期間前に期限切れになります。

有効期間の長いアクセストークンによる方法を使用する PLAIN の場合:

  • ブローカーの認証されたセッションは、設定された maxSecondsWithoutReauthentication で期限切れになります。
  • アクセストークンが設定期間前に期限切れになると、再認証に失敗します。セッションの再認証は試行されますが、PLAIN にはトークンを更新するメカニズムがありません。

maxSecondsWithoutReauthentication が設定されていない場合は、再認証しなくても、OAUTHBEARER および PLAIN クライアントはブローカーへの接続を無期限に維持します。認証されたセッションは、アクセストークンの期限が切れても終了しません。ただし、これは keycloak 承認を使用したり、カスタムオーソライザーをインストールしたりして、承認を設定する場合に考慮されます。

4.4.4. OAuth 2.0 Kafka クライアントの設定

Kafka クライアントは以下のいずれかで設定されます。

  • 承認サーバーから有効なアクセストークンを取得するために必要なクレデンシャル (クライアント ID およびシークレット)。
  • 承認サーバーから提供されたツールを使用して取得された、有効期限の長い有効なアクセストークンまたは更新トークン。

アクセストークンは、Kafka ブローカーに送信される唯一の情報です。アクセストークンを取得するために承認サーバーでの認証に使用されるクレデンシャルは、ブローカーに送信されません。

クライアントによるアクセストークンの取得後、承認サーバーと通信する必要はありません。

クライアント ID とシークレットを使用した認証が最も簡単です。有効期間の長いアクセストークンまたは更新トークンを使用すると、承認サーバーツールに追加の依存関係があるため、より複雑になります。

注記

有効期間が長いアクセストークンを使用している場合は、承認サーバーでクライアントを設定し、トークンの最大有効期間を長くする必要があります。

Kafka クライアントが直接アクセストークンで設定されていない場合、クライアントは承認サーバーと通信して Kafka セッションの開始中にアクセストークンのクレデンシャルを交換します。Kafka クライアントは以下のいずれかを交換します。

  • クライアント ID およびシークレット
  • クライアント ID、更新トークン、および (任意の) シークレット

4.4.5. OAuth 2.0 のクライアント認証フロー

ここでは、Kafka セッションの開始時における Kafka クライアント、Kafka ブローカー、および承認ブローカー間の通信フローを説明および可視化します。フローは、クライアントとサーバーの設定によって異なります。

Kafka クライアントがアクセストークンをクレデンシャルとして Kafka ブローカーに送信する場合、トークンを検証する必要があります。

使用する承認サーバーや利用可能な設定オプションによっては、以下の使用が適している場合があります。

  • 承認サーバーと通信しない、JWT の署名確認およびローカルトークンのイントロスペクションをベースとした高速なローカルトークン検証。
  • 承認サーバーによって提供される OAuth 2.0 のイントロスペクションエンドポイント。

高速のローカルトークン検証を使用するには、トークンでの署名検証に使用される公開証明書のある JWKS エンドポイントを提供する承認サーバーが必要になります。

この他に、承認サーバーで OAuth 2.0 のイントロスペクションエンドポイントを使用することもできます。新しい Kafka ブローカー接続が確立されるたびに、ブローカーはクライアントから受け取ったアクセストークンを承認サーバーに渡し、応答を確認してトークンが有効であるかどうかを確認します。

Kafka クライアントのクレデンシャルは以下に対して設定することもできます。

  • 以前に生成された有効期間の長いアクセストークンを使用した直接ローカルアクセス。
  • 新しいアクセストークンの発行についての承認サーバーとの通信。
注記

承認サーバーは不透明なアクセストークンの使用のみを許可する可能性があり、この場合はローカルトークンの検証は不可能です。

4.4.5.1. クライアント認証フローの例

Kafka クライアントおよびブローカーが以下に設定されている場合の、Kafka セッション認証中のコミュニケーションフローを確認できます。

クライアントではクライアント ID とシークレットが使用され、ブローカーによって検証が承認サーバーに委譲される場合

Client using client ID and secret with broker delegating validation to authorization server

  1. Kafka クライアントは承認サーバーからアクセストークンを要求します。これにはクライアント ID とシークレットを使用し、任意で更新トークンも使用します。
  2. 承認サーバーによって新しいアクセストークンが生成されます。
  3. Kafka クライアントは SASL OAUTHBEARER メカニズムを使用してアクセストークンを渡し、Kafka ブローカーの認証を行います。
  4. Kafka ブローカーは、独自のクライアント ID およびシークレットを使用して、承認サーバーでトークンイントロスペクションエンドポイントを呼び出し、アクセストークンを検証します。
  5. トークンが有効な場合は、Kafka クライアントセッションが確立されます。

クライアントではクライアント ID およびシークレットが使用され、ブローカーによって高速のローカルトークン検証が実行される場合

Client using client ID and secret with broker performing fast local token validation

  1. Kafka クライアントは、トークンエンドポイントから承認サーバーの認証を行います。これにはクライアント ID とシークレットが使用され、任意で更新トークンも使用されます。
  2. 承認サーバーによって新しいアクセストークンが生成されます。
  3. Kafka クライアントは SASL OAUTHBEARER メカニズムを使用してアクセストークンを渡し、Kafka ブローカーの認証を行います。
  4. Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。

クライアントでは有効期限の長いアクセストークンが使用され、ブローカーによって検証が承認サーバーに委譲される場合

Client using long-lived access token with broker delegating validation to authorization server

  1. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して有効期限の長いアクセストークンを渡し、Kafka ブローカーの認証を行います。
  2. Kafka ブローカーは、独自のクライアント ID およびシークレットを使用して、承認サーバーでトークンイントロスペクションエンドポイントを呼び出し、アクセストークンを検証します。
  3. トークンが有効な場合は、Kafka クライアントセッションが確立されます。

クライアントでは有効期限の長いアクセストークンが使用され、ブローカーによって高速のローカル検証が実行される場合

Client using long-lived access token with broker performing fast local validation

  1. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して有効期限の長いアクセストークンを渡し、Kafka ブローカーの認証を行います。
  2. Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
警告

トークンが取り消された場合に承認サーバーとのチェックが行われないため、高速のローカル JWT トークン署名の検証は有効期限の短いトークンにのみ適しています。トークンの有効期限はトークンに書き込まれますが、失効はいつでも発生する可能性があるため、承認サーバーと通信せずに対応することはできません。発行されたトークンはすべて期限切れになるまで有効とみなされます。

4.4.6. OAuth 2.0 認証の設定

OAuth 2.0 は、Kafka クライアントと AMQ Streams コンポーネントとの対話に使用されます。

AMQ Streams に OAuth 2.0 を使用するには、以下を行う必要があります。

4.4.6.1. OAuth 2.0 承認サーバーとしての Red Hat Single Sign-On の設定

この手順では、Red Hat Single Sign-On を承認サーバーとしてデプロイし、AMQ Streams と統合するための設定方法を説明します。

承認サーバーは、一元的な認証および承認の他、ユーザー、クライアント、およびパーミッションの一元管理を実現します。Red Hat Single Sign-On にはレルムの概念があります。レルム はユーザー、クライアント、パーミッション、およびその他の設定の個別のセットを表します。デフォルトの マスターレルム を使用できますが、新しいレルムを作成することもできます。各レルムは独自の OAuth 2.0 エンドポイントを公開します。そのため、アプリケーションクライアントとアプリケーションサーバーはすべて同じレルムを使用する必要があります。

AMQ Streams で OAuth 2.0 を使用するには、Red Hat Single Sign-On のデプロイメントを使用して認証レルムを作成および管理します。

注記

Red Hat Single Sign-On がすでにデプロイされている場合は、デプロイメントの手順を省略して、現在のデプロイメントを使用できます。

作業を開始する前の注意事項

Red Hat Single Sign-On を使用するための知識が必要です。

デプロイメントおよび管理の手順は、以下を参照してください。

前提条件

  • AMQ Streams および Kafka が稼働している必要があります。

Red Hat Single Sign-On デプロイメントに関する条件:

手順

  1. Red Hat Single Sign-On を OpenShift クラスターにデプロイします。

    OpenShift Web コンソールでデプロイメントの進捗を確認します。

  2. Red Hat Single Sign-On の Admin Console にログインし、AMQ Streams の OAuth 2.0 ポリシーを作成します。

    ログインの詳細は、Red Hat Single Sign-On のデプロイ時に提供されます。

  3. レルムを作成し、有効にします。

    既存のマスターレルムを使用できます。

  4. 必要に応じて、レルムのセッションおよびトークンのタイムアウトを調整します。
  5. kafka-broker というクライアントを作成します。
  6. Settings タブで以下を設定します。

    • Access TypeConfidential に設定します。
    • Standard Flow EnabledOFF に設定し、このクライアントからの Web ログインを無効にします。
    • Service Accounts EnabledON に設定し、このクライアントが独自の名前で認証できるようにします。
  7. 続行する前に Save クリックします。
  8. Credentials タブにある、AMQ Streams の Kafka クラスター設定で使用するシークレットを書き留めておきます。
  9. Kafka ブローカーに接続するすべてのアプリケーションクライアントに対して、このクライアント作成手順を繰り返し行います。

    新しいクライアントごとに定義を作成します。

    設定では、名前をクライアント ID として使用します。

次のステップ

承認サーバーのデプロイおよび設定後に、Kafka ブローカーが OAuth 2.0 を使用するように設定 します。

4.4.6.2. Kafka ブローカーの OAuth 2.0 サポートの設定

この手順では、ブローカーリスナーが承認サーバーを使用して OAuth 2.0 認証を使用するように、Kafka ブローカーを設定する方法について説明します。

TLS リスナーを設定して、暗号化されたインターフェースで OAuth 2.0 を使用することが推奨されます。プレーンリスナーは推奨されません。

承認サーバーが信頼できる CA によって署名された証明書を使用し、OAuth 2.0 サーバーのホスト名と一致する場合、TLS 接続はデフォルト設定を使用して動作します。それ以外の場合は、プローバー証明書でトラストストアを設定するか、証明書のホスト名の検証を無効にする必要があります。

Kafka ブローカーの設定する場合、新たに接続された Kafka クライアントの OAuth 2.0 認証中にアクセストークンを検証するために使用されるメカニズムには、以下の 2 つのオプションがあります。

作業を開始する前の注意事項

Kafka ブローカーリスナーの OAuth 2.0 認証の設定に関する詳細は、以下を参照してください。

前提条件

  • AMQ Streams および Kafka が稼働している必要があります。
  • OAuth 2.0 の承認サーバーがデプロイされている必要があります。

手順

  1. エディターで、Kafka リソースの Kafka ブローカー設定 (Kafka.spec.kafka) を更新します。

    oc edit kafka my-cluster
  2. Kafka ブローカーの listeners 設定を行います。

    各タイプのリスナーは独立しているため、同じ設定にする必要はありません。

    以下は、外部リスナーに設定された設定オプションの例になります。

    例 1: 高速なローカル JWT トークン検証の設定

    #...
    - name: external
      port: 9094
      type: loadbalancer
      tls: true
      authentication:
        type: oauth 1
        validIssuerUri: <https://<auth-server-address>/auth/realms/external> 2
        jwksEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/certs> 3
        userNameClaim: preferred_username 4
        maxSecondsWithoutReauthentication: 3600 5
        tlsTrustedCertificates: 6
        - secretName: oauth-server-cert
          certificate: ca.crt
        disableTlsHostnameVerification: true 7
        jwksExpirySeconds: 360 8
        jwksRefreshSeconds: 300 9
        jwksMinRefreshPauseSeconds: 1 10
        enableECDSA: "true" 11

    1
    oauth に設定されたリスナータイプ。
    2
    認証に使用されるトークン発行者の URI。
    3
    ローカルの JWT 検証に使用される JWKS 証明書エンドポイントの URI。
    4
    トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。userNameClaim の値は、使用される認証フローと承認サーバーによって異なります。
    5
    (任意設定): セッションの有効期限がアクセストークンと同じ期間になるよう強制する Kafka の再認証メカニズムを有効にします。指定された値がアクセストークンの有効期限が切れるまでの残り時間未満の場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。
    6
    (任意設定): 承認サーバーへの TLS 接続用の信用できる証明書。
    7
    (任意設定): TLS ホスト名の検証を無効にします。デフォルトは false です。
    8
    JWKS 証明書が期限切れになる前に有効であるとみなされる期間。デフォルトは 360 秒 です。デフォルトよりも長い時間を指定する場合は、無効になった証明書へのアクセスが許可されるリスクを考慮してください。
    9
    JWKS 証明書を更新する間隔。この間隔は、有効期間よりも 60 秒以上短くする必要があります。デフォルトは 300 秒 です。
    10
    JWKS 公開鍵の更新が連続して試行される間隔の最小一時停止時間 (秒単位)。不明な署名キーが検出されると、JWKS キーの更新は、最後に更新を試みてから少なくとも指定された期間は一時停止し、通常の定期スケジュール以外でスケジュールされます。キーの更新はエクスポネンシャルバックオフ (exponential backoff) のルールに従い、jwksRefreshSeconds に到達するまで、一時停止を増やして失敗した更新の再試行を行います。デフォルト値は 1 です。
    11
    (任意設定): ECDSA を使用して承認サーバーで JWT トークンを署名する場合は、これを有効にする必要があります。BouncyCastle 暗号ライブラリーを使用して追加の暗号プロバイダーがインストールされます。デフォルトは false です。

    例 2: イントロスペクションエンドポイントを使用したトークンの検証の設定

    - name: external
      port: 9094
      type: loadbalancer
      tls: true
      authentication:
        type: oauth
        validIssuerUri: <https://<auth-server-address>/auth/realms/external>
        introspectionEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token/introspect> 1
        clientId: kafka-broker 2
        clientSecret: 3
          secretName: my-cluster-oauth
          key: clientSecret
        userNameClaim: preferred_username 4
        maxSecondsWithoutReauthentication: 3600 5

    1
    トークンイントロスペクションエンドポイントの URI。
    2
    クライアントを識別するためのクライアント ID。
    3
    認証にはクライアントシークレットとクライアント ID が使用されます。
    4
    トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。userNameClaim の値は、使用される承認サーバーによって異なります。
    5
    (任意設定): セッションの有効期限がアクセストークンと同じ期間になるよう強制する Kafka の再認証メカニズムを有効にします。指定された値がアクセストークンの有効期限が切れるまでの残り時間未満の場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。

    OAuth 2.0 認証の適用方法や、承認サーバーのタイプによっては、追加 (任意) の設定を使用できます。

      # ...
      authentication:
        type: oauth
        # ...
        checkIssuer: false 1
        checkAudience: true 2
        fallbackUserNameClaim: client_id 3
        fallbackUserNamePrefix: client-account- 4
        validTokenType: bearer 5
        userInfoEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/userinfo 6
        enableOauthBearer: false 7
        enablePlain: true 8
        tokenEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/token 9
        customClaimCheck: "@.custom == 'custom-value'" 10
    1
    承認サーバーが iss クレームを提供しない場合は、発行者チェックを行うことができません。このような場合、checkIssuerfalse に設定し、validIssuerUri を指定しないようにします。デフォルトは true です。
    2
    承認サーバーによって aud (オーディエンス) クレームが提供され、オーディエンスチェックを強制する場合は、checkAudiencetrue に設定します。オーディエンスチェックによって、トークンの目的の受信者が特定されます。そのため、Kafka ブローカーによって、aud クレームに clientId のないトークンは拒否されます。デフォルトは false です。
    3
    承認サーバーは、通常ユーザーとクライアントの両方を識別する単一の属性を提供しない場合があります。クライアントが独自の名前で認証される場合、サーバーによって クライアント ID が提供されることがあります。更新トークンまたはアクセストークンを取得するために、ユーザー名およびパスワードを使用してユーザーが認証される場合、サーバーによってクライアント ID の他に ユーザー名 が提供されることがあります。プライマリーユーザー ID 属性が使用できない場合は、このフォールバックオプションで、使用するユーザー名クレーム (属性) を指定します。
    4
    fallbackUserNameClaim が適用される場合、ユーザー名クレームの値とフォールバックユーザー名クレームの値が競合しないようにする必要もあることがあります。producer というクライアントが存在し、producer という通常ユーザーも存在する場合について考えてみましょう。この 2 つを区別するには、このプロパティーを使用してクライアントのユーザー ID に接頭辞を追加します。
    5
    (introspectionEndpointUri を使用する場合のみ該当): 使用している認証サーバーによっては、イントロスペクションエンドポイントによってトークンタイプ属性が返されるかどうかは分からず、異なる値が含まれることがあります。イントロスペクションエンドポイントからの応答に含まれなければならない有効なトークンタイプ値を指定できます。
    6
    (introspectionEndpointUri を使用する場合のみ該当): イントロスペクションエンドポイントの応答に識別可能な情報が含まれないように、承認サーバーが設定または実装されることがあります。ユーザー ID を取得するには、userinfo エンドポイントの URI をフォールバックとして設定します。userNameClaimfallbackUserNameClaim、および fallbackUserNamePrefix 設定が userinfo エンドポイントの応答に適用されます。
    7
    これを false`to disable the OAUTHBEARER mechanism on the listener. At least one of PLAIN or OAUTHBEARER has to be enabled. Default is `true に設定します。
    8
    リスナーで PLAIN メカニズムを有効にするには、これを true に設定します。これは、すべてのプラットフォームのすべてのクライアントでサポートされます。Kafka クライアントで PLAIN メカニズムを有効にし、username および password を設定する必要があります。このメカニズムは、OAuth アクセストークンを使用するか、OAuth クライアント ID およびシークレット (クライアントクレデンシャル) を使用して、認証するために使用できます。クライアントによって、文字列 $accessToken: で始まる password が設定された場合、パスワードはサーバー上のアクセストークンとして解釈され、username はアカウントのユーザー名として解釈されます。それ以外の場合は、ユーザーはクライアント ID として解釈され、パスワードはクライアントシークレットとして解釈されます。デフォルトは false です。
    9
    これは、前のポイントで説明されているように、enablePlain が true に設定されている場合にクライアントクレデンシャル認証をサポートするように設定する必要があります。
    10
    これを JsonPath フィルタークエリーに設定すると、検証中に追加のカスタムルールを JWT アクセストークンに適用できます。アクセストークンに必要なデータが含まれていないと拒否されます。introspectionEndpointUri を使用する場合、カスタムチェックはイントロスペクションエンドポイントの応答 JSON に適用されます。
  3. エディターを保存して終了し、ローリングアップデートの完了を待ちます。
  4. ログで更新を確認するか、Pod の状態遷移を監視して確認します。

    oc logs -f ${POD_NAME} -c ${CONTAINER_NAME}
    oc get pod -w

    ローリングアップデートによって、ブローカーが OAuth 2.0 認証を使用するように設定されます。

4.4.6.3. OAuth 2.0 を使用するよう Kafka Java クライアントを設定

この手順では、Kafka ブローカーとの対話に OAuth 2.0 を使用するように Kafka プロデューサーおよびコンシューマー API を設定する方法を説明します。

クライアントコールバックプラグインを pom.xml ファイルに追加し、システムプロパティーを設定します。

前提条件

  • AMQ Streams および Kafka が稼働している必要があります。
  • OAuth 2.0 承認サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている必要があります。
  • Kafka ブローカーが OAuth 2.0 に対して設定されている必要があります。

手順

  1. OAuth 2.0 サポートのあるクライアントライブラリーを Kafka クライアントの pom.xml ファイルに追加します。

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.7.1.redhat-00003</version>
    </dependency>
  2. コールバックのシステムプロパティーを設定します。

    以下に例を示します。

    System.setProperty(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, “https://<auth-server-address>/auth/realms/master/protocol/openid-connect/token”); 1
    System.setProperty(ClientConfig.OAUTH_CLIENT_ID, "<client-name>"); 2
    System.setProperty(ClientConfig.OAUTH_CLIENT_SECRET, "<client-secret>"); 3
    1
    承認サーバーのトークンエンドポイントの URI です。
    2
    クライアント ID。承認サーバーで client を作成するときに使用される名前です。
    3
    承認サーバーで client を作成するときに作成されるクライアントシークレット。
  3. Kafka クライアント設定の TLS で暗号化された接続で SASL OAUTHBEARER メカニズムを有効にします。

    以下に例を示します。

    props.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
    props.put("security.protocol", "SASL_SSL"); 1
    props.put("sasl.mechanism", "OAUTHBEARER");
    props.put("sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
    1
    この例では、TLS 接続で SASL_SSL を使用します。暗号化されていない接続では SASL_PLAINTEXT を使用します。
  4. Kafka クライアントが Kafka ブローカーにアクセスできることを確認します。

4.4.6.4. Kafka コンポーネントの OAuth 2.0 の設定

この手順では、承認サーバーを使用して OAuth 2.0 認証を使用するように Kafka コンポーネントを設定する方法を説明します。

以下の認証を設定できます。

  • Kafka Connect
  • Kafka MirrorMaker
  • Kafka Bridge

この手順では、Kafka コンポーネントと承認サーバーは同じサーバーで稼働しています。

作業を開始する前の注意事項

Kafka コンポーネントの OAuth 2.0 認証の設定に関する詳細は、以下を参照してください。

前提条件

  • AMQ Streams および Kafka が稼働している必要があります。
  • OAuth 2.0 承認サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている必要があります。
  • Kafka ブローカーが OAuth 2.0 に対して設定されている必要があります。

手順

  1. クライアントシークレットを作成し、これを環境変数としてコンポーネントにマウントします。

    以下は、Kafka Bridge の Secret を作成する例になります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Secret
    metadata:
     name: my-bridge-oauth
    type: Opaque
    data:
     clientSecret: MGQ1OTRmMzYtZTllZS00MDY2LWI5OGEtMTM5MzM2NjdlZjQw 1
    1
    clientSecret キーは base64 形式である必要があります。
  2. Kafka コンポーネントのリソースを作成または編集し、OAuth 2.0 認証が認証プロパティーに設定されるようにします。

    OAuth 2.0 認証では、以下を使用できます。

    • クライアント ID およびシークレット
    • クライアント ID および更新トークン
    • アクセストークン
    • TLS

    KafkaClientAuthenticationOAuth スキーマ参照は、それぞれの例を提供します

    以下は、クライアント ID、シークレット、および TLS を使用して OAuth 2.0 が Kafka Bridge クライアントに割り当てられる例になります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      # ...
      authentication:
        type: oauth 1
        tokenEndpointUri: https://<auth-server-address>/auth/realms/master/protocol/openid-connect/token 2
        clientId: kafka-bridge
        clientSecret:
          secretName: my-bridge-oauth
          key: clientSecret
        tlsTrustedCertificates: 3
        - secretName: oauth-server-cert
          certificate: tls.crt
    1
    oauth に設定された認証タイプ。
    2
    認証用のトークンエンドポイントの URI。
    3
    承認サーバーへの TLS 接続用の信用できる証明書。

    OAuth 2.0 認証の適用方法や、承認サーバーのタイプによって、使用できる追加の設定オプションがあります。

    # ...
    spec:
      # ...
      authentication:
        # ...
        disableTlsHostnameVerification: true 1
        checkAccessTokenType: false 2
        accessTokenIsJwt: false 3
        scope: any 4
    1
    (任意設定): TLS ホスト名の検証を無効にします。デフォルトは false です。
    2
    承認サーバーによって、JWT トークン内部で typ (タイプ) 要求が返されない場合は、checkAccessTokenType: false を適用するとトークンタイプがチェックされず次に進むことができます。デフォルトは true です。
    3
    不透明なトークンを使用している場合、アクセストークンが JWT トークンとして処理されないように accessTokenIsJwt: false を適用することができます。
    4
    (任意設定): トークンエンドポイントからトークンを要求するための scope。認証サーバーでは、クライアントによるスコープの指定が必要になることがあります。この場合では any になります。
  3. Kafka リソースのデプロイメントに変更を適用します。

    oc apply -f your-file
  4. ログで更新を確認するか、Pod の状態遷移を監視して確認します。

    oc logs -f ${POD_NAME} -c ${CONTAINER_NAME}
    oc get pod -w

    ローリングアップデートでは、OAuth 2.0 認証を使用して Kafka ブローカーと対話するコンポーネントが設定されます。

4.5. OAuth 2.0 トークンベース承認の使用

トークンベースの認証に OAuth 2.0 と Red Hat Single Sign-On を使用している場合、Red Hat Single Sign-On を使用して承認ルールを設定し、Kafka ブローカーへのクライアントのアクセスを制限することもできます。認証はユーザーのアイデンティティーを確立します。承認は、そのユーザーのアクセスレベルを決定します。

AMQ Streams は、Red Hat Single Sign-On の 承認サービス による OAuth 2.0 トークンベースの承認をサポートします。これにより、セキュリティーポリシーとパーミッションの一元的な管理が可能になります。

Red Hat Single Sign-On で定義されたセキュリティーポリシーおよびパーミッションは、Kafka ブローカーのリソースへのアクセスを付与するために使用されます。ユーザーとクライアントは、Kafka ブローカーで特定のアクションを実行するためのアクセスを許可するポリシーに対して照合されます。

Kafka では、デフォルトですべてのユーザーがブローカーに完全アクセスできます。また、アクセス制御リスト (ACL) を基にして承認を設定するために AclAuthorizer プラグインが提供されます。

ZooKeeper には、 ユーザー名 を基にしてリソースへのアクセスを付与または拒否する ACL ルールが保存されます。ただし、Red Hat Single Sign-On を使用した OAuth 2.0 トークンベースの承認では、より柔軟にアクセス制御を Kafka ブローカーに実装できます。さらに、Kafka ブローカーで OAuth 2.0 の承認および ACL が使用されるように設定することができます。

4.5.1. OAuth 2.0 の承認メカニズム

AMQ Streams の OAuth 2.0 での承認では、Red Hat Single Sign-On サーバーの Authorization Services REST エンドポイントを使用して、Red Hat Single Sign-On を使用するトークンベースの認証が拡張されます。これは、定義されたセキュリティーポリシーを特定のユーザーに適用し、そのユーザーの異なるリソースに付与されたパーミッションの一覧を提供します。ポリシーはロールとグループを使用して、パーミッションをユーザーと照合します。OAuth 2.0 の承認では、Red Hat Single Sign-On の Authorization Services から受信した、ユーザーに付与された権限のリストを基にして、権限がローカルで強制されます。

4.5.1.1. Kafka ブローカーのカスタムオーソライザー

AMQ Streams では、Red Hat Single Sign-On の オーソライザー (KeycloakRBACAuthorizer) が提供されます。Red Hat Single Sign-On によって提供される Authorization Services で Red Hat Single Sign-On REST エンドポイントを使用できるようにするには、Kafka ブローカーでカスタムオーソライザーを設定します。

オーソライザーは必要に応じて付与された権限のリストを承認サーバーから取得し、ローカルで Kafka ブローカーに承認を強制するため、クライアントの要求ごとに迅速な承認決定が行われます。

4.5.2. OAuth 2.0 承認サポートの設定

この手順では、Red Hat Single Sign-On の Authorization Services を使用して、OAuth 2.0 承認を使用するように Kafka ブローカーを設定する方法を説明します。

作業を始める前に

特定のユーザーに必要なアクセス、または制限するアクセスについて検討してください。Red Hat Single Sign-On では、Red Hat Single Sign-On の グループロールクライアント、および ユーザー の組み合わせを使用して、アクセスを設定できます。

通常、グループは組織の部門または地理的な場所を基にしてユーザーを照合するために使用されます。また、ロールは職務を基にしてユーザーを照合するために使用されます。

Red Hat Single Sign-On を使用すると、ユーザーおよびグループを LDAP で保存できますが、クライアントおよびロールは LDAP で保存できません。ユーザーデータへのアクセスとストレージを考慮して、承認ポリシーの設定方法を選択する必要がある場合があります。

注記

スーパーユーザー は、Kafka ブローカーに実装された承認にかかわらず、常に制限なく Kafka ブローカーにアクセスできます。

前提条件

  • AMQ Streams は、トークンベースの認証 に Red Hat Single Sign-On と OAuth 2.0 を使用するように設定されている必要があります。承認を設定するときに、同じ Red Hat Single Sign-On サーバーエンドポイントを使用する必要があります。
  • OAuth 2.0 認証は、再認証を有効にするために maxSecondsWithoutReauthentication オプションで設定する必要があります。

手順

  1. Red Hat Single Sign-On の Admin Console にアクセスするか、Red Hat Single Sign-On の Admin CLI を使用して、OAuth 2.0 認証の設定時に作成した Kafka ブローカークライアントの Authorization Services を有効にします。
  2. 承認サービスを使用して、クライアントのリソース、承認スコープ、ポリシー、およびパーミッションを定義します。
  3. ロールとグループをユーザーとクライアントに割り当てて、パーミッションをユーザーとクライアントにバインドします。
  4. エディターで Kafka リソースの Kafka ブローカー設定 (Kafka.spec.kafka) を更新して、Kafka ブローカーで Red Hat Single Sign-On による承認が使用されるように設定します。

    oc edit kafka my-cluster
  5. Kafka ブローカーの kafka 設定を指定して、keycloak による承認を使用し、承認サーバーと Red Hat Single Sign-On の Authorization Services にアクセスできるようにします。

    以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        authorization:
          type: keycloak 1
          tokenEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token> 2
          clientId: kafka 3
          delegateToKafkaAcls: false 4
          disableTlsHostnameVerification: false 5
          superUsers: 6
          - CN=fred
          - sam
          - CN=edward
          tlsTrustedCertificates: 7
          - secretName: oauth-server-cert
            certificate: ca.crt
          grantsRefreshPeriodSeconds: 60 8
          grantsRefreshPoolSize: 5 9
        #...
    1
    タイプ keycloak によって Red Hat Single Sign-On の承認が有効になります。
    2
    Red Hat Single Sign-On トークンエンドポイントの URI。本番環境では常に HTTP を使用してください。
    3
    承認サービスが有効になっている Red Hat Single Sign-On の OAuth 2.0 クライアント定義のクライアント ID。通常、kafka が ID として使用されます。
    4
    (任意設定): Red Hat Single Sign-On の Authorization Services のポリシーによってアクセスが拒否される場合は、Kafka AclAuthorizer に承認を委譲します。デフォルトは false です。
    5
    (任意設定): TLS ホスト名の検証を無効にします。デフォルトは false です。
    6
    (任意設定): 指定の スーパーユーザー
    7
    (任意設定): 承認サーバーへの TLS 接続用の信用できる証明書。
    8
    (任意設定): 連続する付与 (Grants) 更新実行の間隔。これは、アクティブなセッションが Red Hat Single Sign-On でユーザーのパーミッション変更を検出する最大時間です。デフォルト値は 60 です。
    9
    (任意設定): アクティブなセッションの付与 (Grants) の更新 (並行して) に使用するスレッドの数。デフォルト値は 5 です。
  6. エディターを保存して終了し、ローリングアップデートの完了を待ちます。
  7. ログで更新を確認するか、Pod の状態遷移を監視して確認します。

    oc logs -f ${POD_NAME} -c kafka
    oc get pod -w

    ローリングアップデートによって、ブローカーが OAuth 2.0 承認を使用するように設定されます。

  8. クライアントまたは特定のロールを持つユーザーとして Kafka ブローカーにアクセスして、設定したパーミッションを検証し、必要なアクセス権限があり、付与されるべきでないアクセス権限がないことを確認します。