第7章 Kafka ストリームの設定プロパティー
application.idタイプ: string
重要度: 高
ストリーム処理アプリケーションの識別子。Kafka クラスター内で一意である必要があります。これは、1) デフォルトのクライアント ID の接頭辞、2) メンバーシップ管理のためのグループ ID、3) Changelog のトピックの接頭辞として使用されます。
bootstrap.serversタイプ: list
重要度: 高
Kafka クラスターへの最初の接続を確立するために使用されるホストとポートのペアの一覧。クライアントは、ブートストラップ用にここで指定されたサーバーに関係なく、すべてのサーバーを利用します。この一覧は、サーバーのフルセットを検出するために使用される最初のホストにのみ影響します。この一覧は、
host1:port1,host2:port2,…の形式にする必要があります。これらのサーバーは、(動的に変更される可能性がある) 完全なクラスターメンバーシップを検出するための最初の接続にだけ使用されるため、このリストにはサーバーの完全なセットを含める必要はありません (ただし、サーバーがダウンした場合に備えて、複数のサーバーが必要になる場合があります)。num.standby.replicas型: int
デフォルト: 0
重要度: 高
各タスクのスタンバイレプリカ数。
state.dirタイプ: string
デフォルト: /tmp/kafka-streams
重要度: 高
状態ストアのディレクトリーの場所。このパスは、同じ基礎となるファイルシステムを共有するストリームインスタンスごとに一意である必要があります。
acceptable.recovery.lag型: long
デフォルト: 10000
有効な値: [0,…]
重要度: 中
クライアントがアクティブなタスク割り当てを受け取るのに十分なほど追いついたと見なされる最大許容ラグ (追いつくまでのオフセット数)。割り当て時に、処理前に残りの変更ログを復元します。リバランス中の処理の一時停止を回避するために、この設定は、特定のワークロードの 1 分未満の回復時間に対応する必要があります。0 以上である必要があります。
cache.max.bytes.buffering型: long
デフォルト: 10485760
有効な値: [0,…]
重要度: 中
すべてのスレッドでバッファーするのに使用されるメモリーバイトの最大数。
client.idタイプ: string
デフォルト: ""
重要度: 中
内部コンシューマー、プロデューサー、および復元コンシューマーのクライアント ID に使用される ID 接頭辞。パターンは、'<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>' です。
default.deserialization.exception.handlerタイプ: class
デフォルト: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
重要度: 中
org.apache.kafka.streams.errors.DeserializationExceptionHandlerインターフェイスを実装する例外処理クラス。default.key.serdeタイプ: class
デフォルト: null
重要度: 中
org.apache.kafka.common.serialization.Serdeインターフェイスを実装するキーのデフォルトのシリアライザー/デシリアライザークラス。windowed serde クラスを使用する場合は、org.apache.kafka.common.serialization.Serdeインターフェイスを実装した inner serde クラスを 'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' で設定する必要があることに注意してください。default.list.key.serde.innerタイプ: class
デフォルト: null
重要度: 中
org.apache.kafka.common.serialization.Serdeインターフェイスを実装するキーの list serde のデフォルトの内部クラス。この設定は、default.key.serde設定がorg.apache.kafka.common.serialization.Serdes.ListSerdeに設定されている場合にのみ読み取られます。default.list.key.serde.typeタイプ: class
デフォルト: null
重要度: 中
java.util.Listインターフェイスを実装するキーのデフォルトクラス。この設定は、default.key.serde設定がorg.apache.kafka.common.serialization.Serdes.ListSerdeに設定されている場合にのみ読み取られます。リスト serde クラスが使用されている場合は、org.apache.kafka.common.serialization.Serdeインターフェイスを 'default.list.key.serde.inner' 経由で実装する内部 serde クラスを設定する必要がある点に注意してください。default.list.value.serde.innerタイプ: class
デフォルト: null
重要度: 中
org.apache.kafka.common.serialization.Serdeインターフェイスを実装する値の list serde のデフォルトの内部クラス。この設定は、default.value.serde設定がorg.apache.kafka.common.serialization.Serdes.ListSerdeに設定されている場合にのみ読み取られます。default.list.value.serde.typeタイプ: class
デフォルト: null
重要度: 中
java.util.Listインターフェイスを実装する値のデフォルトクラス。この設定は、default.value.serde設定がorg.apache.kafka.common.serialization.Serdes.ListSerdeに設定されている場合にのみ読み取られます。リスト serde クラスが使用されている場合は、org.apache.kafka.common.serialization.Serdeインターフェイスを 'default.list.value.serde.inner' 経由で実装する内部 serde クラスを設定する必要がある点に注意してください。default.production.exception.handlerタイプ: class
デフォルト: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
重要度: 中
org.apache.kafka.streams.errors.ProductionExceptionHandlerインターフェイスを実装する例外処理クラス。default.timestamp.extractorタイプ: class
デフォルト: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
重要度: 中
org.apache.kafka.streams.processor.TimestampExtractorインターフェイスを実装した、デフォルトのタイムスタンプ抽出クラスです。default.value.serdeタイプ: class
デフォルト: null
重要度: 中
org.apache.kafka.common.serialization.Serdeインターフェイスを実装する値のデフォルトのシリアライザー/デシリアライザークラス。windowed serde クラスを使用する場合は、org.apache.kafka.common.serialization.Serdeインターフェイスを実装した inner serde クラスを 'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' で設定する必要があることに注意してください。max.task.idle.ms型: long
デフォルト: 0
重要度: 中
この設定は、結合とマージが順不同の結果を生成するかどうかを制御します。この設定値は、ストリームタスクが、一部の (すべてではない) 入力パーティションに完全に追いついたときに、プロデューサーが追加のレコードを送信するのを待ち、複数の入力ストリームにまたがるレコード処理の順序がずれる可能性を避けるためにアイドル状態を保つ最大時間 (ミリ秒単位) になります。デフォルト (ゼロ) は、プロデューサーがさらにレコードを送信するのを待ちませんが、ブローカーにすでに存在するデータをフェッチするのを待ちます。このデフォルトは、ブローカーにすでに存在するレコードの場合、Streams がそれらをタイムスタンプ順に処理することを意味します。-1 に設定すると、アイドリングを完全に無効にし、ローカルで利用可能なデータをすべて処理しますが、このように処理する場合でも、順序外の処理が発生する可能性があります。
max.warmup.replicas型: int
デフォルト: 2
有効な値: [1,…]
重要度: 中
タスクが再割り当てされたあるインスタンスでウォームアップしている間に、別のインスタンスでタスクを利用できるようにするために、一度に割り当てることができるウォームアップレプリカ (設定された num.standbys を超える追加のスタンドバイ) の最大数です。高可用性を確保するために追加のブローカートラフィックとクラスターの状態を調整するのに使用されます。1 以上でなければなりません。
num.stream.threads型: int
デフォルト: 1
重要度: 中
ストリーム処理を実行するスレッドの数。
processing.guaranteeタイプ: string
デフォルト: at_least_once
有効な値: [at_least_once, exactly_once, exactly_once_beta, exactly_once_v2]
重要度: 中
使用されるべき処理保証です。使用できる値は、
at_least_once(デフォルト) およびexactly_once_v2(ブローカーバージョン 2.5 以降が必要) です。非推奨のオプションはexactly_once(ブローカーバージョン 0.11.0 以降が必要) およびexactly_once_beta(ブローカーバージョン 2.5 以降が必要) です。1 回だけの処理には、デフォルトで少なくとも 3 つのブローカーのクラスターが必要であることに注意してください。これは、実稼働環境に推奨される設定です。開発の場合、ブローカー設定transaction.state.log.replication.factorおよびtransaction.state.log.min.isrを調整することにより、これを変更できます。rack.aware.assignment.tagsタイプ: list
デフォルト: ""
有効な値: 最大 5 つの要素を含むリスト
重要度: 中
Kafka Streams インスタンス間でスタンバイレプリカを配布するために使用されるクライアントタグキーのリスト。設定すると、Kafka ストリームは、各クライアントタグディメンションにスタンバイタスクを分散するために最善を尽くします。
replication.factorタイプ: int
デフォルト: -1
重要度: 中
ストリーム処理アプリケーションによって作成されたログトピックおよびパーティショントピックを変更するためのレプリケーション係数。デフォルトの
-1(つまり、ブローカーのデフォルトレプリケーションファクターを使用) にはブローカーバージョン 2.4 以降が必要です。security.protocol型: string
デフォルト: PLAINTEXT
重要度: 中
ブローカーとの通信に使用されるプロトコル。有効な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
task.timeout.ms型: long
デフォルト: 300000 (5 分)
有効な値: [0,…]
重要度: 中
内部エラーが原因でタスクが停止し、エラーが発生するまで再試行する最大時間 (ミリ秒単位)。タイムアウトが 0ms の場合、タスクは最初の内部エラーに対してエラーを発生させます。タイムアウトが 0ms を超える場合、タスクはエラーが発生する前に少なくとも 1 回再試行します。
topology.optimizationタイプ: string
デフォルト: none
有効な値: [none, all]
重要度: 中
デフォルトでは、トポロジーを最適化する必要がある場合に Kafka Streams に指示する設定。
application.serverタイプ: string
デフォルト: ""
重要度: 低
この KafkaStreams インスタンスでの状態ストア検出とインタラクティブなクエリーに使用できるユーザー定義のエンドポイントを参照する host:port ペア。
buffered.records.per.partition型: int
デフォルト: 1000
重要度: 低
パーティションごとにバッファーを行う最大レコード数。
built.in.metrics.versionタイプ: string
デフォルト: latest
有効な値: [latest]
重要度: 低
使用する組み込みメトリクスのバージョン。
commit.interval.ms型: long
デフォルト: 30000 (30 秒)
有効な値: [0,…]
重要度: 低
プロセッサーの位置を保存する頻度 (ミリ秒単位)。(
processing.guaranteeがexactly_once_v2、exactly_onceに設定されている場合、デフォルト値は100になり、これ以外の場合のデフォルト値は30000になる点に注意してください。connections.max.idle.ms型: long
デフォルト: 540000 (9 分)
重要度: low
この設定で指定された期間 (ミリ秒単位) の後にアイドル状態の接続を閉じます。
default.dsl.storeタイプ: string
デフォルト: rocksDB
有効な値: [rocksDB, in_memory]
重要度: 低
DSL Operator が使用するデフォルトの状態ストアタイプ。
metadata.max.age.ms型: long
デフォルト: 300000 (5 分)
有効な値: [0,…]
重要度: low
新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がない場合でも、メタデータの更新を強制するまでの期間 (ミリ秒単位)。
metric.reportersタイプ: list
デフォルト: ""
重要度: 低
メトリクスレポーターとして使用するクラスの一覧。
org.apache.kafka.common.metrics.MetricsReporterインターフェイスを実装すると、新しいメトリクスの作成が通知されるクラスのプラグが可能になります。JmxReporter は、JMX 統計を登録するために常に含まれます。metrics.num.samples型: int
デフォルト: 2
有効な値: [1,…]
重要度: 低
メトリクスを計算するために保持されるサンプルの数。
metrics.recording.level型: string
デフォルト: INFO
有効な値: [INFO, DEBUG, TRACE]
重要度: 低
メトリクスの最も高い記録レベル。
metrics.sample.window.ms型: long
デフォルト: 30000 (30 秒)
有効な値: [0,…]
重要度: 低
メトリクスサンプルが計算される時間枠。
poll.ms型: long
デフォルト: 100
重要度: 低
入力を待つためにブロックする時間をミリ秒単位で指定します。
probing.rebalance.interval.ms型: long
デフォルト: 600000 (10 分)
有効な値: [60000,…]
重要度: low
ウォーミングアップが終了し、アクティブになる準備ができているウォームアップレプリカをプローブするためにリバランスをトリガーするまで待機する最大時間 (ミリ秒単位)。プローブリバランスは、割り当てが分散されるまで引き続きトリガーされます。1 分以上でなければなりません。
receive.buffer.bytes型: int
デフォルト: 32768 (32 キビバイト)
有効な値: [-1,…]
重要度: low
データの読み取り時に使用する TCP 受信バッファー (SO_RCVBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。
reconnect.backoff.max.ms型: long
デフォルト: 1000 (1 秒)
有効な値: [0,…]
重要度: 低
接続に繰り返し失敗したブローカーへの再接続時に待機する最大時間 (ミリ秒単位)。これが指定されている場合、ホストごとのバックオフは、連続して接続に失敗するたびに、この最大値まで指数関数的に増加します。バックオフの増加を計算した後、コネクションストームを回避するために 20% のランダムなジッターが追加されます。
reconnect.backoff.ms型: long
デフォルト: 50
有効な値: [0,…]
重要度: 低
特定のホストへの再接続を試みる前に待機するベース時間。これにより、タイトなループでホストに繰り返し接続することを回避します。このバックオフは、クライアントによるブローカーへのすべての接続試行に適用されます。
repartition.purge.interval.ms型: long
デフォルト: 30000 (30 秒)
有効な値: [0,…]
重要度: 低
再分割トピックから完全に消費されたレコードを削除する頻度 (ミリ秒単位)。パージは、最後のパージから少なくともこの値の後に行われますが、それ以降まで遅れる場合があります。(
commit.interval.ms とは異なり、processing.guarantee が exact_once_v2に設定されている場合、この値のデフォルトは変更されないことに注意してください)。request.timeout.ms型: int
デフォルト: 40000 (40 秒)
有効な値: [0,…]
重要度: 低
この設定は、クライアントの要求の応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信します。または、再試行が使い切られるとリクエストが失敗します。
retries型: int
デフォルト: 0
有効な値: [0,…,2147483647]
重要度: 低
ゼロより大きい値を設定すると、クライアントは、一時的なエラーの可能性がある失敗した要求を再送信します。値をゼロまたは
MAX_VALUEのいずれかに設定し、対応するタイムアウトパラメーターを使用して、クライアントがリクエストを再試行する期間を制御することが推奨されます。retry.backoff.ms型: long
デフォルト: 100
有効な値: [0,…]
重要度: 低
特定のトピックパーティションに対して失敗したリクエストを再試行するまでの待機時間。これにより、一部の障害シナリオでタイトループでリクエストを繰り返し送信することを回避できます。
rocksdb.config.setterタイプ: class
デフォルト: null
重要度: 低
org.apache.kafka.streams.state.RocksDBConfigSetterインターフェイスを実装する Rocks DB 設定セッタークラスまたはクラス名。send.buffer.bytes型: int
デフォルト: 131072 (128 キビバイト)
有効な値: [-1,…]
重要度: low
データの送信時に使用する TCP 送信バッファー (SO_SNDBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。
state.cleanup.delay.ms型: long
デフォルト: 600000 (10 分)
重要度: low
パーティションが移行されたときに状態を削除するまで待機する時間 (ミリ秒単位)。少なくとも
state.cleanup.delay.msの間変更されていない state ディレクトリーのみが削除されます。upgrade.fromタイプ: string
デフォルト: null
有効な値: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3]
重要度: 低
後方互換性のある方法でのアップグレードを許可します。これは、[0.10.0, 1.1] から 2.0+ にアップグレード、または [2.0, 2.3] から 2.4+ にアップグレードする際に必要です。2.4 から新しいバージョンにアップグレードする場合は、この設定を指定する必要はありません。デフォルトは
nullです。許可される値は 0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1、2.0、2.1、2.2、2.3 (対応する旧バージョンからのアップグレード用) です。window.size.msタイプ: long
デフォルト: null
重要度: 低
ウィンドウの終了時間を計算するためにデシリアライザーのウィンドウサイズを設定します。
windowed.inner.class.serdeタイプ: string
デフォルト: null
重要度: 低
ウィンドウ表示されたレコードの内部クラスのデフォルトのシリアライザー/デシリアライザー。"
"`org.apache.kafka.common.serialization.Serde` インターフェイスを実装する必要があります。KafkaStreams アプリケーションでこの設定を設定すると、Plain コンシューマークライアントからのみ使用されることが意図されているため、エラーが発生することに注意してください。windowstore.changelog.additional.retention.ms型: long
デフォルト: 86400000 (1 日)
重要度: 低
windows maintainMs に追加され、データがログから早期に削除されないようにします。クロックドリフトを許可します。デフォルトは 1 日です。