9.4. MirrorMaker 2.0 を使用した Kafka クラスター間でのデータの同期

MirrorMaker 2.0 を使用して、設定を介して Kafka クラスター間のデータを同期します。

従来のモードで MirrorMaker 2.0 を実行すると、以前のバージョンの MirrorMaker は引き続きサポートされます。

設定では以下を指定する必要があります。

  • 各 Kafka クラスター
  • TLS 認証を含む各クラスターの接続情報
  • レプリケーションのフローおよび方向

    • クラスター対クラスター
    • トピック対トピック
  • レプリケーションルール
  • コミットされたオフセット追跡間隔

この手順では、プロパティーファイルに設定を作成し、MirrorMaker スクリプトファイルを使用して接続を設定するときにプロパティーを渡すことで、MirrorMaker 2.0 を実装する方法を説明します。

注記

MirrorMaker 2.0 は Kafka Connect を使用して接続を行い、クラスター間のデータを転送します。Kafka は、データレプリケーションに MirrorMaker シンクおよびソースコネクターを提供します。MirrorMaker スクリプトの代わりにコネクターを使用する場合は、コネクターを Kafka Connect クラスターで設定する必要があります。詳細は Apache Kafka のドキュメント を参照してください

作業を始める前に

設定例は ./config/connect-mirror-maker.properties にあります。

前提条件

  • 複製する各 Kafka クラスターノードのホストに AMQ Streams がインストールされている必要があります。

手順

  1. テキストエディターでサンプルプロパティーファイルを開くか、新しいプロパティーを作成してから、ファイルを編集して、各 Kafka クラスターに対して接続情報とレプリケーションフローを追加します。

    以下の例は、cluster-1 および cluster -2 の 2 つのクラスターに接続する設定を示しています。クラスター名は clusters プロパティーを使用して設定できます。

    clusters=cluster-1,cluster-2 1
    
    cluster-1.bootstrap.servers=CLUSTER-NAME-kafka-bootstrap-PROJECT-NAME:443 2
    cluster-1.security.protocol=SSL 3
    cluster-1.ssl.truststore.password=TRUSTSTORE-NAME
    cluster-1.ssl.truststore.location=PATH-TO-TRUSTSTORE/truststore.cluster-1.jks
    cluster-1.ssl.keystore.password=KEYSTORE-NAME
    cluster-1.ssl.keystore.location=PATH-TO-KEYSTORE/user.cluster-1.p12_
    
    cluster-2.bootstrap.servers=CLUSTER-NAME-kafka-bootstrap-<my-project>:443 4
    cluster-2.security.protocol=SSL 5
    cluster-2.ssl.truststore.password=TRUSTSTORE-NAME
    cluster-2.ssl.truststore.location=PATH-TO-TRUSTSTORE/truststore.cluster-2.jks_
    cluster-2.ssl.keystore.password=KEYSTORE-NAME
    cluster-2.ssl.keystore.location=PATH-TO-KEYSTORE/user.cluster-2.p12_
    
    cluster-1->cluster-2.enabled=true 6
    cluster-1->cluster-2.topics=.* 7
    cluster-2->cluster-1.enabled=true 8
    cluster-2->cluster-1B->C.topics=.* 9
    
    replication.policy.separator=- 10
    sync.topic.acls.enabled=false 11
    refresh.topics.interval.seconds=60 12
    refresh.groups.interval.seconds=60 13
    1
    各 Kafka クラスターはそのエイリアスで識別されます。
    2
    ブートストラップアドレスおよびポート 443 を使用する cluster-1 の接続情報。両方のクラスターがポート 443 を使用して、OpenShift ルートを使用して Kafka に接続します
    3
    ssl. プロパティーは、cluster-1 の TLS 設定を定義します。
    4
    cluster-2 の接続情報
    5
    ssl. プロパティーは、cluster-2 の TLS 設定を定義します。
    6
    cluster-1 クラスターから cluster -2 クラスターへのレプリケーションフロー。
    7
    cluster-1 クラスターから cluster -2 クラスターにすべてのトピックを複製します。
    8
    クラスター-2 クラスターから cluster -1 クラスターへのレプリケーションフロー。
    9
    cluster-2 クラスターから cluster -1 クラスターに特定のトピックを複製します。
    10
    リモートトピック名の変更に使用する区切り文字を定義します。
    11
    有効にすると、同期されたトピックに ACL が適用されます。デフォルトは false です。
    12
    新規トピックが同期するチェックの間隔。
    13
    新規コンシューマーグループの同期をチェックする間隔。
  2. オプション: 必要な場合は、リモートトピックの名前の自動変更を上書きするポリシーを追加します。その名前の前にソースクラスターの名前を追加する代わりに、トピックが元の名前を保持します。

    このオプションの設定は、active/passive バックアップおよびデータ移行に使用されます。

    replication.policy.class=io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy
  3. オプション: コンシューマーグループオフセットを同期する場合は、設定を追加して同期を有効にし、管理します。

    refresh.groups.interval.seconds=60
    sync.group.offsets.enabled=true 1
    sync.group.offsets.interval.seconds=60 2
    emit.checkpoints.interval.seconds=60 3
    1
    コンシューマーグループのオフセットを同期する任意設定。これは、active/passive 設定でのリカバリーに便利です。同期はデフォルトでは有効になっていません。
    2
    コンシューマーグループオフセットの同期が有効な場合は、同期の頻度を調整できます。
    3
    オフセット追跡のチェック頻度を調整します。オフセット同期の頻度を変更する場合、これらのチェックの頻度も調整する必要がある場合があります。
  4. ターゲットクラスターで ZooKeeper および Kafka を起動します。

    su - kafka
    /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  5. プロパティーファイルで定義したクラスター接続設定およびレプリケーションポリシーで MirrorMaker を起動します。

    /opt/kafka/bin/connect-mirror-maker.sh /config/connect-mirror-maker.properties

    MirrorMaker はクラスター間の接続を設定します。

  6. 各ターゲットクラスターについて、トピックがレプリケートされていることを確認します。

    /bin/kafka-topics.sh --bootstrap-server <BrokerAddress> --list