付録C コンシューマー設定パラメーター

key.deserializer

型: class
重要度:

org.apache.kafka.common.serialization.Deserializer インターフェイスを実装するキーのデシリアライザークラス。

value.deserializer

型: class
重要度:

org.apache.kafka.common.serialization.Deserializer インターフェイスを実装する値のデシリアライザークラス。

bootstrap.servers

型: list
デフォルト: ""
有効な値: non-null string
重要度:

Kafka クラスターへの最初の接続を確立するために使用されるホストとポートのペアの一覧。クライアントは、ブートストラップ用にここで指定されたサーバーに関係なく、すべてのサーバーを利用します。この一覧は、サーバーのフルセットを検出するために使用される最初のホストにのみ影響します。この一覧は、host1:port1,host2:port2,…​ の形式にする必要があります。これらのサーバーは、(動的に変更される可能性がある) 完全なクラスターメンバーシップを検出するための最初の接続にだけ使用されるため、このリストにはサーバーの完全なセットを含める必要はありません (ただし、サーバーがダウンした場合に備えて、複数のサーバーが必要になる場合があります)。

fetch.min.bytes

型: int
デフォルト: 1
有効な値: [0,…​]
重要度:

サーバーがフェッチ要求に対して返す必要のあるデータの最小量。利用可能なデータが不十分な場合、リクエストは、リクエストに応答する前に、十分なデータが蓄積されるのを待ちます。デフォルト設定の 1 バイトは、1 バイトのデータが使用可能になるか、データの到着を待機してフェッチ要求がタイムアウトするとすぐに、フェッチ要求に応答することを意味します。これを 1 を超える値に設定すると、サーバーは大量のデータが蓄積されるのを待機することになり、追加の遅延を犠牲にしてサーバーのスループットを少し向上させることができます。

group.id

型: string
デフォルト: null
重要度:

このコンシューマーが属するコンシューマーグループを識別する一意の文字列。このプロパティーは、コンシューマーが subscribe(topic) を使用したグループ管理機能または Kafka ベースのオフセット管理ストラテジーのいずれかを使用する場合に必要です。

heartbeat.interval.ms

型: int
デフォルト: 3000
重要度:

Kafka のグループ管理機能を使用する場合の、ハートビートからコンシューマーコーディネーター間の想定される時間。ハートビートは、コンシューマーのセッションがアクティブな状態を維持し、新しいコンシューマーがグループに参加したり離脱したりする際のリバランスを促進するために使用されます。この値は session.timeout.ms よりも低く設定する必要がありますが、通常はその値の 1/3 以下に設定する必要があります。さらに低く調整することで、通常のリバランスの予想時間を制御することもできます。

max.partition.fetch.bytes

型: int
デフォルト: 1048576
有効な値: [0,…​]
重要度:

サーバーが返すパーティションごとのデータの最大量。レコードはコンシューマーによってバッチでフェッチされます。フェッチの最初の空でないパーティションの最初のレコードバッチがこの制限よりも大きい場合でも、コンシューマーが確実に前進することができるようにバッチが返されます。ブローカーによって許可される最大レコードバッチサイズは、message.max.bytes (ブローカー設定) または max.message.bytes (トピック設定) で定義されます。コンシューマーのリクエストサイズを制限する場合は、fetch.max.bytes を参照してください。

session.timeout.ms

型: int
デフォルト: 10000
重要度:

Kafka のグループ管理機能の使用時にクライアント障害を検出するために使用されるタイムアウト。クライアントは定期的にハートビートを送信し、liveness をブローカーに示します。このセッションタイムアウトの期限切れ前にブローカーによってハートビートが受信されない場合、ブローカーはグループからこのクライアントを削除し、リバランスを開始します。この値は、group.min.session.timeout.ms および group.max.session.timeout.ms によってブローカー設定で設定される許容範囲内である必要があることに注意してください。

ssl.key.password

型: password
デフォルト: null
重要度:

キーストアファイル内の秘密キーのパスワード。これはクライアントにとってオプションになります。

ssl.keystore.location

型: string
デフォルト: null
重要度:

キーストアファイルの場所。これはクライアントではオプションで、クライアントの双方向認証に使用できます。

ssl.keystore.password

型: password
デフォルト: null
重要度:

キーストアファイルのストアパスワード。これはクライアントではオプションで、ssl.keystore.location が設定されている場合にのみ必要です。

ssl.truststore.location

型: string
デフォルト: null
重要度:

トラストストアファイルの場所。

ssl.truststore.password

型: password
デフォルト: null
重要度:

トラストストアファイルのパスワード。パスワードが設定されていない場合でもトラストストアへのアクセスは可能ですが、整合性チェックは無効になっています。

allow.auto.create.topics

型: boolean
デフォルト: true
重要度:

トピックをサブスクライブまたは割り当てるときに、ブローカーでのトピックの自動作成を許可します。サブスクライブするトピックは、ブローカーが auto.create.topics.enable ブローカー設定を使用して許可されている場合にのみ自動的に作成されます。この設定は、0.11.0 より古いブローカーを使用する場合は、false に設定する必要があります。

auto.offset.reset

型: string
デフォルト: latest
有効な値: [latest, earliest, none]
重要度:

Kafka に初期オフセットがない場合や、現在のオフセットがサーバー上に存在しない場合 (例: データが削除された場合など) の対処方法。

  • earliest: 自動的にオフセットを最も古いオフセットにリセットする
  • latest: 自動的にオフセットを最新のオフセットにリセットする
  • none: コンシューマーのグループに以前のオフセットが見つからない場合は、コンシューマーに例外を出力する
  • anything else: コンシューマーに例外を出力する
client.dns.lookup

型: 文字列
デフォルト: デフォルト
有効な値: デフォルト、use_all_dns_ips、resolve_canonical_bootstrap_servers_only
重要度:

クライアントが DNS ルックアップを使用する方法を制御します。use_all_dns_ips に設定すると、ルックアップでホスト名に対して複数の IP アドレスが返された場合、接続に失敗する前にすべての IP アドレスへの接続が試行されます。ブートストラップサーバーとアドバタイズサーバーの両方に適用されます。値が resolve_canonical_bootstrap_servers_only の場合、各エントリーは解決され、正規名のリストにデプロイメントされます。

connections.max.idle.ms

型: long
デフォルト: 540000
重要度:

この設定で指定された期間 (ミリ秒単位) の後にアイドル状態の接続を閉じます。

default.api.timeout.ms

型: int
デフォルト: 60000
有効な値: [0,…​]
重要度:

クライアント API のタイムアウト (ミリ秒単位) を指定します。この設定は、timeout パラメーターを指定しないすべてのクライアント操作のデフォルトタイムアウトとして使用されます。

enable.auto.commit

型: boolean
デフォルト: true
重要度:

true の場合、コンシューマーのオフセットは定期的にバックグラウンドでコミットされます。

exclude.internal.topics

型: boolean
デフォルト: true
重要度:

サブスクライブされたパターンと一致する内部トピックをサブスクリプションから除外するべきかどうか。常に内部トピックに明示的にサブスクライブできます。

fetch.max.bytes

型: int
デフォルト: 52428800
有効な値: [0,…​]
重要度:

サーバーがフェッチリクエストに対して返す必要のあるデータの最大量。レコードはコンシューマーによってバッチで取得され、フェッチの最初の空ではないパーティションの最初のレコードバッチがこの値よりも大きい場合でも、コンシューマーが確実に前進することができるようにレコードバッチが返されます。そのため、絶対的な最大値ではありません。ブローカーによって許可される最大レコードバッチサイズは、message.max.bytes (ブローカー設定) または max.message.bytes (トピック設定) で定義されます。コンシューマーは複数のフェッチを並行して実行することに注意してください。

group.instance.id

型: string
デフォルト: null
重要度:

エンドユーザーが提供するコンシューマーインスタンスの一意の識別子。non-empty strings のみが許可されます。設定されている場合、コンシューマーは静的メンバーとして扱われます。つまり、常に、この ID を持つ 1 つのインスタンスのみがコンシューマーグループで許可されます。これは、より大きなセッションタイムアウトと組み合わせて使用して、一時的な利用不可 (プロセス再起動など) によるグループのリバランスを回避します。設定しないと、コンシューマーは従来の動作である動的メンバーとしてグループに参加します。

isolation.level

型: string
デフォルト: read_uncommitted
有効な値: [read_committed, read_uncommitted]
重要度:

トランザクションで書き込まれたメッセージを読み取る方法を制御します。read_committed に設定すると、consumer.poll() はコミットされたトランザクションメッセージのみを返します。`read_uncommitted’ (デフォルト) に設定すると、consumer.poll() は中断されたトランザクションメッセージも含めて、すべてのメッセージを返します。非トランザクションメッセージは、どちらのモードでも無条件に返されます。

メッセージは常にオフセットの順序で返されます。そのため、read_committed モードでは、consumer.poll() は最初のオープントランザクションのオフセットよりも 1 つ小さい最後の安定したオフセット (LSO) までのメッセージのみを返します。特に、継続中のトランザクションに属するメッセージの後に表示されるメッセージは、関連するトランザクションが完了するまで保留されます。その結果、read_committed コンシューマーはフライトトランザクションがある場合に高基準値を読み取ることができません。

Further, when in `read_committed` the seekToEnd method will return the LSO.
max.poll.interval.ms

型: int
デフォルト: 300000
有効な値: [1,…​]
重要度:

コンシューマーグループ管理を使用する場合の poll() の呼び出し間の最大遅延。これにより、コンシューマーがさらにレコードをフェッチする前にアイドル状態になることができる時間に上限が設定されます。このタイムアウトの期限が切れる前に poll() が呼び出されない場合、コンシューマーは失敗とみなされ、グループはパーティションを別のメンバーに再割り当てするためにリバランスします。null 以外の group.instance.id を使用しているコンシューマーがこのタイムアウトに達した場合、パーティションは即座に再割り当てされません。代わりに、コンシューマーはハートビートの送信を停止し、session.timeout.ms の期限が切れるとパーティションが再割り当てされます。これは、シャットダウンした静的コンシューマーの動作を反映します。

max.poll.records

型: int
デフォルト: 500
有効な値: [1,…​]
重要度:

poll() への単一の呼び出しで返される最大レコード数。

partition.assignment.strategy

型: リスト
デフォルト: クラス org.apache.kafka.clients.consumer.RangeAssignor
有効な値: null 以外の文字列
重要度:

グループ管理が使用されている場合に、クライアントがコンシューマーインスタンス間でパーティションの所有権を分散するために使用するパーティション割り当て戦略を担当する、サポートされているアサイナーのクラス名またはクラスターイプの優先順のリスト。org.apache.kafka.clients.consumer.ConsumerPartitionAssignor インターフェイスを実装すると、カスタム割り当てストラテジーでプラグインできます。

receive.buffer.bytes

型: int
デフォルト: 65536
有効な値: [1,…​]
重要度:

データの読み取り時に使用する TCP 受信バッファー (SO_RCVBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。

request.timeout.ms

型: int
デフォルト: 30000
有効な値: [0,…​]
重要度:

この設定は、クライアントの要求の応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信します。または、再試行が使い切られるとリクエストが失敗します。

sasl.client.callback.handler.class

型: class
デフォルト: null
重要度:

AuthenticateCallbackHandler インターフェイスを実装する SASL クライアントコールバックハンドラークラスの完全修飾名。

sasl.jaas.config

型: password
デフォルト: null
重要度:

JAAS 設定ファイルで使用される形式の SASL 接続の JAAS ログインコンテキストパラメーター。JAAS 設定ファイルの形式は、こちら で説明されています。値の形式は loginModuleClass controlFlag (optionName=optionValue)*; です。ブローカーの場合、設定の前にリスナー接頭辞と SASL メカニズム名を小文字で指定する必要があります。たとえば、listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required; などです。

sasl.kerberos.service.name

型: string
デフォルト: null
重要度:

Kafka が実行される Kerberos プリンシパル名。これは、Kafka の JAAS 設定または Kafka の設定で定義できます。

sasl.login.callback.handler.class

型: class
デフォルト: null
重要度:

AuthenticateCallbackHandler インターフェイスを実装する SASL ログインコールバックハンドラークラスの完全修飾名。ブローカーの場合、ログインコールバックハンドラー設定の前には、リスナー接頭辞 と SASL メカニズム名を小文字で指定する必要があります。たとえば、listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler などです。

sasl.login.class

型: class
デフォルト: null
重要度:

Login インターフェイスを実装するクラスの完全修飾名。ブローカーの場合、ログイン設定の前にリスナー接頭辞 と SASL メカニズム名を小文字で指定する必要があります。たとえば、listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin です。

sasl.mechanism

型: string
デフォルト: GSSAPI
重要度:

クライアント接続に使用される SASL メカニズム。これは、セキュリティープロバイダーが利用可能な任意のメカニズムである可能性がありますGSSAPI はデフォルトのメカニズムです。

security.protocol

型: string
デフォルト: PLAINTEXT
重要度:

ブローカーとの通信に使用されるプロトコル。有効な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。

send.buffer.bytes

型: int
デフォルト: 131072
有効な値: [-1,…​]
重要度:

データの送信時に使用する TCP 送信バッファー (SO_SNDBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。

ssl.enabled.protocols

型: リスト
デフォルト: TLSv1.2
重要度:

SSL 接続で有効なプロトコルの一覧。

ssl.keystore.type

型: string
デフォルト: JKS
重要度:

キーストアファイルのファイル形式。これはクライアントにとってオプションになります。

ssl.protocol

型: string
デフォルト: TLSv1.2
重要度:

SSLContext の生成に使用される SSL プロトコル。デフォルト設定は TLSv1.2 で、ほとんどの場合に問題ありません。最近の JVM で許可される値は TLSv1.2 および TLSv1.3 です。TLS、TLSv1.1、SSL、SSLv2、および SSLv3 は古い JVM でサポートされる可能性がありますが、既知のセキュリティー脆弱性のために使用は推奨されません。

ssl.provider

型: string
デフォルト: null
重要度:

SSL 接続に使用されるセキュリティープロバイダーの名前。デフォルト値は JVM のデフォルトのセキュリティープロバイダーです。

ssl.truststore.type

型: string
デフォルト: JKS
重要度:

トラストストアファイルのファイル形式。

auto.commit.interval.ms

型: int
デフォルト: 5000
有効な値: [0,…​]
重要度:

enable.auto.committrue に設定されている場合にコンシューマーオフセットが Kafka に自動コミットされる頻度 (ミリ秒単位)。

check.crcs

型: boolean
デフォルト: true
重要度:

消費されたレコードの CRC32 を自動的に確認します。これにより、メッセージのネットワーク上またはディスク上の破損が発生しなくなります。このチェックはオーバーヘッドを追加するため、極端なパフォーマンスを求める場合は無効になる可能性があります。

client.id

型: string
デフォルト: ""
重要度:

要求の実行時にサーバーに渡す ID 文字列。この目的は、サーバー側の要求ロギングに論理アプリケーション名を含めることを許可することにより、ip/port の他にもリクエストのソースを追跡できるようにすることです。

client.rack

型: string
デフォルト: ""
重要度:

このクライアントのラック識別子。これには、このクライアントが物理的に存在する場所を示す任意の文字列値を指定できます。ブローカー設定 'broker.rack' に対応します。

fetch.max.wait.ms

型: int
デフォルト: 500
有効な値: [0,…​]
重要度:

fetch.min.bytes で指定された要件をすぐに満たすために十分なデータがない場合に、サーバーがフェッチリクエストに応答するまでにブロックする最大時間。

interceptor.classes

型: list
デフォルト: ""
有効な値: non-null string
重要度:

インターセプターとして使用するクラスの一覧。org.apache.kafka.clients.consumer.ConsumerInterceptor インターフェイスを実装すると、コンシューマーによって受信される (場合によっては変更) レコードを傍受できます。デフォルトでは、インターセプターはありません。

metadata.max.age.ms

型: long
デフォルト: 300000
有効な値: [0,…​]
重要度:

新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がない場合でも、メタデータの更新を強制するまでの期間 (ミリ秒単位)。

metric.reporters

型: list
デフォルト: ""
有効な値: non-null string
重要度:

メトリクスレポーターとして使用するクラスの一覧。org.apache.kafka.common.metrics.MetricsReporter インターフェイスを実装すると、新しいメトリクスの作成が通知されるクラスのプラグが可能になります。JmxReporter は、JMX 統計を登録するために常に含まれます。

metrics.num.samples

型: int
デフォルト: 2
有効な値: [1,…​]
重要度:

メトリクスを計算するために保持されるサンプルの数。

metrics.recording.level

型: string
デフォルト: INFO
有効な値: [INFO, DEBUG]
重要度:

メトリクスの最も高い記録レベル。

metrics.sample.window.ms

型: long
デフォルト: 30000
有効な値: [0,…​]
重要度:

メトリクスサンプルが計算される時間枠。

reconnect.backoff.max.ms

型: long
デフォルト: 1000
有効な値: [0,…​]
重要度:

接続に繰り返し失敗したブローカーへの再接続時に待機する最大時間 (ミリ秒単位)。これが指定されている場合、ホストごとのバックオフは、連続して接続に失敗するたびに、この最大値まで指数関数的に増加します。バックオフの増加を計算した後、コネクションストームを回避するために 20% のランダムなジッターが追加されます。

reconnect.backoff.ms

型: long
デフォルト: 50
有効な値: [0,…​]
重要度:

特定のホストへの再接続を試みる前に待機するベース時間。これにより、タイトなループでホストに繰り返し接続することを回避します。このバックオフは、クライアントによるブローカーへのすべての接続試行に適用されます。

retry.backoff.ms

型: long
デフォルト: 100
有効な値: [0,…​]
重要度:

特定のトピックパーティションに対して失敗したリクエストを再試行するまでの待機時間。これにより、一部の障害シナリオでタイトループでリクエストを繰り返し送信することを回避できます。

sasl.kerberos.kinit.cmd

型: string
デフォルト: /usr/bin/kinit
重要度:

Kerberos kinit コマンドパス。

sasl.kerberos.min.time.before.relogin

型: long
デフォルト: 60000
重要度:

更新試行間のログインスレッドのスリープ時間。

sasl.kerberos.ticket.renew.jitter

型: double
デフォルト: 0.05
重要度:

更新時間に追加されたランダムなジッターの割合。

sasl.kerberos.ticket.renew.window.factor

型: double
デフォルト: 0.8
重要度:

ログインスレッドは、最後の更新からチケットの有効期限までの指定された時間のウィンドウファクターに達するまでスリープし、その時点でチケットの更新を試みます。

sasl.login.refresh.buffer.seconds

型: short
デフォルト: 300
有効な値: [0,…​,3600]
重要度:

クレデンシャルを更新するときに維持するクレデンシャルの有効期限が切れるまでのバッファー時間 (秒単位)。更新が、バッファー秒数よりも有効期限に近いときに発生する場合、更新は、可能な限り多くのバッファー時間を維持するために上に移動されます。設定可能な値は 0 から 3600 (1 時間) です。値を指定しない場合は、デフォルト値の 300 (5 分) が使用されます。この値および sasl.login.refresh.min.period.seconds は、その合計がクレデンシャルの残りの有効期間を超える場合にどちらも無視されます。現在、OAUTHBEARER にのみ適用されます。

sasl.login.refresh.min.period.seconds

型: short
デフォルト: 60
有効な値: [0,…​,900]
重要度:

ログイン更新スレッドがクレデンシャルを更新する前に待機する最小時間 (秒単位)。設定可能な値は 0 から 900 (15 分) です。値を指定しない場合は、デフォルト値の 60 (1 分) が使用されます。この値および sasl.login.refresh.buffer.seconds は、その合計がクレデンシャルの残りの有効期間を超える場合に両方とも無視されます。現在、OAUTHBEARER にのみ適用されます。

sasl.login.refresh.window.factor

型: double
デフォルト: 0.8
有効な値: [0.5,…​,1.0]
重要度:

ログイン更新スレッドは、クレデンシャルの有効期間との関連で指定の期間ファクターに達するまでスリープ状態になり、達成した時点でクレデンシャルの更新を試みます。設定可能な値は 0.5(50%) から 1.0(100%) までです。値が指定されていない場合、デフォルト値の 0.8(80%) が使用されます。現在、OAUTHBEARER にのみ適用されます。

sasl.login.refresh.window.jitter

型: double
デフォルト: 0.05
有効な値: [0.0,…​,0.25]
重要度:

ログイン更新スレッドのスリープ時間に追加されるクレデンシャルの存続期間に関連するランダムジッターの最大量。設定可能な値は 0 から 0.25(25%) です。値が指定されていない場合は、デフォルト値の 0.05(5%) が使用されます。現在、OAUTHBEARER にのみ適用されます。

security.providers

型: string
デフォルト: null
重要度:

設定可能なクリエータークラスのリストで、それぞれがセキュリティーアルゴリズムを実装するプロバイダーを返します。これらのクラスは org.apache.kafka.common.security.auth.SecurityProviderCreator インターフェイスを実装する必要があります。

ssl.cipher.suites

型: list
デフォルト: null
重要度:

暗号化スイートの一覧。これは、TLS または SSL ネットワークプロトコルを使用してネットワーク接続のセキュリティー設定をネゴシエートするために使用される認証、暗号化、MAC、および鍵交換アルゴリズムの名前付きの組み合わせです。デフォルトでは、利用可能なすべての暗号スイートがサポートされます。

ssl.endpoint.identification.algorithm

型: string
デフォルト: https
重要度:

サーバー証明書を使用してサーバーのホスト名を検証するエンドポイント識別アルゴリズム。

ssl.keymanager.algorithm

型: string
デフォルト: SunX509
重要度:

SSL 接続のキーマネージャーファクトリーによって使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに設定されたキーマネージャーファクトリーアルゴリズムです。

ssl.secure.random.implementation

型: string
デフォルト: null
重要度:

SSL 暗号操作に使用する SecureRandom PRNG 実装。

ssl.trustmanager.algorithm

型: string
デフォルト: PKIX
重要度:

SSL 接続のトラストマネージャーファクトリーによって使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに設定されたトラストマネージャーファクトリーアルゴリズムです。