7.4. Kafka MirrorMaker の設定

MirrorMaker を設定するには、ソースおよびターゲット (宛先) の Kafka クラスターが実行中である必要があります。

従来のバージョンの MirrorMaker のサポートも継続されますが、AMQ Streams で MirrorMaker 2.0 を使用することもできます。

7.4.1. MirrorMaker 2.0 の設定

MirrorMaker 2.0 はソースの Kafka クラスターからメッセージを消費して、ターゲットの Kafka クラスターに書き込みます。

MirrorMaker 2.0 は以下を使用します。

  • ソースクラスターからデータを消費するソースクラスターの設定
  • データをターゲットクラスターに出力するターゲットクラスターの設定

MirrorMaker 2.0 は Kafka Connect フレームワークをベースとし、コネクター によってクラスター間のデータ転送が管理されます。

MirrorMaker 2.0 を設定して、ソースクラスターとターゲットクラスターの接続の詳細を含む Kafka Connect のデプロイメントを定義し、MirrorMaker 2.0 コネクターのセットを実行して接続を確立します。

MirrorMaker 2.0 は、以下のコネクターで設定されます。

MirrorSourceConnector
ソースコネクターは、トピックをソースクラスターからターゲットクラスターにレプリケートします。また、ACL をレプリケートし、MirrorCheckpointConnector を実行する必要があります。
MirrorCheckpointConnector
チェックポイントコネクターは定期的にオフセットを追跡します。有効にすると、ソースクラスターとターゲットクラスター間のコンシューマーグループオフセットも同期されます。
MirrorHeartbeatConnector
ハートビートコネクターは、ソースクラスターとターゲットクラスター間の接続を定期的にチェックします。
注記

User Operator を使用して ACL を管理する場合、コネクターを介した ACL レプリケーションはできません。

ソースクラスターからターゲットクラスターへのデータの ミラーリング プロセスは非同期です。各 MirrorMaker 2.0 インスタンスは、1 つのソースクラスターから 1 つのターゲットクラスターにデータをミラーリングします。複数の MirrorMaker 2.0 インスタンスを使用して、任意の数のクラスター間でデータをミラーリングできます。

図7.1 2 つのクラスターにおけるレプリケーション

MirrorMaker 2.0 のレプリケーション

デフォルトでは、ソースクラスターの新規トピックのチェックは 10 分ごとに行われます。頻度は、refresh.topics.interval.seconds をソースコネクター設定に追加することで変更できます。

7.4.1.1. クラスター設定

active/passive または active/active クラスター設定で MirrorMaker 2.0 を使用できます。

アクティブ/アクティブのクラスター設定
アクティブ/アクティブ設定には、双方向でデータをレプリケートするアクティブなクラスターが 2 つあります。アプリケーションはいずれかのクラスターを使用できます。各クラスターは同じデータを提供できます。これにより、地理的に異なる場所で同じデータを利用できるようにします。コンシューマーグループは両方のクラスターでアクティブであるため、レプリケートされたトピックのコンシューマーオフセットはソースクラスターに同期されません。
アクティブ/パッシブクラスター設定
アクティブ/パッシブ設定には、パッシブクラスターにデータをレプリケートするアクティブクラスターがあります。パッシブクラスターはスタンバイのままになります。システムに障害が発生した場合に、データ復旧にパッシブクラスターを使用できます。

プロデューサーとコンシューマーがアクティブなクラスターのみに接続することを前提とします。MirrorMaker 2.0 クラスターは、ターゲットの宛先ごとに必要です。

7.4.1.2. 双方向レプリケーション (active/active)

MirrorMaker 2.0 アーキテクチャーでは、active/active クラスター設定で双方向レプリケーションがサポートされます。

各クラスターは、source および remote トピックの概念を使用して、別のクラスターのデータをレプリケートします。同じトピックが各クラスターに保存されるため、リモートトピックの名前がソースクラスターを表すように自動的に MirrorMaker 2.0 によって変更されます。元のクラスターの名前の先頭には、トピックの名前が追加されます。

図7.2 トピック名の変更

MirrorMaker 2.0 双方向アーキテクチャー

ソースクラスターにフラグを付けると、トピックはそのクラスターにレプリケートされません。

remote トピックを介したレプリケーションの概念は、データの集約が必要なアーキテクチャーの設定に役立ちます。コンシューマーは、同じクラスター内でソースおよびリモートトピックにサブスクライブできます。これに個別の集約クラスターは必要ありません。

7.4.1.3. 一方向レプリケーション (active/passive)

MirrorMaker 2.0 アーキテクチャーでは、active/passive クラスター設定でー方向レプリケーションがサポートされます。

active/passive のクラスター設定を使用してバックアップを作成したり、データを別のクラスターに移行したりできます。このような場合には、リモートトピックの名前を自動的に変更させないように指定することがあります。

IdentityReplicationPolicy をソースコネクター設定に追加することで、名前の自動変更をオーバーライドできます。この設定が適用されると、トピックには元の名前が保持されます。

MirrorMaker 2.0 設定の YAML の例
  apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaMirrorMaker2
  metadata:
    name: my-mirror-maker2
    spec:
      version: 3.3.1
      connectCluster: "my-cluster-target"
      clusters:
      - alias: "my-cluster-source"
        bootstrapServers: my-cluster-source-kafka-bootstrap:9092
      - alias: "my-cluster-target"
        bootstrapServers: my-cluster-target-kafka-bootstrap:9092
      mirrors:
      - sourceCluster: "my-cluster-source"
        targetCluster: "my-cluster-target"
        sourceConnector: {}
      topicsPattern: ".*"
      groupsPattern: "group1|group2|group3"

7.4.2. MirrorMaker の設定

従来のバージョンの MirrorMaker では、プロデューサーとコンシューマーを使用して、クラスターにまたがってデータをレプリケートします。

MirrorMaker は以下を使用します。

  • ソースクラスターからデータを使用するコンシューマーの設定。
  • データをターゲットクラスターに出力するプロデューサーの設定。

コンシューマーおよびプロデューサー設定には、認証および暗号化設定が含まれます。

include フィールドは、ソースからターゲットクラスターにミラーリングするトピックを定義します。

主なコンシューマー設定
コンシューマーグループ ID
使用するメッセージがコンシューマーグループに割り当てられるようにするための MirrorMaker コンシューマーのコンシューマーグループ ID。
コンシューマーストリームの数
メッセージを並行して使用するコンシューマーグループ内のコンシューマー数を決定する値。
オフセットコミットの間隔
メッセージの使用とメッセージのコミットの期間を設定するオフセットコミットの間隔。
キープロデューサーの設定
送信失敗のキャンセルオプション
メッセージ送信の失敗を無視するか、または MirrorMaker を終了して再作成するかを定義できます。
MirrorMaker 設定の YAML 例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
spec:
  # ...
  consumer:
    bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    groupId: "my-group"
    numStreams: 2
    offsetCommitInterval: 120000
    # ...
  producer:
    # ...
    abortOnSendFailure: false
    # ...
  include: "my-topic|other-topic"
  # ...