Chapter 5. Kafka producer configuration tuning

Use a basic producer configuration with optional properties that are tailored to specific use cases.

Adjusting your configuration to maximize throughput might increase latency or vice versa. You will need to experiment and tune your producer configuration to get the balance you need.

5.1. Basic producer configuration

Connection and serializer properties are required for every producer. Generally, it is good practice to add a client id for tracking, and use compression on the producer to reduce batch sizes in requests.

In a basic producer configuration:

  • The order of messages in a partition is not guaranteed.
  • The acknowledgment of messages reaching the broker does not guarantee durability.

Basic producer configuration properties

# ...
bootstrap.servers=localhost:9092 1
key.serializer=org.apache.kafka.common.serialization.StringSerializer 2
value.serializer=org.apache.kafka.common.serialization.StringSerializer 3
client.id=my-client 4
compression.type=gzip 5
# ...

1
(Required) Tells the producer to connect to a Kafka cluster using a host:port bootstrap server address for a Kafka broker. The producer uses the address to discover and connect to all brokers in the cluster. Use a comma-separated list to specify two or three addresses in case a server is down, but it’s not necessary to provide a list of all the brokers in the cluster.
2
(Required) Serializer to transform the key of each message to bytes prior to them being sent to a broker.
3
(Required) Serializer to transform the value of each message to bytes prior to them being sent to a broker.
4
(Optional) The logical name for the client, which is used in logs and metrics to identify the source of a request.
5
(Optional) The codec for compressing messages, which are sent and might be stored in compressed format and then decompressed when reaching a consumer. Compression is useful for improving throughput and reducing the load on storage, but might not be suitable for low latency applications where the cost of compression or decompression could be prohibitive.

5.2. Data durability

Message delivery acknowledgments minimize the likelihood that messages are lost. Acknowledgments are enabled by default with the acks property set at acks=all.

Acknowledging message delivery

# ...
acks=all 1
# ...

1
acks=all forces a leader replica to replicate messages to a certain number of followers before acknowledging that the message request was successfully received.

The acks=all setting offers the strongest guarantee of delivery, but it will increase the latency between the producer sending a message and receiving acknowledgment. If you don’t require such strong guarantees, a setting of acks=0 or acks=1 provides either no delivery guarantees or only acknowledgment that the leader replica has written the record to its log.

With acks=all, the leader waits for all in-sync replicas to acknowledge message delivery. A topic’s min.insync.replicas configuration sets the minimum required number of in-sync replica acknowledgements. The number of acknowledgements include that of the leader and followers.

A typical starting point is to use the following configuration:

  • Producer configuration:

    • acks=all (default)
  • Broker configuration for topic replication:

    • default.replication.factor=3 (default = 1)
    • min.insync.replicas=2 (default = 1)

When you create a topic, you can override the default replication factor. You can also override min.insync.replicas at the topic level in the topic configuration.

AMQ Streams uses this configuration in the example configuration files for multi-node deployment of Kafka.

The following table describes how this configuration operates depending on the availability of followers that replicate the leader replica.

Table 5.1. Follower availability

Number of followers available and in-syncAcknowledgementsProducer can send messages?

2

The leader waits for 2 follower acknowledgements

Yes

1

The leader waits for 1 follower acknowledgement

Yes

0

The leader raises an exception

No

A topic replication factor of 3 creates one leader replica and two followers. In this configuration, the producer can continue if a single follower is unavailable. Some delay can occur whilst removing a failed broker from the in-sync replicas or a creating a new leader. If the second follower is also unavailable, message delivery will not be successful. Instead of acknowledging successful message delivery, the leader sends an error (not enough replicas) to the producer. The producer raises an equivalent exception. With retries configuration, the producer can resend the failed message request.

Note

If the system fails, there is a risk of unsent data in the buffer being lost.

5.3. Ordered delivery

Idempotent producers avoid duplicates as messages are delivered exactly once. IDs and sequence numbers are assigned to messages to ensure the order of delivery, even in the event of failure. If you are using acks=all for data consistency, using idempotency makes sense for ordered delivery. Idempotency is enabled for producers by default. With idempotency enabled, you can set the number of concurrent in-flight requests to a maximum of 5 for message ordering to be preserved.

Ordered delivery with idempotency

# ...
enable.idempotence=true 1
max.in.flight.requests.per.connection=5 2
acks=all 3
retries=2147483647 4
# ...

1
Set to true to enable the idempotent producer.
2
With idempotent delivery the number of in-flight requests may be greater than 1 while still providing the message ordering guarantee. The default is 5 in-flight requests.
3
Set acks to all.
4
Set the number of attempts to resend a failed message request.

If you choose not to use acks=all and disable idempotency because of the performance cost, set the number of in-flight (unacknowledged) requests to 1 to preserve ordering. Otherwise, a situation is possible where Message-A fails only to succeed after Message-B was already written to the broker.

Ordered delivery without idempotency

# ...
enable.idempotence=false 1
max.in.flight.requests.per.connection=1 2
retries=2147483647
# ...

1
Set to false to disable the idempotent producer.
2
Set the number of in-flight requests to exactly 1.

5.4. Reliability guarantees

Idempotence is useful for exactly once writes to a single partition. Transactions, when used with idempotence, allow exactly once writes across multiple partitions.

Transactions guarantee that messages using the same transactional ID are produced once, and either all are successfully written to the respective logs or none of them are.

# ...
enable.idempotence=true
max.in.flight.requests.per.connection=5
acks=all
retries=2147483647
transactional.id=UNIQUE-ID 1
transaction.timeout.ms=900000 2
# ...
1
Specify a unique transactional ID.
2
Set the maximum allowed time for transactions in milliseconds before a timeout error is returned. The default is 900000 or 15 minutes.

The choice of transactional.id is important in order that the transactional guarantee is maintained. Each transactional id should be used for a unique set of topic partitions. For example, this can be achieved using an external mapping of topic partition names to transactional ids, or by computing the transactional id from the topic partition names using a function that avoids collisions.

5.5. Optimizing producers for throughput and latency

Usually, the requirement of a system is to satisfy a particular throughput target for a proportion of messages within a given latency. For example, targeting 500,000 messages per second with 95% of messages being acknowledged within 2 seconds.

It’s likely that the messaging semantics (message ordering and durability) of your producer are defined by the requirements for your application. For instance, it’s possible that you don’t have the option of using acks=0 or acks=1 without breaking some important property or guarantee provided by your application.

Broker restarts have a significant impact on high percentile statistics. For example, over a long period the 99th percentile latency is dominated by behavior around broker restarts. This is worth considering when designing benchmarks or comparing performance numbers from benchmarking with performance numbers seen in production.

Depending on your objective, Kafka offers a number of configuration parameters and techniques for tuning producer performance for throughput and latency.

Message batching (linger.ms and batch.size)
Message batching delays sending messages in the hope that more messages destined for the same broker will be sent, allowing them to be batched into a single produce request. Batching is a compromise between higher latency in return for higher throughput. Time-based batching is configured using linger.ms, and size-based batching is configured using batch.size.
Compression (compression.type)
Message compression adds latency in the producer (CPU time spent compressing the messages), but makes requests (and potentially disk writes) smaller, which can increase throughput. Whether compression is worthwhile, and the best compression to use, will depend on the messages being sent. Compression happens on the thread which calls KafkaProducer.send(), so if the latency of this method matters for your application you should consider using more threads.
Pipelining (max.in.flight.requests.per.connection)
Pipelining means sending more requests before the response to a previous request has been received. In general more pipelining means better throughput, up to a threshold at which other effects, such as worse batching, start to counteract the effect on throughput.

Lowering latency

When your application calls KafkaProducer.send() the messages are:

  • Processed by any interceptors
  • Serialized
  • Assigned to a partition
  • Compressed
  • Added to a batch of messages in a per-partition queue

At which point the send() method returns. So the time send() is blocked is determined by:

  • The time spent in the interceptors, serializers and partitioner
  • The compression algorithm used
  • The time spent waiting for a buffer to use for compression

Batches will remain in the queue until one of the following occurs:

  • The batch is full (according to batch.size)
  • The delay introduced by linger.ms has passed
  • The sender is about to send message batches for other partitions to the same broker, and it is possible to add this batch too
  • The producer is being flushed or closed

Look at the configuration for batching and buffering to mitigate the impact of send() blocking on latency.

# ...
linger.ms=100 1
batch.size=16384 2
buffer.memory=33554432 3
# ...
1
The linger property adds a delay in milliseconds so that larger batches of messages are accumulated and sent in a request. The default is 0'.
2
If a maximum batch.size in bytes is used, a request is sent when the maximum is reached, or messages have been queued for longer than linger.ms (whichever comes sooner). Adding the delay allows batches to accumulate messages up to the batch size.
3
The buffer size must be at least as big as the batch size, and be able to accommodate buffering, compression and in-flight requests.

Increasing throughput

Improve throughput of your message requests by adjusting the maximum time to wait before a message is delivered and completes a send request.

You can also direct messages to a specified partition by writing a custom partitioner to replace the default.

# ...
delivery.timeout.ms=120000 1
partitioner.class=my-custom-partitioner 2

# ...
1
The maximum time in milliseconds to wait for a complete send request. You can set the value to MAX_LONG to delegate to Kafka an indefinite number of retries. The default is 120000 or 2 minutes.
2
Specify the class name of the custom partitioner.