7.3. Kafka クラスターの設定

Kafka クラスターは、1 つまたは複数のブローカーで設定されます。プロデューサーおよびコンシューマーがブローカー内のトピックにアクセスできるようにするには、Kafka 設定でクラスターへのデータの保存方法、およびデータへのアクセス方法を定義する必要があります。ラック全体で複数のブローカーノードを使用して Kafka クラスターを実行するように設定できます。

ストレージ

Kafka および ZooKeeper は、ディスクにデータを格納します。

AMQ Streams は、StorageClass でプロビジョニングされるブロックストレージが必要です。ストレージ用のファイルシステム形式は XFS または EXT4 である必要があります。3 種類のデータストレージがサポートされます。

一時データストレージ (開発用のみで推奨されます)
一時ストレージは、インスタンスの有効期間についてのデータを格納します。インスタンスを再起動すると、データは失われます。
永続ストレージ
永続ストレージは、インスタンスのライフサイクルとは関係なく長期のデータストレージに関連付けられます。
JBOD (Just a Bunch of Disks、Kafka のみに適しています)
JBOD では、複数のディスクを使用して各ブローカーにコミットログを保存できます。

既存の Kafka クラスターが使用するディスク容量は、増やすことができます (インフラストラクチャーでサポートされる場合)。

リスナー

リスナーは、クライアントが Kafka クラスターに接続する方法を設定します。

Kafka クラスター内の各リスナーに一意の名前とポートを指定することで、複数のリスナーを設定できます。

以下のタイプのリスナーがサポートされます。

  • OpenShift 内でのアクセスに使用する 内部リスナー
  • OpenShift 外からアクセスするときに使用する 外部リスナー

リスナーの TLS 暗号化を有効にし、認証 を設定できます。

内部リスナーは、internal 型を指定して Kafka を公開します。

  • 同じ OpenShift クラスター内で接続する internal
  • ブローカーごとの ClusterIP サービスを使用して Kafka を公開する cluster-ip

外部リスナーは、外部用 type を指定して Kafka を公開します。

  • OpenShift ルートおよびデフォルトの HAProxy ルーターを使用する route
  • ロードバランサーサービスを使用する loadbalancer
  • OpenShift ノードのポートを使用する nodeport
  • OpenShift Ingress および Ingress NGINX Controller for Kubernetes を使用する ingress
注記

cluster-ip タイプを使用すると、独自のアクセスメカニズムを追加できます。たとえば、カスタム Ingress コントローラーまたは OpenShift Gateway API でリスナーを使用できます。

トークンベースの認証に OAuth 2.0 を使用している場合は、リスナーが承認サーバーを使用するように設定できます。

ラックアウェアネス
ラックは、データセンター、データセンター内のラック、または可用性ゾーンを表します。Kafka ブローカー Pod とトピックレプリカをラック全体に分散するようにラック認識を設定します。rack プロパティーを使用してラック認識を有効にし、topologyKey を指定します。topologyKey は、ラックを識別する OpenShift ワーカーノードに割り当てられたラベルの名前です。AMQ Streams は、各 Kafka ブローカーにラック ID を割り当てます。Kafka ブローカーは ID を使用して、パーティションのレプリカをラック全体に分散させます。ラック認識で使用する RackAwareReplicaSelector セレクタープラグインを指定することもできます。プラグインはブローカーとコンシューマーのラック ID を照合するため、メッセージは最も近いレプリカから消費されます。プラグインを使用するには、コンシューマーもラック認識を有効にする必要があります。Kafka Connect、MirrorMaker 2.0、および Kafka Bridge でラック認識を有効にできます。

Kafka 設定の YAML 例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external1
        port: 9094
        type: route
        tls: true
        authentication:
          type: tls
    # ...
    storage:
      type: persistent-claim
      size: 10000Gi
    # ...
    rack:
      topologyKey: topology.kubernetes.io/zone
    config:
      replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector
    # ...