Appendix G. Kafka Streams configuration parameters

Each parameter is listed below as: name, description, and a summary line giving its type, default value, valid values (where constrained), and importance.

application.id

An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as (1) the default client-id prefix, (2) the group ID for membership management, and (3) the changelog topic prefix.

Type: string | Default: (none) | Importance: high

bootstrap.servers

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

Type: list | Default: (none) | Importance: high
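
These are the only two parameters in this appendix without a usable default. A minimal sketch of supplying them and starting an application; the application ID, broker address, and topic names are placeholders, not values prescribed by this appendix:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class MinimalStreamsConfig {
        public static void main(String[] args) {
            Properties props = new Properties();
            // The two required settings; every other parameter here has a default.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");                     // placeholder topology

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }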

replication.factor

The replication factor for changelog topics and repartition topics created by the stream processing application.

Type: int | Default: 1 | Importance: high

state.dir

Directory location for state stores.

Type: string | Default: /tmp/kafka-streams | Importance: high

cache.max.bytes.buffering

Maximum number of memory bytes to be used for buffering across all threads.

Type: long | Default: 10485760 | Valid values: [0,...] | Importance: medium
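
A quick tuning sketch, reusing the props object from the minimal example above; setting the cache to 0 disables record caching entirely so every update is forwarded downstream, which is handy when debugging a topology:

    // Default is 10485760 (10 MB); 0 turns record caching off.
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);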

client.id

An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'.

Type: string | Default: "" | Importance: medium

default.deserialization.exception.handler

Exception handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface.

Type: class | Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler | Importance: medium
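
If skipping corrupt records is acceptable for your application, the library also ships a LogAndContinueExceptionHandler in the same package. A sketch of switching to it, again reusing the props object from the first example:

    // Log bad records and keep processing instead of failing the stream thread.
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
              org.apache.kafka.streams.errors.LogAndContinueExceptionHandler.class);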

default.key.serde

Default serializer/deserializer class for keys, implementing the org.apache.kafka.common.serialization.Serde interface. Note that when a windowed serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner' as well.

Type: class | Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde | Importance: medium

default.production.exception.handler

Exception handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandler interface.

Type: class | Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler | Importance: medium
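
The default handler fails the application on any send error. As a sketch of the alternative, a custom handler (the class name here is hypothetical) can swallow a specific recoverable error, such as a record that exceeds the broker's size limit, while still failing on everything else:

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
        @Override
        public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                         Exception exception) {
            // Skip oversized records; fail the application on any other error.
            return exception instanceof RecordTooLargeException
                    ? ProductionExceptionHandlerResponse.CONTINUE
                    : ProductionExceptionHandlerResponse.FAIL;
        }

        @Override
        public void configure(Map<String, ?> configs) { }  // no extra configuration needed
    }

Registering it is a one-liner: props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, IgnoreRecordTooLargeHandler.class).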

default.timestamp.extractor

Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface.

Type: class | Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp | Importance: medium
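
FailOnInvalidTimestamp raises an exception when a record carries a negative timestamp. A sketch of a custom extractor that falls back to the previous valid timestamp instead (the class name is hypothetical):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class FallbackTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            long ts = record.timestamp();
            // Invalid (negative) timestamps fall back to the last valid one seen.
            return ts >= 0 ? ts : Math.max(previousTimestamp, 0);
        }
    }

Register it via props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FallbackTimestampExtractor.class).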

default.value.serde

Default serializer/deserializer class for values, implementing the org.apache.kafka.common.serialization.Serde interface. Note that when a windowed serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner' as well.

Type: class | Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde | Importance: medium
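
The byte-array defaults almost always need overriding. A sketch of setting String serdes for both keys and values, reusing the props object from the first example:

    // The Serdes factory class lives in org.apache.kafka.common.serialization.
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());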

num.standby.replicas

The number of standby replicas for each task.

Type: int | Default: 0 | Importance: medium

num.stream.threads

The number of threads to execute stream processing.

Type: int | Default: 1 | Importance: medium
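
Threads scale out processing within a single instance, while standby replicas (the previous entry) keep warm copies of task state on other instances for faster failover. A sketch of setting both, reusing the props object from the first example; the values are illustrative:

    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);    // illustrative thread count
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);  // one hot-standby copy per task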

processing.guarantee

The processing guarantee that should be used. Possible values are at_least_once (default) and exactly_once. Note that exactly-once processing requires a cluster of at least three brokers by default, which is the recommended setting for production; for development you can change this by adjusting the broker setting transaction.state.log.replication.factor.

Type: string | Default: at_least_once | Valid values: [at_least_once, exactly_once] | Importance: medium
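
A sketch of enabling exactly-once processing, reusing the props object from the first example; StreamsConfig provides a constant for the value:

    // "exactly_once"; requires brokers >= 0.11 and, by default, a three-broker cluster.
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);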

security.protocol

Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

Type: string | Default: PLAINTEXT | Importance: medium

topology.optimization

A configuration telling Kafka Streams whether it should optimize the topology. Disabled by default.

Type: string | Default: none | Valid values: [none, all] | Importance: medium
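
A sketch of turning optimization on, reusing the props object from the first example and assuming the StreamsConfig constants of the 2.0-era API; note, as an assumption about some releases, that the same Properties may also need to be passed to StreamsBuilder#build for the optimizations to be applied:

    // StreamsConfig.OPTIMIZE is the constant for "all"; the default is "none".
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);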

application.server

A host:port pair pointing to an embedded, user-defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application.

Type: string | Default: "" | Importance: low
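
This setting only advertises the endpoint; actually serving requests on it is up to you. A sketch of advertising a local port and then discovering which instances host a given store, building on the streams instance from the first example (the port and store name are placeholders):

    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");  // placeholder endpoint
    // After streams.start(), any instance can locate the hosts of a state store by name:
    java.util.Collection<org.apache.kafka.streams.state.StreamsMetadata> hosts =
            streams.allMetadataForStore("my-store");                       // placeholder store name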

buffered.records.per.partition

The maximum number of records to buffer per partition.

Type: int | Default: 1000 | Importance: low

commit.interval.ms

The frequency with which to save the position of the processor. (Note: if 'processing.guarantee' is set to 'exactly_once', the default value is 100; otherwise the default value is 30000.)

Type: long | Default: 30000 | Importance: low

connections.max.idle.ms

Close idle connections after the number of milliseconds specified by this config.

Type: long | Default: 540000 | Importance: low

metadata.max.age.ms

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

Type: long | Default: 300000 | Valid values: [0,...] | Importance: low

metric.reporters

A list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.

Type: list | Default: "" | Importance: low

metrics.num.samples

The number of samples maintained to compute metrics.

Type: int | Default: 2 | Valid values: [1,...] | Importance: low

metrics.recording.level

The highest recording level for metrics.

Type: string | Default: INFO | Valid values: [INFO, DEBUG] | Importance: low

metrics.sample.window.ms

The window of time a metrics sample is computed over.

Type: long | Default: 30000 | Valid values: [0,...] | Importance: low

partition.grouper

Partition grouper class that implements the org.apache.kafka.streams.processor.PartitionGrouper interface.

Type: class | Default: org.apache.kafka.streams.processor.DefaultPartitionGrouper | Importance: low

poll.ms

The amount of time in milliseconds to block waiting for input.

Type: long | Default: 100 | Importance: low

receive.buffer.bytes

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.

Type: int | Default: 32768 | Valid values: [-1,...] | Importance: low

reconnect.backoff.max.ms

The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.

Type: long | Default: 1000 | Valid values: [0,...] | Importance: low

reconnect.backoff.ms

The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.

Type: long | Default: 50 | Valid values: [0,...] | Importance: low

request.timeout.ms

This configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses, the client will resend the request if necessary, or fail the request if retries are exhausted.

Type: int | Default: 40000 | Valid values: [0,...] | Importance: low

retries

Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error.

Type: int | Default: 0 | Valid values: [0,...,2147483647] | Importance: low

retry.backoff.ms

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

Type: long | Default: 100 | Valid values: [0,...] | Importance: low

rocksdb.config.setter

A RocksDB config setter class or class name that implements the org.apache.kafka.streams.state.RocksDBConfigSetter interface.

Type: class | Default: null | Importance: low
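
A sketch of a setter that bounds RocksDB's off-heap memory use; the class name and sizes are illustrative, and Options comes from the org.rocksdb package that Kafka Streams bundles:

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;

    public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
        @Override
        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockCacheSize(8 * 1024 * 1024L);  // illustrative 8 MB block cache
            options.setTableFormatConfig(tableConfig);
            options.setMaxWriteBufferNumber(2);               // fewer in-memory memtables
        }
    }

Register it via props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class).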

send.buffer.bytes

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.

Type: int | Default: 131072 | Valid values: [-1,...] | Importance: low

state.cleanup.delay.ms

The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.ms will be removed.

Type: long | Default: 600000 | Importance: low

upgrade.from

Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward-compatible way. When upgrading from 1.2 to a newer version, it is not required to specify this config. Default is null. Accepted values are "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1" (for upgrading from the corresponding old version).

Type: string | Default: null | Valid values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1] | Importance: low
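
As a sketch of the intended two-phase rolling upgrade, assuming an application currently on 0.11.0 (the raw config key is used here rather than a constant):

    // Phase 1 (first rolling bounce): deploy the new jar with the old protocol pinned.
    props.put("upgrade.from", "0.11.0");
    // Phase 2 (second rolling bounce): remove the setting again to switch protocols:
    // props.remove("upgrade.from");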

windowstore.changelog.additional.retention.ms

Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day.

Type: long | Default: 86400000 | Importance: low