第6章 クライアント設定のチューニング

設定プロパティーを使用して、Kafka プロデューサーおよびコンシューマーのパフォーマンスを最適化します。

最小セットの設定プロパティーが必要ですが、プロパティーを追加または調整して、プロデューサーとコンシューマーが Kafka と対話する方法を変更できます。たとえば、プロデューサーの場合は、クライアントがリアルタイムでデータに応答できるように、メッセージのレイテンシーおよびスループットをチューニングできます。また、設定を変更して、より強力にメッセージの持続性を保証することもできます。

クライアントメトリックを分析して初期設定を行う場所を判断することから始め、必要な設定になるまで段階的に変更を加え、さらに比較を行うことができます。

6.1. Kafka プロデューサー設定のチューニング

特定のユースケースに合わせて調整されたオプションのプロパティーとともに、基本的なプロデューサー設定を使用します。

設定を調整してスループットを最大化すると、レイテンシーが増加する可能性があり、その逆も同様です。必要なバランスを取得するために、プロデューサー設定を実験して調整する必要があります。

6.1.1. 基本のプロデューサー設定

接続およびシリアライザープロパティーはすべてのプロデューサーに必要です。通常、追跡用のクライアント ID を追加し、プロデューサーで圧縮してリクエストのバッチサイズを減らすことが推奨されます。

基本的なプロデューサー設定には以下が含まれます。

  • パーティション内のメッセージの順序は保証されません。
  • ブローカーに到達するメッセージの完了通知は持続性を保証しません。
# ...
bootstrap.servers=localhost:9092 1
key.serializer=org.apache.kafka.common.serialization.StringSerializer 2
value.serializer=org.apache.kafka.common.serialization.StringSerializer 3
client.id=my-client 4
compression.type=gzip 5
# ...
1
(必須) Kafka ブローカーの host:port ブートストラップサーバーアドレスを使用して Kafka クラスターに接続するようプロデューサーを指示します。プロデューサーはアドレスを使用して、クラスター内のすべてのブローカーを検出し、接続します。サーバーがダウンした場合に備えて、コンマ区切りリストを使用して 2 つまたは 3 つのアドレスを指定しますが、クラスター内のすべてのブローカーのリストを提供する必要はありません。
2
(必須) メッセージがブローカーに送信される前に、各メッセージの鍵をバイトに変換するシリアライザー。
3
(必須) メッセージがブローカーに送信される前に、各メッセージの値をバイトに変換するシリアライザー。
4
(任意) クライアントの論理名。リクエストのソースを特定するためにログおよびメトリクスで使用されます。
5
(任意) メッセージを圧縮するコーデック。これは、送信され、圧縮された形式で格納された後、コンシューマーへの到達時に圧縮解除される可能性があります。圧縮はスループットを改善し、ストレージの負荷を減らすのに役立ちますが、圧縮や圧縮解除のコストが異常に高い低レイテンシーのアプリケーションには不適切である場合があります。

6.1.2. データの持続性

メッセージ配信の完了通知を使用して、データの持続性を適用し、メッセージが失われる可能性を最小限に抑えることができます。

# ...
acks=all 1
# ...
1
acks=all と指定すると、パーティションリーダーは、メッセージリクエストが正常に受信されたことを確認する前に、特定数のフォロワーに対してメッセージをレプリケートすることを強制されます。acks=all の追加のチェックにより、プルデューサーがメッセージを送信してから完了通知を受信するまでのレイテンシーが増加します。

完了通知がプロデューサーに送信される前にメッセージをログに追加する必要のあるブローカーの数は、トピックの min.insync.replicas 設定によって決定されます。最初に、トピックレプリケーション係数を 3 にし、他のブローカーの In-Sync レプリカを 2 にするのが一般的です。この設定では、単一のブローカーが利用できない場合でもプロデューサーは影響を受けません。2 番目のブローカーが利用できなくなると、プロデューサーは完了通知を受信せず、それ以上のメッセージを生成できなくなります。

acks=all をサポートするトピック設定

# ...
min.insync.replicas=2 1
# ...

1
2 In-Sync レプリカを使用します。デフォルトは 1 です。
注記

システムに障害が発生すると、バッファーの未送信データが失われる可能性があります。

6.1.3. 順序付き配信

メッセージは 1 度だけ配信されるため、べき等プロデューサーは重複を回避します。障害発生時でも配信の順序が維持されるように、ID とシーケンス番号がメッセージに割り当てられます。データの一貫性を維持するために acks=all を使用している場合は、順序付き配信にべき等を有効にするのは妥当です。

べき等を使った順序付き配信

# ...
enable.idempotence=true 1
max.in.flight.requests.per.connection=5 2
acks=all 3
retries=2147483647 4
# ...

1
true を設定してべき等プロデューサーを有効にします。
2
べき等配信では、インフライトリクエストの数が 1 を越えることがありますがメッセージの順序は維持されます。デフォルトのインフライトリクエストの数は 5 です。
3
acksall に設定します。
4
失敗したメッセージリクエストを再送信する試行回数を設定します。

パフォーマンスコストが原因で acks=all およびべき等を使用しない場合は、インフライト (完了確認されない) リクエストの数を 1 に設定して、順序を保持します。そうしないと、Message-A が失敗し、Message-B がブローカーに書き込まれた後にのみ成功する可能性があります。

べき等を使用しない順序付け配信

# ...
enable.idempotence=false 1
max.in.flight.requests.per.connection=1 2
retries=2147483647
# ...

1
false を設定して、べき等プロデューサーを無効にします。
2
インフライトリクエストの数を確実に 1 に設定します。

6.1.4. 信頼性の保証

べき等は、1 つのパーティションへの書き込みを 1 回だけ行う場合に便利です。トランザクションをべき等と使用すると、複数のパーティション全体で 1 度だけ書き込みを行うことができます。

トランザクションは、同じトランザクション ID を使用するメッセージが 1 度作成され、すべてがそれぞれのログに書き込まれるか、何も書き込まれないかのどちらかになることを保証します。

# ...
enable.idempotence=true
max.in.flight.requests.per.connection=5
acks=all
retries=2147483647
transactional.id=UNIQUE-ID 1
transaction.timeout.ms=900000 2
# ...
1
一意のトランザクション ID を指定します。
2
タイムアウトエラーが返されるまでのトランザクションの最大許容時間 (ミリ秒単位) を設定します。デフォルトは 900000 (15 分) です。

トランザクションの保証を維持するには、transactional.id の選択が重要になります。トランザクション ID は、一意なトピックパーティションセットに使用する必要があります。たとえば、トピックパーティション名からトランザクション ID への外部マッピングを使用したり、競合を回避する関数を使用してトピックパーティション名からトランザクション IDを算出したりすると、これを実現できます。

6.1.5. スループットおよびレイテンシーの最適化

通常、システムの要件は、指定のレイテンシー内であるメッセージの割合に対して、特定のスループットのターゲットを達成することです。たとえば、95 % のメッセージが 2 秒以内に完了確認される、1 秒あたり 500,000 個のメッセージをターゲットとします。

プロデューサーのメッセージングセマンティック (メッセージの順序付けと持続性) は、アプリケーションの要件によって定義される可能性があります。たとえば、アプリケーションによって提供される重要なプロパティーや保証に反することなく、acks=0 または acks=1 を使用するオプションがない可能性があります。

ブローカーの再起動は、パーセンタイルの高いの統計に大きく影響します。たとえば、長期間では、99% のレイテンシーはブローカーの再起動に関する動作によるものです。これは、ベンチマークを設計したり、本番環境のパフォーマンスで得られた数字を使ってベンチマークを行い、そのパフォーマンスの数字を比較したりする場合に検討する価値があります。

目的に応じて、Kafka はスループットとレイテンシーのプロデューサーパフォーマンスを調整するために多くの設定パラメーターと設定方法を提供します。

メッセージのバッチ処理 (linger.ms および batch.size)
メッセージのバッチ処理では、同じブローカー宛のメッセージをより多く送信するために、メッセージの送信を遅らせ、単一の生成リクエストでバッチ処理できるようにします。バッチ処理では、スループットを増やすためにレイテンシーを長くして妥協します。時間ベースのバッチ処理は linger.ms を使用して設定され、サイズベースのバッチ処理は batch.size を使用して設定されます。
圧縮処理 (compression.type)
メッセージ圧縮処理により、プロデューサー (メッセージの圧縮に費やされた CPU 時間) のレイテンシーが追加されますが、リクエスト (および場合によってはディスクの書き込み) を小さくするため、スループットが増加します。圧縮に価値があるかどうか、および使用に最適な圧縮は、送信されるメッセージによって異なります。圧縮処理は KafkaProducer.send() を呼び出すスレッドで発生するため、アプリケーションでこの方法のレイテンシーが問題になる場合は、より多くのスレッドを使用するよう検討してください。
パイプライン処理 (max.in.flight.requests.per.connection)
パイプライン処理は、以前のリクエストへの応答を受け取る前により多くのリクエストを送信します。通常、パイプライン処理を増やすと、バッチ処理の悪化などの別の問題がスループットに悪影響を与え始めるしきい値まではスループットが増加します。

レイテンシーの短縮

アプリケーションが KafkaProducer.send() を呼び出す場合、メッセージには以下が行われます。

  • インターセプターによる処理。
  • シリアライズ。
  • パーティションへの割り当て。
  • 圧縮処理。
  • パーティションごとのキューでメッセージのバッチに追加。

ここで、send() メソッドが返されます。そのため、send() がブロックされる時間は、以下によって決定されます。

  • インターセプター、シリアライザー、およびパーティションヤーで費やされた時間。
  • 使用される圧縮アルゴリズム。
  • 圧縮に使用するバッファーの待機に費やされた時間。

バッチは、以下のいずれかが行われるまでキューに残ります。

  • バッチが満杯になる (batch.size による)。
  • linger.ms によって導入された遅延が経過。
  • 送信者は他のパーティションのメッセージバッチを同じブローカーに送信しようとし、このバッチの追加も可能。
  • プロデューサーがフラッシュまたは閉じられる。

バッチ処理とバッファーの設定を参照して、レイテンシーをブロックする send() の影響を軽減します。

# ...
linger.ms=100 1
batch.size=16384 2
buffer.memory=33554432 3
# ...
1
linger プロパティーは、メッセージの大きなバッチが累積され、リクエストで送信されるように、ミリ秒単位の遅延を追加します。デフォルトは 0'. です。
2
最大 batch.size (バイト単位) が使用された場合、その最大値に達したとき、またはメッセージが linger.ms よりも長い期間キューに置かれたとき (いずれか早く発生した方) にリクエストが送信されます。遅延を追加すると、メッセージをバッチサイズまで累積できます。
3
バッファーサイズは、少なくともバッチサイズと同じ大きさである必要があり、バッファー、圧縮、およびインフライトリクエストに対応できる必要があります

スループットの増加

メッセージの配信および送信リクエストの完了までの最大待機時間を調整して、メッセージリクエストのスループットを向上します。

また、カスタムパーティションを作成してデフォルトを置き換えることで、メッセージを指定のパーティションに転送することもできます。

# ...
delivery.timeout.ms=120000 1
partitioner.class=my-custom-partitioner 2

# ...
1
送信リクエストの完了まで待機する最大時間 (ミリ秒単位)。この値を MAX_LONG に設定すると、Kafka に回数無制限の再試行を委譲できます。デフォルトは 120000 (2 分) です。
2
カスタムパーティショナーのクラス名を指定します。