10.4. MirrorMaker 2.0 を使用した Kafka クラスター間でのデータの同期

MirrorMaker 2.0 を使用して、設定を介して Kafka クラスター間のデータを同期します。

従来のモードで MirrorMaker 2.0 を実行すると、以前のバージョンの MirrorMaker は継続してサポートされます。

設定では以下を指定する必要があります。

  • 各 Kafka クラスター
  • TLS 認証を含む各クラスターの接続情報
  • レプリケーションのフローおよび方向

    • クラスター対クラスター
    • トピック対トピック
  • レプリケーションルール
  • コミットされたオフセット追跡間隔

この手順では、プロパティーファイルの設定を作成し、MirrorMaker スクリプトファイルを使用して接続を設定するときにプロパティーを渡すことで、MirrorMaker 2.0 を実装する方法を説明します。

注記

MirrorMaker 2.0 は、Kafka Connect を使用してクラスター間のデータ転送を行います。Kafka は、データレプリケーションの MirrorMaker シンクおよびソースコネクターを提供します。MirrorMaker スクリプトの代わりにコネクターを使用する場合は、Kafka Connect クラスターでコネクターを設定する必要があります。詳細は Apache Kafka のドキュメントを参照してください

作業を始める前に

設定ファイルのサンプルは、./config/connect-mirror-maker.properties にあります。

前提条件

  • レプリケートしている各 Kafka クラスターノードのホストに AMQ Streams がインストールされている必要があります。

手順

  1. テキストエディターでサンプルファイルを開くか、新しいプロパティーファイルを作成し、そのファイルを編集して、各 Kafka クラスターの接続情報およびレプリケーションフローを追加します。

    以下の例は、cluster-1 と cluster- 2 の 2 つのクラスター を接続する設定を示しています。クラスター名は clusters プロパティーで設定可能です。

    clusters=cluster-1,cluster-2 1
    
    cluster-1.bootstrap.servers=CLUSTER-NAME-kafka-bootstrap-PROJECT-NAME:443 2
    cluster-1.security.protocol=SSL 3
    cluster-1.ssl.truststore.password=TRUSTSTORE-NAME
    cluster-1.ssl.truststore.location=PATH-TO-TRUSTSTORE/truststore.cluster-1.jks
    cluster-1.ssl.keystore.password=KEYSTORE-NAME
    cluster-1.ssl.keystore.location=PATH-TO-KEYSTORE/user.cluster-1.p12_
    
    cluster-2.bootstrap.servers=CLUSTER-NAME-kafka-bootstrap-<my-project>:443 4
    cluster-2.security.protocol=SSL 5
    cluster-2.ssl.truststore.password=TRUSTSTORE-NAME
    cluster-2.ssl.truststore.location=PATH-TO-TRUSTSTORE/truststore.cluster-2.jks_
    cluster-2.ssl.keystore.password=KEYSTORE-NAME
    cluster-2.ssl.keystore.location=PATH-TO-KEYSTORE/user.cluster-2.p12_
    
    cluster-1->cluster-2.enabled=true 6
    cluster-1->cluster-2.topics=.* 7
    cluster-2->cluster-1.enabled=true 8
    cluster-2->cluster-1B->C.topics=.* 9
    
    replication.policy.separator=- 10
    sync.topic.acls.enabled=false 11
    refresh.topics.interval.seconds=60 12
    refresh.groups.interval.seconds=60 13
    1
    各 Kafka クラスターはそのエイリアスで識別されます。
    2
    ブートストラップアドレス およびポート 443 を使用した cluster-1 の接続情報。どちらのクラスターもポート 443 を使用して、OpenShift Routes を使用して Kafka に接続します。
    3
    ssl. プロパティーは、cluster-1 の TLS 設定を定義します。
    4
    cluster-2 の接続情報
    5
    ssl. プロパティーは、cluster-2 の TLS 設定を定義します。
    6
    cluster-1 クラスターから cluster- 2 クラスターへのレプリケーションフローが有効になっている。
    7
    cluster-1 クラスターのすべてのトピックを cluster- 2 クラスター に複製します。
    8
    cluster-2 クラスターから cluster- 1 クラスターへのレプリケーションフローが有効になっている。
    9
    cluster-2 クラスターから特定のトピックを cluster- 1 クラスター に複製します。
    10
    リモートトピック名の変更に使用する区切り文字を定義します。
    11
    有効にすると、同期されたトピックに ACL が適用されます。デフォルトは false です。
    12
    新しいトピックを同期させる間隔。
    13
    同期する新規コンシューマーグループをチェックする間隔。
  2. オプション: 必要な場合は、リモートトピックの名前の自動名前を上書きするポリシーを追加します。その名前の前にソースクラスターの名前を追加する代わりに、トピックが元の名前を保持します。

    このオプションの設定は、active/passive バックアップおよびデータ移行に使用されます。

    replication.policy.class=io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy
  3. オプション: コンシューマーグループのオフセットを同期する場合は、設定を追加して同期を有効にします。

    refresh.groups.interval.seconds=60
    sync.group.offsets.enabled=true 1
    sync.group.offsets.interval.seconds=60 2
    emit.checkpoints.interval.seconds=60 3
    1
    コンシューマーグループのオフセットを同期する任意設定。これは、active/passive 設定でのリカバリーに便利です。同期はデフォルトでは有効になっていません。
    2
    コンシューマーグループオフセットの同期が有効な場合は、同期の頻度を調整できます。
    3
    オフセット追跡のチェック頻度を調整します。オフセット同期の頻度を変更する場合、これらのチェックの頻度も調整する必要がある場合があります。
  4. ターゲットクラスターで ZooKeeper および Kafka を起動します。

    su - kafka
    /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  5. クラスター接続設定と、プロパティーファイルで定義したレプリケーションポリシーで MirrorMaker を起動します。

    /opt/kafka/bin/connect-mirror-maker.sh /config/connect-mirror-maker.properties

    MirrorMaker はクラスター間の接続を設定します。

  6. 各ターゲットクラスターに対して、トピックが複製されていることを確認します。

    /bin/kafka-topics.sh --bootstrap-server <BrokerAddress> --list