3.9. 大きなメッセージサイズの処理
メッセージのデフォルトのバッチサイズは 1MB で、ほとんどのユースケースで最大のスループットを得るのに最適です。Kafka は、十分なディスク容量があれば、スループットを下げてより大きなバッチに対応できます。
大きなメッセージサイズは、以下の 4 つの方法で処理されます。
- プロデューサー側のメッセージ圧縮 が、圧縮メッセージをログに書き込みます。
- 参照ベースのメッセージングが、メッセージの値で他のシステムに格納されているデータへの参照のみを送信します。
- インラインメッセージングが、同じキーを使用するチャンクにメッセージを分割し、これらを Kafka Streams などのストリームプロセッサーを使用して、出力に組み合わせます。
- ブローカーおよびプロデューサー/コンシューマークライアントアプリケーションが、より大きなメッセージサイズを処理するように構築されます。
リファレンスベースのメッセージングおよびメッセージ圧縮オプションが推奨されます。これはほとんどの状況に対応します。これらのオプションのいずれかを使用する場合は、パフォーマンスの問題が発生しないように注意する必要があります。
プロデューサー側の圧縮
プロデューサー設定の場合は、Gzip などの compression.type を指定します。これは、プロデューサーによって生成されたデータのバッチに適用されます。ブローカー設定の compression.type=producer を使用すると、ブローカーは使用されるプロデューサーを圧縮します。プロデューサーとトピックの圧縮が一致しない場合は常に、ブローカーはバッチをログに追加する前に再度圧縮する必要があります。これはブローカーのパフォーマンスに影響を与えます。
圧縮はまた、プロデューサーに追加の処理オーバーヘッドを追加し、コンシューマーに解凍オーバーヘッドを追加しますが、バッチにより多くのデータが含まれるため、メッセージデータが適切に圧縮される場合、スループットに役立つことがよくあります。
プロデューサー側の圧縮とバッチサイズの微調整を組み合わせて、最適なスループットを促進します。メトリクスを使用すると、必要な平均バッチサイズの測定に役立ちます。
参照ベースのメッセージング
参照ベースのメッセージングは、メッセージの大きさがわからない場合のデータ複製に役立ちます。この設定が機能するには、外部データストアは高速で永続性があり、高可用性である必要があります。データはデータストアに書き込まれ、データへの参照が返されます。プロデューサーは、Kafka への参照が含まれるメッセージを送信します。コンシューマーはメッセージから参照を取得し、これを使用してデータストアからデータを取得します。
図3.4 参照ベースのメッセージングフロー

メッセージを渡すにはより多くの通信が必要なため、エンドツーエンドのレイテンシーが増加します。このアプローチのもう 1 つの重大な欠点は、Kafka メッセージがクリーンアップされたときに、外部システムのデータが自動的にクリーンアップされないことです。ハイブリッドアプローチは、大きなメッセージのみをデータストアに送信し、標準サイズのメッセージを直接処理することです。
インラインメッセージング
インラインメッセージングは複雑ですが、参照ベースのメッセージングのように外部システムに依存するオーバーヘッドはありません。
メッセージが大きすぎる場合、生成するクライアントアプリケーションは、データをシリアライズしてからチャンクにする必要があります。その後、プロデューサーは Kafka ByteArraySerializer を使用し、送信前に各チャンクを再度シリアライズするのと同様のものを使用します。コンシューマーはメッセージを追跡し、完全なメッセージが得られるまでチャンクをバッファーリングします。消費側のクライアントアプリケーションは、デシリアライズの前にアセンブルされたチャンクを受け取ります。完全なメッセージは、チャンクになったメッセージの各セットの最初または最後のチャンクのオフセットに従って、消費する残りのアプリケーションに配信されます。リバランス中の重複を避けるために、完全なメッセージの正常な配信がオフセットメタデータと照合されます。
図3.5 インラインメッセージングフロー

インラインメッセージングは、特に一連の大きなメッセージを並行して処理する場合に必要なバッファーリングのために、コンシューマー側でパフォーマンスのオーバーヘッドが発生します。大きなメッセージのチャンクはインターリーブされる可能性があるため、バッファー内の別の大きなメッセージのチャンクが不完全な場合、メッセージのすべてのチャンクが消費されたときにコミットできるとは限りません。このため、バッファーリングは通常、メッセージチャンクを永続化するか、コミットロジックを実装することでサポートされます。
より大きなメッセージを処理するための設定
より大きなメッセージを回避できない場合、およびメッセージフローの任意の時点でブロックを回避するために、メッセージ制限を増やすことができます。これを行うには、トピックレベルで message.max.bytes を設定し、個別のトピックの最大レコードバッチサイズを設定します。ブローカーレベルで message.max.bytes を設定すると、すべてのトピックに大きなメッセージが許可されます。
ブローカーは、message.max.bytes で設定された制限よりも大きなメッセージを拒否します。プロデューサー (max.request.size) およびコンシューマー (message.max.bytes) のバッファーサイズは、より大きなメッセージに対応できなければなりません。