第13章 セキュリティー

AMQ Streams は、Kafka と AMQ Streams コンポーネントとの間で TLS プロトコルを使用して暗号化された通信をサポートします。Kafka ブローカー間の通信 (Interbroker 通信)、ZooKeeper ノード間の通信 (Internodal 通信)、およびこれらと AMQ Streams Operator 間の通信は、常に暗号化されます。Kafka クライアントと Kafka ブローカーとの間の通信は、クラスターが設定された方法に応じて暗号化されます。Kafka および AMQ Streams コンポーネントでは、TLS 証明書も認証に使用されます。

Cluster Operator は、自動で TLS 証明書の設定および更新を行い、クラスター内での暗号化および認証を有効にします。また、Kafka ブローカーとクライアントとの間の暗号化または TLS 認証を有効にする場合、他の TLS 証明書も設定されます。ユーザーが用意した証明書は更新されません。

TLS 暗号化が有効になっている TLS リスナーまたは外部リスナーの、Kafka リスナー証明書 と呼ばれる独自のサーバー証明書を提供できます。詳細は 「Kafka リスナー証明書」 を参照してください。

13.1. 認証局

暗号化のサポートには、AMQ Streams コンポーネントごとに固有の秘密鍵と公開鍵証明書が必要です。すべてのコンポーネント証明書は、クラスター CA と呼ばれる内部認証局 (CA) により署名されます。

同様に、TLS クライアント認証を使用して AMQ Streams に接続する各 Kafka クライアントアプリケーションは、秘密鍵と証明書を提供する必要があります。クライアント CA という第 2 の内部 CA を使用して、Kafka クライアントの証明書に署名します。

13.1.1. CA 証明書

クラスター CA とクライアント CA の両方には、自己署名の公開鍵証明書があります。

Kafka ブローカーは、クラスター CA またはクライアント CA のいずれかが署名した証明書を信頼するように設定されます。クライアントによる接続が不要なコンポーネント (ZooKeeper など) のみが、クラスター CA によって署名された証明書を信頼します。外部リスナーの TLS 暗号化が無効でない限り、クライアントアプリケーションはクラスター CA により署名された証明書を必ず信頼する必要があります。これは、相互 TLS 認証 を実行するクライアントアプリケーションにも当てはまります。

デフォルトで、AMQ Streams はクラスター CA またはクライアント CA によって発行された CA 証明書を自動で生成および更新します。これらの CA 証明書の管理は、Kafka.spec.clusterCa および Kafka.spec.clientsCa オブジェクトで設定できます。ユーザーが用意した証明書は更新されません。

クラスター CA またはクライアント CA に、独自の CA 証明書を提供できます。詳細は「独自の CA 証明書のインストール」を参照してください。独自の証明書を提供する場合は、証明書の更新が必要なときに手作業で更新する必要があります。

13.1.2. CA 証明書の有効期間

CA 証明書の有効期間は、証明書の生成からの日数で提示されます。有効期間は、それぞれ以下で設定できます。

  • クラスター CA 証明書の場合: Kafka.spec.clusterCa.validityDays
  • クライアント CA 証明書の場合: Kafka.spec.clientsCa.validityDays

13.1.3. 独自の CA 証明書のインストール

この手順では、Cluster Operator で生成される CA 証明書と秘密鍵を使用する代わりに、独自の CA 証明書と秘密鍵をインストールする方法について説明します。

前提条件

  • Cluster Operator が稼働している必要があります。
  • Kafka クラスターがデプロイされていない必要があります。
  • クラスター CA またはクライアントの、PEM 形式による独自の X.509 証明書および鍵が必要です。

    • ルート CA ではないクラスターまたはクライアント CA を使用する場合、証明書ファイルにチェーン全体を含める必要があります。チェーンの順序は以下のとおりです。

      1. クラスターまたはクライアント CA
      2. 1 つ以上の中間 CA
      3. ルート CA
    • チェーン内のすべての CA は、X509v3 の基本制約 (Basic Constraint) で CA として設定する必要があります。

手順

  1. 使用する CA 証明書を対応する Secret に挿入します (クラスター CA の場合は <cluster>-cluster-ca-cert、クライアント CA の場合は <cluster>-clients-ca-cert)。

    以下のコマンドを実行します。

    # Delete any existing secret (ignore "Not Exists" errors)
    oc delete secret <ca-cert-secret>
    # Create and label the new secret
    oc create secret generic <ca-cert-secret> --from-file=ca.crt=<ca-cert-file>
  2. 使用する CA キーを対応する Secret に挿入します (クラスター CA の場合は <cluster>-cluster-ca、クライアント CA の場合は <cluster>-clients-ca)。

    # Delete the existing secret
    oc delete secret <ca-key-secret>
    # Create the new one
    oc create secret generic <ca-key-secret> --from-file=ca.key=<ca-key-file>
  3. 両方の Secrets にラベル strimzi.io/kind=Kafka および strimzi.io/cluster=<my-cluster> を付けます。

    oc label secret <ca-cert-secret> strimzi.io/kind=Kafka strimzi.io/cluster=<my-cluster>
    oc label secret <ca-key-secret> strimzi.io/kind=Kafka strimzi.io/cluster=<my-cluster>
  4. クラスターの Kafka リソースを作成し、生成された CA を使用 しない ように Kafka.spec.clusterCa および Kafka.spec.clientsCa オブジェクトを設定します。

    独自指定の証明書を使用するようにクラスター CA を設定する Kafka リソースの例 (抜粋)

    kind: Kafka
    version: kafka.strimzi.io/v1beta1
    spec:
      # ...
      clusterCa:
        generateCertificateAuthority: false

その他のリソース

13.2. Secret

Strimzi は Secret を使用して、Kafka クラスターコンポーネントおよびクライアントの秘密鍵と証明書を格納します。Secret は、Kafka ブローカー間やブローカーとクライアントの間で TLS で暗号化された接続を確立するために使用されます。Secret は相互 TLS 認証にも使用されます。

  • Cluster Secret には、Kafka ブローカー証明書に署名するためのクラスター CA 証明書が含まれます。また、接続クライアントによって、Kafka クラスターとの TLS 暗号化接続を確立してブローカー ID を検証するために使用されます。
  • Client Secret にはクライアント CA 証明書が含まれ、これによりユーザーは独自のクライアント証明書に署名し、Kafka クラスターに対する相互認証が可能になります。ブローカーはクライアント CA 証明書を使用してクライアント ID を検証します。
  • User Secret には、新規ユーザーの作成時にクライアント CA 証明書によって生成および署名される秘密鍵と証明書が含まれています。この鍵と証明書は、クラスターへのアクセス時の認証および承認に使用されます。

Secret には、PEM 形式および PKCS #12 形式の秘密鍵と証明書が含まれます。PEM 形式の秘密鍵と証明書を使用する場合、ユーザーは Secret からそれらの秘密鍵と証明書を取得し、Java アプリケーションで使用するために対応するトラストストア (またはキーストア) を生成します。PKCS #12 ストレージは、直接使用できるトラストストア (またはキーストア) を提供します。

すべての鍵のサイズは 2048 ビットです。

13.2.1. PKCS #12 ストレージ

PKCS #12 は、暗号化オブジェクトをパスワードで保護された単一のファイルに格納するためのアーカイブファイル形式 (.p12) を定義します。PKCS #12 を使用して、証明書および鍵を一元的に管理できます。

各 Secret には、PKCS #12 特有のフィールドが含まれています。

  • .p12 フィールドには、証明書と鍵が含まれます。
  • .password フィールドは、アーカイブを保護するパスワードです。

13.2.2. クラスター CA Secret

表13.1 Cluster Operator によって <cluster> で管理されるクラスター CA Secrets

Secret 名Secret 内のフィールド説明

<cluster>-cluster-ca

ca.key

クラスター CA の現在の秘密鍵。

<cluster>-cluster-ca-cert

ca.p12

証明書および鍵を格納するための PKCS #12 アーカイブファイル。

ca.password

PKCS #12 アーカイブのファイルを保護するパスワード。

ca.crt

クラスター CA の現在の証明書。

<cluster>-kafka-brokers

<cluster>-kafka-<num>.p12

証明書および鍵を格納するための PKCS #12 アーカイブファイル。

<cluster>-kafka-<num>.password

PKCS #12 アーカイブのファイルを保護するパスワード。

<cluster>-kafka-<num>.crt

Kafka ブローカー Pod <num> の証明書。<cluster>-cluster-ca で現行または以前のクラスター CA の秘密鍵により署名されます。

<cluster>-kafka-<num>.key

Kafka ブローカー Pod <num> の秘密鍵。

<cluster>-zookeeper-nodes

<cluster>-zookeeper-<num>.p12

証明書および鍵を格納するための PKCS #12 アーカイブファイル。

<cluster>-zookeeper-<num>.password

PKCS #12 アーカイブのファイルを保護するパスワード。

<cluster>-zookeeper-<num>.crt

ZooKeeper ノード <num> の証明書。<cluster>-cluster-ca で現行または以前のクラスター CA の秘密鍵により署名されます。

<cluster>-zookeeper-<num>.key

ZooKeeper Pod <num> の秘密鍵。

<cluster>-entity-operator-certs

entity-operator_.p12

証明書および鍵を格納するための PKCS #12 アーカイブファイル。

entity-operator_.password

PKCS #12 アーカイブのファイルを保護するパスワード。

entity-operator_.crt

Entity Operator と Kafka または ZooKeeper との間の TLS 通信の証明書。<cluster>-cluster-ca で現行または以前のクラスター CA の秘密鍵により署名されます。

entity-operator.key

Entity Operator と Kafka または ZooKeeper との間の TLS 通信の秘密鍵

TLS を介した Kafka ブローカーへの接続時に Kafka ブローカー証明書を検証するため、<cluster>-cluster-ca-cert の CA 証明書は Kafka クライアントアプリケーションによって信頼される必要があります。

注記

クライアントは <cluster>-cluster-ca-cert のみを使用する必要があります。上表の他のすべての Secrets は、AMQ Streams コンポーネントによるアクセスのみが必要です。これは、必要な場合に OpenShift のロールベースのアクセス制御を使用して強制できます。

13.2.3. クライアント CA Secret

表13.2 Cluster Operator によって <cluster> で管理されるクライアント CA Secrets

Secret 名Secret 内のフィールド説明

<cluster>-clients-ca

ca.key

クライアント CA の現在の秘密鍵。

<cluster>-clients-ca-cert

ca.p12

証明書および鍵を格納するための PKCS #12 アーカイブファイル。

ca.password

PKCS #12 アーカイブのファイルを保護するパスワード。

ca.crt

クライアント CA の現在の証明書。

<cluster>-clients-ca-cert の証明書は、Kafka ブローカーが信頼する証明書です。

注記

<cluster>-clients-ca は、クライアントアプリケーションの証明書への署名に使用されます。また AMQ Streams コンポーネントにアクセスできる必要があり、User Operator を使わずにアプリケーション証明書を発行する予定であれば管理者のアクセス権限が必要です。これは、必要な場合に OpenShift のロールベースのアクセス制御を使用して強制できます。

13.2.4. User Secret

表13.3 User Operator によって管理される Secrets

Secret 名Secret 内のフィールド説明

<user>

user.p12

証明書および鍵を格納するための PKCS #12 アーカイブファイル。

user.password

PKCS #12 アーカイブのファイルを保護するパスワード。

user.crt

ユーザーの証明書、クライアント CA により署名されます。

user.key

ユーザーの秘密鍵。

13.3. 証明書の更新

クラスター CA およびクライアント CA の証明書は、限定された期間、すなわち有効期間に限り有効です。通常、この期間は証明書の生成からの日数として定義されます。自動生成される CA 証明書では、有効期間を Kafka.spec.clusterCa.validityDays および Kafka.spec.clientsCa.validityDays で設定できます。デフォルトの有効期間は、両方の証明書で 365 日です。手動でインストールした CA 証明書には、独自の有効期間を定義する必要があります。

CA 証明書の期限が切れると、その証明書をまだ信頼しているコンポーネントおよびクライアントは、その CA 秘密鍵で署名された証明書を持つ相手からの TLS 接続を受け付けません。代わりに、コンポーネントおよびクライアントは 新しい CA 証明書を信頼する必要があります。

サービスを中断せずに CA 証明書を更新できるようにするため、Cluster Operator は古い CA 証明書が期限切れになる前に証明書の更新を開始します。更新期間は、Kafka.spec.clusterCa.renewalDays および Kafka.spec.clientsCa.renewalDays で設定できます (デフォルトは両方とも 30 日)。更新期間は、現在の証明書の有効期日から逆算されます。

Not Before                                     Not After
    |                                              |
    |<--------------- validityDays --------------->|
                              <--- renewalDays --->|

更新期間中の Cluster Operator の動作は、関連する設定が Kafka.spec.clusterCa.generateCertificateAuthority または Kafka.spec.clientsCa.generateCertificateAuthority のいずれかで有効であるかどうかによって異なります。

13.3.1. 生成された CA での更新プロセス

Cluster Operator は以下のプロセスを実行して CA 証明書を更新します。

  1. 新しい CA 証明書を生成しますが、既存の鍵は保持します。該当する Secret 内の ca.crt という名前の古い証明書が新しい証明書に置き換えられます。
  2. 新しいクライアント証明書を生成します (ZooKeeper ノード、Kafka ブローカー、および Entity Operator 用)。署名鍵は変わっておらず、CA 証明書と同期してクライアント証明書の有効期間を維持するため、これは必須ではありません。
  3. ZooKeeper ノードを再起動して、ZooKeeper ノードが新しい CA 証明書を信頼し、新しいクライアント証明書を使用するようにします。
  4. Kafka ブローカーを再起動して、Kafka ブローカーが新しい CA 証明書を信頼し、新しいクライアント証明書を使用するようにします。
  5. Topic Operator および User Operator を再起動して、それらの Operator が新しい CA 証明書を信頼し、新しいクライアント証明書を使用するようにします。

13.3.2. クライアントアプリケーション

Cluster Operator は、Kafka クラスターを使用するクライアントアプリケーションを認識しません。

クラスターに接続し、クライアントアプリケーションが正しく機能するように確認するには、クライアントアプリケーションは以下を行う必要があります。

  • <cluster>-cluster-ca-cert Secret でパブリッシュされるクラスター CA 証明書を信頼する必要があります。
  • <user-name> Secret でパブリッシュされたクレデンシャルを使用してクラスターに接続します。

    User Secret は PEM および PKCS #12 形式のクレデンシャルを提供し、SCRAM-SHA 認証を使用する場合はパスワードを提供できます。ユーザーの作成時に User Operator によってユーザークレデンシャルが生成されます。

同じ OpenShift クラスターおよび namespace 内で実行中のワークロードの場合、Secrets はボリュームとしてマウントできるので、クライアント Pod はそれらのキーストアとトラストストアを現在の状態の Secrets から構築できます。この手順の詳細は、「クラスター CA を信頼する内部クライアントの設定」を参照してください。

13.3.2.1. クライアント証明書の更新

証明書の更新後もクライアントが動作するようにする必要があります。更新プロセスは、クライアントの設定によって異なります。

クライアント証明書と鍵のプロビジョニングを手動で行う場合、新しいクライアント証明書を生成し、更新期間内に新しい証明書がクライアントによって使用されるようにする必要があります。更新期間の終了までにこれが行われないと、クライアントアプリケーションがクラスターに接続できなくなる可能性があります。

13.3.3. CA 証明書の手動更新

Kafka.spec.clusterCa.generateCertificateAuthority および Kafka.spec.clientsCa.generateCertificateAuthority オブジェクトが false に設定されていない限り、クラスターおよびクライアント CA 証明書は、それぞれの証明書の更新期間の開始時に自動で更新されます。セキュリティー上の理由で必要であれば、証明書の更新期間が始まる前に、これらの証明書のいずれかまたは両方を手動で更新できます。更新された証明書は、古い証明書と同じ秘密鍵を使用します。

前提条件

  • Cluster Operator が稼働している必要があります。
  • CA 証明書と秘密鍵がインストールされている Kafka クラスターが必要です。

手順

  • strimzi.io/force-renew アノテーションを、更新対象の CA 証明書が含まれる Secret に適用します。

    証明書Secretannotate コマンド

    クラスター CA

    <cluster-name>-cluster-ca-cert

    oc annotate secret <cluster-name>-cluster-ca-cert strimzi.io/force-renew=true

    クライアント CA

    <cluster-name>-clients-ca-cert

    oc annotate secret <cluster-name>-clients-ca-cert strimzi.io/force-renew=true

次回の調整で、アノテーションを付けた Secret の新規 CA 証明書が Cluster Operator によって生成されます。メンテナンス時間枠が設定されている場合、Cluster Operator によって、最初の調整時に次のメンテナンス時間枠内で新規 CA 証明書が生成されます。

Cluster Operator によって更新されたクラスターおよびクライアント CA 証明書をクライアントアプリケーションがリロードする必要があります。

13.3.4. 独自の CA 証明書の更新

この手順では、以前にインストールした CA 証明書および秘密鍵を更新する方法を説明します。期限切れ間近の CA 証明書を交換するには、更新期間中にこの手順を実施する必要があります。

前提条件

  • Cluster Operator が稼働している必要があります。
  • 以前に独自の CA 証明書と秘密鍵をインストールした Kafka クラスターが必要です。
  • クラスターおよびクライアントの PEM 形式による新しい X.509 証明書と鍵が必要です。これらは、openssl を使用して以下のようなコマンドで生成できます。

    openssl req -x509 -new -days <validity> --nodes -out ca.crt -keyout ca.key

手順

  1. Secret にすでに存在する CA 証明書を確認します。

    以下のコマンドを使用します。

    oc describe secret <ca-cert-secret>
  2. Secret に既存の CA 証明書が含まれるディレクトリーを準備します。

    mkdir new-ca-cert-secret
    cd new-ca-cert-secret

    前のステップで確認した証明書 <ca-certificate> ごとに、以下を実行します。

    # Fetch the existing secret
    oc get secret <ca-cert-secret> -o 'jsonpath={.data.<ca-certificate>}' | base64 -d > <ca-certificate>
  3. 古い ca.crt ファイルの名前を ca_<date>_.crt に変更します。ここで、<date> は証明書の有効期限日に置き換え、<year>-<month>-<day>_T<hour>_-<minute>-_<second>_Z の形式 (たとえば ca-2018-09-27T17-32-00Z.crt) を使用します。

    mv ca.crt ca-$(date -u -d$(openssl x509 -enddate -noout -in ca.crt | sed 's/.*=//') +'%Y-%m-%dT%H-%M-%SZ').crt
  4. 新規 CA 証明書をディレクトリーにコピーし、ca.crt という名前を付けます。

    cp <path-to-new-cert> ca.crt
  5. CA 証明書 Secret を置き換えます (<cluster>-cluster-ca または <cluster>-clients-ca)。これは、以下のコマンドで実行できます。

    # Delete the existing secret
    oc delete secret <ca-cert-secret>
    # Re-create the secret with the new private key
    oc create secret generic <ca-cert-secret> --from-file=.

    これで、作成したディレクトリーを削除できます。

    cd ..
    rm -r new-ca-cert-secret
  6. CA キー Secret を置き換えます (<cluster>-cluster-ca または <cluster>-clients-ca)。これは、以下のコマンドで実行できます。

    # Delete the existing secret
    oc delete secret <ca-key-secret>
    # Re-create the secret with the new private key
    oc create secret generic <ca-key-secret> --from-file=ca.key=<ca-key-file>

13.4. 秘密鍵の交換

クラスター CA およびクライアント CA 証明書によって使用される秘密鍵を交換できます。秘密鍵を交換すると、Cluster Operator は新しい秘密鍵の新規 CA 証明書を生成します。

前提条件

  • Cluster Operator が稼働している必要があります。
  • CA 証明書と秘密鍵がインストールされている Kafka クラスターが必要です。

手順

  • strimzi.io/force-replace アノテーションを、更新対象の秘密鍵が含まれる Secret に適用します。

    秘密鍵Secretannotate コマンド

    クラスター CA

    <cluster-name>-cluster-ca

    oc annotate secret <cluster-name>-cluster-ca strimzi.io/force-replace=true

    クライアント CA

    <cluster-name>-clients-ca

    oc annotate secret <cluster-name>-clients-ca strimzi.io/force-replace=true

次回の調整時に、Cluster Operator は以下を生成します。

  • アノテーションを付けた Secret の新しい秘密鍵
  • 新規 CA 証明書

メンテナンス時間枠が設定されている場合、Cluster Operator によって、最初の調整時に次のメンテナンス時間枠内で新しい秘密鍵と CA 証明書が生成されます。

Cluster Operator によって更新されたクラスターおよびクライアント CA 証明書をクライアントアプリケーションがリロードする必要があります。

13.5. TLS 接続

13.5.1. ZooKeeper の通信

ZooKeeper は TLS 自体をサポートしません。すべての ZooKeeper Pod に TLS サイドカーをデプロイすることで、Cluster Operator はクラスター内の ZooKeeper ノード間でデータ暗号化および認証を提供できるようになります。ZooKeeper は、ループバックインターフェース上で TLS サイドカーのみと通信します。次に TLS サイドカーは、ZooKeeper トラフィック、ZooKeeper Pod 受信時の TLS 復号化データ、および ZooKeeper Pod 送信時の TLS 暗号化データすべてのプロキシーとして動作します。

この TLS 暗号化 stunnel プロキシーは、Kafka リソースに指定された spec.zookeeper.stunnelImage からインスタンス化されます。

13.5.2. Kafka の Interbroker 通信

Kafka ブローカー間の通信は、ポート 9091 の内部リスナーを介して行われます。この通信はデフォルトで暗号化され、Kafka クライアントからはアクセスできません。

Kafka ブローカーと ZooKeeper ノード間の通信では、上記のように TLS サイドカーが使用されます。

13.5.3. Topic Operator および User Operator

Cluster Operator と同様に、Topic Operator および User Operator はそれぞれ ZooKeeper との通信時に TLS サイドカーを使用します。Topic Operator は、ポート 9091 で Kafka ブローカーに接続します。

13.5.4. Kafka クライアント接続

ポート 9093 でリッスンする spec.kafka.listeners.tls リスナーを設定すると、同じ OpenShift クラスター内で稼働している Kafka ブローカーとクライアントとの通信を暗号化できます。

同じ OpenShift クラスターの外部で稼働している Kafka ブローカーとクライアントとの通信を暗号化するには、spec.kafka.listeners.external リスナーを設定します (external リスナーのポートはそのタイプによって異なります)。

注記

暗号化されていないクライアントとブローカーの通信は、ポート 9092 でリッスンする spec.kafka.listeners.plain で設定できます。

13.6. クラスター CA を信頼する内部クライアントの設定

この手順では、OpenShift クラスター内部に存在する Kafka クライアントの設定方法を説明します。ポート 9093 で tls リスナーに接続し、クラスター CA 証明書を信頼するように設定します。

これを内部クライアントで実現するには、ボリュームマウントを使用して、必要な証明書および鍵が含まれる Secrets にアクセスするのが最も簡単な方法です。

以下の手順に従い、クラスター CA によって署名された信頼できる証明書を Java ベースの Kafka Producer、Consumer、および Streams API に設定します。

クラスター CA の証明書の形式が PKCS #12 (.p12) または PEM (.crt) であるかに応じて、手順を選択します。

この手順では、Kafka クラスターの ID を検証する Cluster Secret をクライアント Pod にマウントする方法を説明します。

前提条件

  • Cluster Operator が稼働している必要があります。
  • OpenShift クラスター内に Kafka リソースが必要です。
  • TLS を使用して接続し、クラスター CA 証明書を必ず信頼する Kafka クライアントアプリケーションが、OpenShift クラスター外部に必要です。
  • クライアントアプリケーションが Kafka リソースと同じ namespace で実行している必要があります。

PKCS #12 形式 (.p12) の使用

  1. クライアント Pod の定義時に、Cluster Secret をボリュームとしてマウントします。

    以下に例を示します。

    kind: Pod
    apiVersion: {API_Version}
    metadata:
      name: client-pod
    spec:
      containers:
      - name: client-name
        image: client-name
        volumeMounts:
        - name: secret-volume
          mountPath: /data/p12
        env:
        - name: SECRET_PASSWORD
          valueFrom:
            secretKeyRef:
              name: my-secret
              key: my-password
      volumes:
      - name: secret-volume
        secret:
          secretName: my-cluster-cluster-cert

    ここでは、以下をマウントしています。

    • PKCS #12 ファイルを設定可能な正確なパスにマウント。
    • パスワードを Java 設定に使用できる環境変数にマウント。
  2. Kafka クライアントを以下のプロパティーで設定します。

    • セキュリティープロトコルのオプション:

      • security.protocol: SSL (TLS 認証ありまたはなしで、暗号化に TLS を使用する場合)。
      • security.protocol: SASL_SSL (TLS 経由で SCRAM-SHA 認証を使用する場合)。
    • ssl.truststore.location (証明書がインポートされたトラストストアを指定)。
    • ssl.truststore.password (トラストストアにアクセスするためのパスワードを指定)。
    • ssl.truststore.type=PKCS12 (トラストストアのタイプを識別)。

PEM 形式の使用 (.crt)

  1. クライアント Pod の定義時に、Cluster Secret をボリュームとしてマウントします。

    以下に例を示します。

    kind: Pod
    apiVersion: {API_Version}
    metadata:
      name: client-pod
    spec:
      containers:
      - name: client-name
        image: client-name
        volumeMounts:
        - name: secret-volume
          mountPath: /data/crt
      volumes:
      - name: secret-volume
        secret:
          secretName: my-cluster-cluster-cert
  2. X.509 形式の証明書を使用するクライアントでこの証明書を使用します。

13.7. クラスター CA を信頼する外部クライアントの設定

この手順では、OpenShift クラスター外部に存在する Kafka クライアントの設定方法を説明します。ポート 9094 で external リスナーに接続し、クラスター CA 証明書を信頼するように設定します。クライアントのセットアップ時および更新期間中に、古いクライアント CA 証明書を交換する場合に、以下の手順に従います。

以下の手順に従い、クラスター CA によって署名された信頼できる証明書を Java ベースの Kafka Producer、Consumer、および Streams API に設定します。

クラスター CA の証明書の形式が PKCS #12 (.p12) または PEM (.crt) であるかに応じて、手順を選択します。

この手順では、Kafka クラスターの ID を検証する Cluster Secret から証明書を取得する方法を説明します。

重要

CA 証明書の更新期間中に、<cluster-name>-cluster-ca-cert Secret に複数の CA 証明書が含まれます。クライアントは、それらを すべて をクライアントのトラストストアに追加する必要があります。

前提条件

  • Cluster Operator が稼働している必要があります。
  • OpenShift クラスター内に Kafka リソースが必要です。
  • TLS を使用して接続し、クラスター CA 証明書を必ず信頼する Kafka クライアントアプリケーションが、OpenShift クラスター外部に必要です。

PKCS #12 形式 (.p12) の使用

  1. 生成された <cluster-name>-cluster-ca-cert Secret から、クラスター CA 証明書およびパスワードを抽出します。

    oc get secret <cluster-name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
    oc get secret <cluster-name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
  2. Kafka クライアントを以下のプロパティーで設定します。

    • セキュリティープロトコルのオプション:

      • security.protocol: SSL (TLS 認証ありまたはなしで、暗号化に TLS を使用する場合)。
      • security.protocol: SASL_SSL (TLS 経由で SCRAM-SHA 認証を使用する場合)。
    • ssl.truststore.location (証明書がインポートされたトラストストアを指定)。
    • ssl.truststore.password (トラストストアにアクセスするためのパスワードを指定)。このプロパティーは、トラストストアで必要なければ省略できます。
    • ssl.truststore.type=PKCS12 (トラストストアのタイプを識別)。

PEM 形式の使用 (.crt)

  1. 生成された <cluster-name>-cluster-ca-cert Secret から、クラスター CA 証明書を抽出します。

    oc get secret <cluster-name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
  2. X.509 形式の証明書を使用するクライアントでこの証明書を使用します。

13.8. Kafka リスナー証明書

以下のタイプのリスナーに、独自のサーバー証明書と秘密鍵を指定できます。

  • クラスター間通信の TLS リスナー
  • Kafka クライアントと Kafka ブローカー間の通信に TLS 暗号化が有効になっている外部リスナー (routeloadbalanceringress、および nodeport タイプ)

これらのユーザー提供による証明書は、Kafka リスナー証明書 と呼ばれます。

外部リスナーに Kafka リスナー証明書を提供すると、既存のセキュリティーインフラストラクチャー (所属組織のプライベート CA やパブリック CA など) を利用できます。Kafka クライアントは Kafka ブローカーに接続する際に、クラスター CA またはクライアント CA によって署名された証明書ではなく、Kafka リスナー証明書を使用します。

Kafka リスナー証明書の更新が必要な場合は、手作業で更新する必要があります。

13.8.1. 独自の Kafka リスナー証明書の指定

この手順では、独自の秘密鍵と Kafka リスナー証明書と呼ばれるサーバー証明書を使用するようにリスナーを設定する方法について説明します。

Kafka ブローカーの ID を検証するため、クライアントアプリケーションは CA 公開鍵を信頼できる証明書として使用する必要があります。

前提条件

  • OpenShift クラスターが必要です。
  • Cluster Operator が稼働している必要があります。
  • リスナーごとに、外部 CA によって署名された互換性のあるサーバー証明書が必要です。

    • X.509 証明書を PEM 形式で提供します。
    • リスナーごとに正しい SAN (サブジェクト代替名) を指定します。詳細は「Kafka リスナーのサーバー証明書の SAN」を参照してください。
    • 証明書ファイルに CA チェーン全体が含まれる証明書を提供できます。

手順

  1. 秘密鍵およびサーバー証明書が含まれる Secret を作成します。

    oc create secret generic my-secret --from-file=my-listener-key.key --from-file=my-listener-certificate.crt
  2. クラスターの Kafka リソースを編集します。Secret、証明書ファイル、および秘密鍵ファイルを使用するように、リスナーを configuration.brokerCertChainAndKey プロパティーで設定します。

    TLS 暗号化が有効な loadbalancer 外部リスナーの設定例

    # ...
    listeners:
      plain: {}
      external:
        type: loadbalancer
        configuration:
          brokerCertChainAndKey:
            secretName: my-secret
            certificate: my-listener-certificate.crt
            key: my-listener-key.key
        tls: true
        authentication:
          type: tls
    # ...

    TLS リスナーの設定例

    # ...
    listeners:
      plain: {}
      tls:
        configuration:
          brokerCertChainAndKey:
            secretName: my-secret
            certificate: my-listener-certificate.pem
            key: my-listener-key.key
        authentication:
          type: tls
    # ...

  3. 新しい設定を適用してリソースを作成または更新します。

    oc apply -f kafka.yaml

    Cluster Operator は、Kafka クラスターのローリングアップデートを開始し、これによりリスナーの設定が更新されます。

    注記

    TLS または外部リスナーによってすでに使用されている Secret の Kafka リスナー証明書を更新した場合でも、ローリングアップデートが開始されます。

13.8.2. Kafka リスナーのサーバー証明書の SAN

独自の Kafka リスナー証明書で TLS ホスト名検証を使用するには、リスナーごとに SAN (サブジェクト代替名) を使用する必要があります。証明書の SAN は、以下のホスト名を指定する必要があります。

  • クラスターのすべての Kafka ブローカー
  • Kafka クラスターブートストラップサービス

ワイルドカード証明書は、CA でサポートされれば使用できます。

13.8.2.1. TLS リスナー SAN の例

以下の例を利用して、TLS リスナーの証明書で SAN のホスト名を指定できます。

ワイルドカードの例

//Kafka brokers
*.<cluster-name>-kafka-brokers
*.<cluster-name>-kafka-brokers.<namespace>.svc

// Bootstrap service
<cluster-name>-kafka-bootstrap
<cluster-name>-kafka-bootstrap.<namespace>.svc

ワイルドカードのない例

// Kafka brokers
<cluster-name>-kafka-0.<cluster-name>-kafka-brokers
<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc
<cluster-name>-kafka-1.<cluster-name>-kafka-brokers
<cluster-name>-kafka-1.<cluster-name>-kafka-brokers.<namespace>.svc
# ...

// Bootstrap service
<cluster-name>-kafka-bootstrap
<cluster-name>-kafka-bootstrap.<namespace>.svc

13.8.2.2. 外部リスナー SAN の例

TLS 暗号化が有効になっている外部リスナーの場合、証明書に指定する必要があるホスト名は、外部リスナーの type によって異なります。

外部リスナータイプSAN で指定する内容

Route

すべての Kafka ブローカー Routes のアドレス、およびブートストラップ Route のアドレス。

一致するワイルドカード名を使用できます。

loadbalancer

すべての Kafka ブローカー loadbalancers のアドレス、およびブートストラップ loadbalancer のアドレス。

一致するワイルドカード名を使用できます。

NodePort

Kafka ブローカー Pod がスケジュールされるすべての OpenShift ワーカーノードのアドレス。

一致するワイルドカード名を使用できます。