付録G Kafka Streams 設定パラメーター

application.id

type: string
Importance: high

ストリーム処理アプリケーションの ID。Kafka クラスター内で一意である必要があります。これは、デフォルトの client-id プレフィックスである 1)として使用されます。2)メンバーシップ管理の group-id、および changelog トピックプレフィックスは 3 です。

bootstrap.servers

type: list
Importance: high

Kafka クラスターへの最初の接続を確立するために使用するホストとポートのペアの一覧。クライアントは、ここで指定するすべてのサーバーの使用を行います。このリストは、サーバーのフルセットを検出するために使用される初期ホストにのみ影響します。このリストは host1:port1,host2:port2,…​ の形式にする必要があります。これらのサーバーは、クラスターの全メンバーシップを検出するために初期接続(動的に変更されている可能性がある)にだけ使用されているため、この一覧にはサーバーのフルセットを含める必要はありません(サーバーがダウンした場合の場合もあります)。

replication.factor

type: int
デフォルト: 1
の修正:
high

変更ログトピックおよびストリーム処理アプリケーションによって作成されるトピックの再パーティショントピックのレプリケーション係数。ブローカークラスターがバージョン 2.4 以降にある場合は、-1 を設定してブローカーのデフォルトのレプリケーション係数を使用できます。

state.dir

type: string
Default: /tmp/kafka-streams
Importance: high

ステートストアのディレクトリーの場所。このパスは、同じ基礎となるファイルシステムを共有するストリームインスタンスごとに固有でなければなりません。

acceptable.recovery.lag

type: long
Default: 10000
Valid Values: [0,…​]
Importance: medium

アクティブなタスクでクライアントがキャッチされる最大ラグ(オフセットの数)。指定のワークロードの 1 分未満のリカバリー時間に対応します。0 以上でなければなりません。

cache.max.bytes.buffering

type: long
デフォルト: 10485760
有効な値: [0,…​]
Importance: medium

すべてのスレッドでバッファーに使用されるメモリーバイトの最大数。

client.id

type: string
デフォルト: ""
Importance: medium

内部コンシューマー、プロデューサー、および restore-consumer のクライアント ID に使用される ID プレフィックス文字列。パターン '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'。

default.deserialization.exception.handler

type: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: medium

org.apache.kafka.streams.errors.DeserializationExceptionHandler インターフェースを実装する例外処理クラス。

default.key.serde

type: class
Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Importance: medium

org.apache.kafka.common.serialization.Serde インターフェースを実装するキーのデフォルトのシリアライザー/デシリアライザークラス。windowed serde クラスが使用されている場合は、'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' でも org.apache.kafka.common.serialization.Serde インターフェースを実装する内部クラスを設定する必要があります。

default.production.exception.handler

type: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: medium

org.apache.kafka.streams.errors.ProductionExceptionHandler インターフェースを実装する例外処理クラス。

default.timestamp.extractor

Type: class
デフォルト: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: medium

org.apache.kafka.streams.processor.TimestampExtractor インターフェースを実装するデフォルトのタイムスタンプ抽出。

default.value.serde

type: class
Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Importance: medium

org.apache.kafka.common.serialization.Serde インターフェースを実装する値のデフォルトシリアライザー/デシリアライザークラス。windowed serde クラスが使用されている場合は、'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' でも org.apache.kafka.common.serialization.Serde インターフェースを実装する内部クラスを設定する必要があります。

default.windowed.key.serde.inner

type: class
Default: null
Importance: medium

ウィンドウ化されたキーの内部クラスのデフォルトのシリアライザー/デシリアライザー。org.apache.kafka.common.serialization.Serde インターフェースを実装する必要があります。

default.windowed.value.serde.inner

type: class
Default: null
Importance: medium

ウィンドウ化された値の内部クラスのデフォルトのシリアライザー/デシリアライザー。org.apache.kafka.common.serialization.Serde インターフェースを実装する必要があります。

max.task.idle.ms

タイプ: long
デフォルト: 0
のインポート:

パーティションバッファーにすべてのパーティションバッファーにレコードが含まれていなければ、ストリームタスクがアイドル状態になる最大時間(ミリ秒単位)。これにより、複数の入力ストリーム間で順序不足のレコード処理の可能性を回避します。

max.warmup.replicas

type: int
Default: 2
Valid Values: [1,…​]
Importance: medium

ウォームアップレプリカの最大数(設定された num.standbys 以上の期間で 1 つのインスタンスがタスクを利用可能にする目的で 1 度に割り当てることができるウォームスタンバイ)の最大数。別のインスタンスでこのタスクを利用できる状態にしておく一方で、再割り当てされたことになります。高可用性に使用できる追加のブローカートラフィックとクラスター状態のサイズを調整するために使用されます。1 以上でなければなりません。

num.standby.replicas

type: int
Default: 0
Importance: medium

各タスクのスタンバイレプリカ数。

num.stream.threads

type: int
Default: 1
Importance: medium

ストリーム処理を実行するスレッドの数。

processing.guarantee

type: string
デフォルト: at_least_once
有効な値: [at_least_once, exactly_once_beta]
Importance: medium

処理により、使用するべき確実性があります。使用できる値は、at_least_once (デフォルト)、exactly_once (ブローカーバージョン 0.11.0 以降が必要)、および exactly_once_beta (ブローカーバージョン 2.5 以降が必要)です。完全に 1 回実行処理には、デフォルトで、実稼働に推奨される設定である 3 つ以上のブローカークラスターが必要です。開発の場合は、ブローカーの設定 transaction.state.log.replication.factor および transaction.state.log.min.isr を調整することで変更できます。

security.protocol

type: string
Default: PLAINTEXT
Importance: medium

ブローカーとの通信に使用されるプロトコル。有効な値は PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。

task.timeout.ms

type: long
デフォルト: 300000(5 分)
有効な値: [0,…​]
Importance: medium

内部エラーが原因でタスクが停止している可能性があり、エラーが発生するまで再試行する可能性がある最大時間(ミリ秒単位)。0ms のタイムアウトでは、タスクは最初の内部エラーに対してエラーを出しました。0ms を超えるタイムアウトの場合には、エラーが発生する前に、タスクは少なくとも 1 回再試行します。

topology.optimization

type: string
Default: none
Valid Values: [none, all]
Importance: medium

Kafka Streams に指示する設定は、デフォルトで無効化され、トポロジーを最適化する必要があります。

application.server

type: string
デフォルト: ""
Importance: low

この KafkaStreams インスタンスのステートストア検出と対話型クエリーに使用できる、ユーザー定義のエンドポイントを参照する host:port ペア。

buffered.records.per.partition

type: int
Default: 1000
Importance: low

パーティションごとにバッファーする最大レコード数。

built.in.metrics.version

type: string
Default: latest
Valid Values: [0.10.0-2.4, latest]
Importance: low

使用する組み込みメトリクスのバージョン。

commit.interval.ms

type: long
デフォルト: 30000(30 秒)
有効値: [0,…​]
Importance: low

プロセッサーの場所を保存する頻度(ミリ秒単位)。( processing.guaranteeexactly_once に設定された場合、デフォルト値は 100 です。それ以外の場合は、デフォルト値は 30000 です。

connections.max.idle.ms

type: long
デフォルト: 540000(9 分)
Importance: low

この設定によって指定された期間(ミリ秒単位)の後にアイドル状態の接続を閉じます。

metadata.max.age.ms

type: long
デフォルト: 300000(5 分)
有効な値: [0,…​]
Importance: low

パーティションリーダーが変更されておらず、新しいブローカーやパーティションをプロアクティブに検出するようにしていても、メタデータの更新を強制する期間(ミリ秒単位)。

metric.reporters

type: list
デフォルト: ""
Importance: low

メトリクスレポーターとして使用するクラスの一覧。org.apache.kafka.common.metrics.MetricsReporter インターフェースを実装すると、新しいメトリクス作成の通知となるクラスにプラグインすることができます。JmxReporter は、JMX 統計を登録するために常に含まれます。

metrics.num.samples

type: int
Default: 2
Valid Values: [1,…​]
Importance: low

メトリクスを計算するために保持されるサンプルの数。

metrics.recording.level

type: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low

メトリックの最大記録レベル。

metrics.sample.window.ms

type: long
デフォルト: 30000(30 秒)
有効値: [0,…​]
Importance: low

メトリクスサンプルが計算される期間。

partition.grouper

type: class
Default: org.apache.kafka.streams.processor.DefaultPartitionGrouper
Importance: low

org.apache.kafka.streams.processor.PartitionGrouper インターフェースを実装するパーティション分類クラス。警告: この設定は非推奨となり、3.0.0 リリースで削除される予定です。

poll.ms

タイプ: long
デフォルト: 100
修正:
low

入力を待機する期間(ミリ秒単位)。

probing.rebalance.interval.ms

type: long
デフォルト: 600000(10 分)
有効な値: [60000,…​]
Importance: low

ウォームアップを終了し、アクティブになるレプリカに対するリバランスをトリガーするまで待機する最大時間(ミリ秒単位)。割り当てがバランスになるまで、リバランスはトリガーを継続します。1 分以上でなければなりません。

receive.buffer.bytes

type: int
Default: 32768(32 kibibytes)
Valid Values: [-1,…​]
Importance: low

データの読み取り時に使用する TCP 受信バッファー(SO_RCVBUF)のサイズ。値が -1 の場合、OS のデフォルトが使用されます。

reconnect.backoff.max.ms

type: long
デフォルト: 1000(1秒)
有効値:
[0,…​]
Importance: low

繰り返し接続に失敗したブローカーに再接続するまで待機する最大時間(ミリ秒単位)。指定された場合、ホストごとのバックオフは連続した接続障害ごとに指数関数的に増加し、最大接続数までします。バックオフの増加を計算すると、接続のフレッターを回避するために 20% のランダムジッターが追加されます。

reconnect.backoff.ms

type: long
デフォルト: 50
有効な値: [0,…​]
インポートランス: low

指定のホストに再接続を試みる前に待機する時間。これにより、密接なループでホストへ繰り返し接続することはできません。このバックオフは、クライアントがブローカーへのすべての接続試行に適用されます。

request.timeout.ms

type: int
デフォルト: 40000(40秒)
有効値:
[0,…​]
Importance: low

この設定では、クライアントがリクエストの応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントがリクエストを再送するか、再試行した場合はリクエストが失敗します。

retries

type: int
Default: 0
Valid Values: [0,…​,2147483647]
Importance: low

ゼロよりも大きい値を設定すると、クライアントは一時的なエラーで失敗したリクエストを再送信します。値をゼロまたは MAX_VALUE に設定し、対応するタイムアウトパラメーターを使用してクライアントの再試行期間を制御することが推奨されます。

retry.backoff.ms

type: long
デフォルト: 100
有効な値: [0,…​]
インポートランス: low

特定のトピックパーティションへの失敗した要求を再試行するまでの待機時間。これにより、障害シナリオによっては、1 回目のループでリクエストを繰り返し送信しないようにします。

rocksdb.config.setter

type: class
Default: null
Importance: low

org.apache.kafka.streams.state.RocksDBConfigSetter インターフェースを実装する Rocks DB 設定セッタークラスまたはクラス名。

send.buffer.bytes

type: int
Default: 131072(128 kibibytes)
Valid Values: [-1,…​]
Importance: low

データ送信時に使用する TCP 送信バッファー(SO_SNDBUF)のサイズ。値が -1 の場合、OS のデフォルトが使用されます。

state.cleanup.delay.ms

type: long
デフォルト: 600000(10 分)
Importance: low

パーティションが移行されたときに状態を削除するまでの待機時間(ミリ秒単位)。少なくとも state.cleanup.delay.ms で編集されていない状態ディレクトリーのみが削除されます。

upgrade.from

type: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3]
Importance: low

後方互換性がある方法でのアップグレードを可能にします。これは、[0.10.0, 1.1] から 2.0 以降にアップグレードする場合や、[2.0、2.3] から 2.4+ にアップグレードする際に必要になります。2.4 から新しいバージョンにアップグレードする場合、この設定を指定する必要はありません。デフォルトは null です。許可される値は "0.10.0"、"0.10.1"、"0.10.2"、"0.11.0"、""1.0"、"1.1"、"2.0"、"2.1"、"2.2"、"2.3" です(対応する古いバージョンからアップグレードする場合)。

window.size.ms

タイプ: long
デフォルト: null
インポートランス: low

ウィンドウの終了時間を計算するために、デシリアライザーのウィンドウサイズを設定します。

windowstore.changelog.additional.retention.ms

type: long
デフォルト: 86400000(1 日)
Importance: low

データがログ早期から削除されないように、ウィンドウメンテナンス Ms に追加されました。クロックドリフトを許可します。デフォルトは 1 日です。