6.2. OpenShift 外クライアントのアクセスの設定

以下の手順では、OpenShift 外部からの Kafka クラスターへのクライアントアクセスを設定する方法を説明します。

Kafka クラスターのアドレスを使用して、異なる OpenShift namespace または完全に OpenShift 外のクライアントに外部アクセスを提供できます。

アクセスを提供するために、外部 Kafka リスナーを設定します。

以下のタイプの外部リスナーがサポートされます。

  • OpenShift Route およびデフォルトの HAProxy ルーターを使用する route
  • ロードバランサーサービスを使用する loadbalancer
  • OpenShift ノードのポートを使用する nodeport
  • OpenShift IngressNGINX Ingress Controller for Kubernetes を使用する ingress

要件ならびにお使いの環境およびインフラストラクチャーに応じて、選択するタイプは異なります。たとえば、ロードバランサーは、ベアメタル等の特定のインフラストラクチャーには適さない場合があります。ベアメタルでは、ノードポートがより適したオプションを提供します。

以下の手順では、

  1. TLS 暗号化および認証、ならびに Kafka 簡易承認 を有効にして、Kafka クラスターに外部リスナーが設定されます。
  2. 簡易承認 用に TLS 認証および アクセス制御リスト (ACL) を定義して、クライアントに KafkaUser が作成されます。

TLS、SCRAM-SHA-512、または OAuth 2.0 認証を使用するようにリスナーを設定できます。TLS は常に暗号化を使用しますが、SCRAM-SHA-512 および OAuth 2.0 認証でも暗号化を使用することが推奨されます。

Kafka ブローカーの simple、OAuth 2.0、OPA、またはカスタム承認を設定できます。承認を有効にすると、承認は有効なすべてのリスナーに適用されます。

KafkaUser 認証および承認メカニズムを設定する場合、必ず同等の Kafka 設定と一致するようにしてください。

  • KafkaUser.spec.authenticationKafka.spec.kafka.listeners[*].authentication と一致します。
  • KafkaUser.spec.authorizationKafka.spec.kafka.authorization と一致します。

KafkaUser に使用する認証をサポートするリスナーが少なくとも 1 つ必要です。

注記

Kafka ユーザーと Kafka ブローカー間の認証は、それぞれの認証設定によって異なります。たとえば、TLS が Kafka 設定で有効になっていない場合は、TLS でユーザーを認証できません。

AMQ Streams Operator により設定プロセスが自動されます。

  • Cluster Operator はリスナーを作成し、クラスターおよびクライアント認証局 (CA) 証明書を設定して Kafka クラスター内で認証を有効にします。
  • User Operator はクライアントに対応するユーザーを作成すると共に、選択した認証タイプに基づいて、クライアント認証に使用されるセキュリティークレデンシャルを作成します。

この手順では、Cluster Operator によって生成された証明書が使用されますが、独自の証明書をインストール してそれらを置き換えることができます。外部認証局によって管理される Kafka リスナー証明書を使用するようにリスナーを設定することもできます。

PKCS #12 (.p12) および PEM 形式 (.crt) の証明書を利用できます。この手順では、PKCS #12 証明書について説明します。

前提条件

  • クライアントが Kafka クラスターを使用できる必要があります。
  • Cluster Operator および User Operator がクラスターで実行されている必要があります。
  • OpenShift クラスター外のクライアントが Kafka クラスターに接続できる必要があります。

手順

  1. external Kafka リスナーと共に Kafka クラスターを設定します。

    • リスナーを通じて Kafka ブローカーにアクセスするのに必要な認証を定義します。
    • Kafka ブローカーで承認を有効にします。

      以下に例を示します。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
        namespace: myproject
      spec:
        kafka:
          # ...
          listeners: 1
          - name: external 2
            port: 9094 3
            type: LISTENER-TYPE 4
            tls: true 5
            authentication:
              type: tls 6
            configuration:
              preferredNodePortAddressType: InternalDNS 7
              bootstrap and broker service overrides 8
              #...
          authorization: 9
            type: simple
            superUsers:
              - super-user-name 10
        # ...
      1
      外部リスナーを有効にする設定オプションは、「Generic Kafka listener schema reference」に記載されています。
      2
      リスナーを識別するための名前。Kafka クラスター内で一意である必要があります。
      3
      Kafka 内でリスナーによって使用されるポート番号。ポート番号は指定の Kafka クラスター内で一意である必要があります。許可されるポート番号は 9092 以上ですが、すでに Prometheus および JMX によって使用されているポート 9404 および 9999 以外になります。リスナーのタイプによっては、ポート番号は Kafka クライアントに接続するポート番号と同じではない場合があります。
      4
      routeloadbalancernodeport、または ingress として指定される外部リスナータイプ。内部リスナーは internal として指定されます。
      5
      リスナーで TLS による暗号化を有効にします。デフォルトは false です。route リスナーには TLS 暗号化は必要ありません。
      6
      認証は tls として指定されます。
      7
      8
      (任意設定) AMQ Streams はクライアントに公開するアドレスを自動的に決定します。アドレスは OpenShift によって自動的に割り当てられます。AMQ Streams を実行しているインフラストラクチャーが正しい ブートストラップおよびブローカーサービスのアドレス を提供しない場合、そのアドレスを上書きできます。検証はオーバーライドに対しては実行されません。オーバーライド設定はリスナーのタイプによって異なります。たとえば、route の場合はホストを、 loadbalancer の場合は DNS 名または IP アドレスを、また nodeport の場合はノードポートを、それぞれ上書きすることができます。
      9
      simple と指定された承認 (AclAuthorizer Kafka プラグインを使用する)。
      10
      (任意設定) スーパーユーザーは、ACL で定義されたアクセス制限に関係なく、すべてのブローカーにアクセスできます。
      警告

      OpenShift Route アドレスは、Kafka クラスターの名前、リスナーの名前、および作成される namespace の名前で構成されます。たとえば、my-cluster-kafka-listener1-bootstrap-myproject (CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE) となります。route リスナータイプを使用している場合、アドレス全体の長さが上限の 63 文字を超えないように注意してください。

  2. Kafka リソースを作成または更新します。

    oc apply -f KAFKA-CONFIG-FILE

    Kafka クラスターは、TLS 認証を使用する Kafka ブローカーリスナーと共に設定されます。

    Kafka ブローカー Pod ごとにサービスが作成されます。

    サービスが作成され、Kafka クラスターに接続するための ブートストラップアドレス として機能します。

    サービスは、nodeport リスナーを使用した Kafka クラスターへの外部接続用 外部ブートストラップアドレス としても作成されます。

    kafka ブローカーの ID を検証するためのクラスター CA 証明書も、Kafka リソースと同じ名前で作成されます。

  3. Kafka リソースのステータスからブートストラップアドレスおよびポートを見つけます。

    oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}'

    Kafka クライアントのブートストラップアドレスを使用して、Kafka クラスターに接続します。

  4. Kafka クラスターにアクセスする必要があるクライアントに対応するユーザーを作成または変更します。

    • Kafka リスナーと同じ認証タイプを指定します。
    • 簡易承認に承認 ACL を指定します。

      以下に例を示します。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaUser
      metadata:
        name: my-user
        labels:
          strimzi.io/cluster: my-cluster 1
      spec:
        authentication:
          type: tls 2
        authorization:
          type: simple
          acls: 3
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operation: Read
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operation: Describe
            - resource:
                type: group
                name: my-group
                patternType: literal
              operation: Read
      1
      ラベルは、作成するユーザーの Kafka クラスターのラベルと一致する必要があります。
      2
      認証は tls として指定されます。
      3
      簡易承認には、ユーザーに適用する ACL ルールのリストが必要です。ルールは、ユーザー名 (my-user) を基に Kafka リソースで許可される操作を定義します。
  5. KafkaUser リソースを作成または変更します。

    oc apply -f USER-CONFIG-FILE

    KafkaUser リソースと同じ名前の Secret と共に、ユーザーが作成されます。Secret には、TLS クライアント認証の秘密鍵と公開鍵が含まれます。

    以下は例になります。

    apiVersion: v1
    kind: Secret
    metadata:
      name: my-user
      labels:
        strimzi.io/kind: KafkaUser
        strimzi.io/cluster: my-cluster
    type: Opaque
    data:
      ca.crt: PUBLIC-KEY-OF-THE-CLIENT-CA
      user.crt: USER-CERTIFICATE-CONTAINING-PUBLIC-KEY-OF-USER
      user.key: PRIVATE-KEY-OF-USER
      user.p12: P12-ARCHIVE-FILE-STORING-CERTIFICATES-AND-KEYS
      user.password: PASSWORD-PROTECTING-P12-ARCHIVE
  6. パブリッククラスター CA 証明書を目的の証明書形式に抽出します。

    oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
  7. パスワードファイルからパスワードを抽出します。

    oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
  8. パブリッククラスター証明書の認証詳細でクライアントを設定します。

    クライアントコードのサンプル

    properties.put("security.protocol","SSL"); 1
    properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/path/to/ca.p12"); 2
    properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,CA-PASSWORD); 3
    properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"PKCS12"); 4

    1
    (TLS クライアント認証ありまたはなしで) TLS による暗号化を有効にします。
    2
    証明書がインポートされたトラストストアの場所を指定します。
    3
    トラストストアにアクセスするためのパスワードを指定します。このプロパティーは、トラストストアで必要なければ省略できます。
    4
    トラストストアのタイプを識別します。
    注記

    TLS 経由で SCRAM-SHA 認証を使用する場合は、security.protocol: SASL_SSL を使用します。

  9. ユーザーシークレットから目的の証明書形式にユーザー CA 証明書を抽出します。

    oc get secret USER-NAME -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
  10. パスワードファイルからパスワードを抽出します。

    oc get secret USER-NAME -o jsonpath='{.data.user\.password}' | base64 -d > user.password
  11. ユーザー CA 証明書の認証詳細でクライアントを設定します。

    クライアントコードのサンプル

    properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/path/to/user.p12"); 1
    properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"<user.password>"); 2
    properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"PKCS12"); 3

    1
    証明書がインポートされたキーストアの場所を指定します。
    2
    キーストアにアクセスするためのパスワードを指定します。このプロパティーは、キーストアで必要なければ省略できます。パブリックユーザー証明書は、作成時にクライアント CA により署名されます。
    3
    キーストアのタイプを識別します。
  12. Kafka クラスターに接続するためのブートストラップアドレスおよびポートを追加します。

    bootstrap.servers: BOOTSTRAP-ADDRESS:PORT

関連情報