Chapter 90. Kafka
Kafka Component
Available as of Camel 2.13
The kafka: component is used for communicating with the Apache Kafka message broker.
Maven users will need to add the following dependency to their
pom.xml for this component:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>2.17.0.redhat-630187</version>
<!-- use the same version as your Camel core version -->
</dependency>
Camel 2.17 or newer: Scala is no longer used, as we use the Kafka Java client.
Camel 2.16 or older: You must also add a Maven dependency for your chosen Scala library. camel-kafka does not include that dependency, but assumes it is provided. For example, to use Scala 2.10.4 add:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
URI format
kafka:server:port[?options]
Options

| Property | Default | Description |
|---|---|---|
| zookeeperHost | | The zookeeper host to use |
| zookeeperPort | 2181 | The zookeeper port to use |
| zookeeperConnect | | Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort is not used. |
| topic | | The topic to use |
| groupId | | |
| partitioner | | |
| consumerStreams | 10 | |
| clientId | | |
| zookeeperSessionTimeoutMs | | |
| zookeeperConnectionTimeoutMs | | |
| zookeeperSyncTimeMs | | |
| consumersCount | 1 | Camel 2.15.0: The number of consumers that connect to the Kafka server. |
| batchSize | 100 | Camel 2.15.0: The batch size that the BatchingConsumerTask processes at once. |
| barrierAwaitTimeoutMs | 10000 | Camel 2.15.0: If the exchanges processed by the BatchingConsumerTask exceed the batchSize, it waits up to barrierAwaitTimeoutMs. |
| bridgeEndpoint | false | Camel 2.16.0: If bridgeEndpoint is true, the producer ignores the topic header setting of the message. |
You can append query options to the URI in the following format,
?option=value&option=value&...
Producer Options

| Property | Default | Description |
|---|---|---|
| producerType | sync | Can be sync or async. |
| compressionCodec | | |
| compressedTopics | | |
| messageSendMaxRetries | | |
| retryBackoffMs | | |
| topicMetadataRefreshIntervalMs | | |
| sendBufferBytes | | |
| requestRequiredAcks | | |
| requestTimeoutMs | | |
| queueBufferingMaxMs | | |
| queueBufferingMaxMessages | | |
| queueEnqueueTimeoutMs | | |
| batchNumMessages | | |
| serializerClass | | |
| keySerializerClass | | |
Consumer Options

| Property | Default | Description |
|---|---|---|
| consumerId | | |
| socketTimeoutMs | | |
| socketReceiveBufferBytes | | |
| fetchMessageMaxBytes | | |
| autoCommitEnable | | |
| autoCommitIntervalMs | | |
| queuedMaxMessages | | |
| rebalanceMaxRetries | | |
| fetchMinBytes | | |
| fetchWaitMaxMs | | |
| rebalanceBackoffMs | | |
| refreshLeaderBackoffMs | | |
| autoOffsetReset | | |
| consumerTimeoutMs | | |
Samples
Consuming messages:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
Producing messages:
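The producing sample is not shown in this version of the page. As a hedged sketch (the broker address localhost:9092, the topic name test, and the direct:start endpoint are illustrative assumptions, not values prescribed by the component), a producer route could look like:

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch of a Kafka producer route, assuming a broker on
// localhost:9092 and a topic named "test" (illustrative values).
public class KafkaProducerRoute extends RouteBuilder {
    @Override
    public void configure() {
        // Each message body sent to direct:start is published
        // to the "test" topic on the configured broker.
        from("direct:start")
            .to("kafka:localhost:9092?topic=test");
    }
}
```

You could then send a message through the route with a ProducerTemplate, for example template.sendBody("direct:start", "hello").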
See the unit tests of camel-kafka for more examples.
Endpoints
Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and are usually referred to in the DSL via their URIs.
From an Endpoint you can use the following methods:
- createProducer() will create a Producer for sending message exchanges to the endpoint
- createConsumer() implements the Event Driven Consumer pattern for consuming message exchanges from the endpoint via a Processor when creating a Consumer
- createPollingConsumer() implements the Polling Consumer pattern for consuming message exchanges from the endpoint via a PollingConsumer
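The methods above can be sketched in plain Java (outside the route DSL). This is a minimal, hedged example against the Camel 2.x API; the URI and message body are illustrative assumptions:

```java
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultCamelContext;

public class EndpointSketch {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.start();

        // Resolve an Endpoint from its URI (illustrative Kafka URI).
        Endpoint endpoint =
            context.getEndpoint("kafka:localhost:9092?topic=test");

        // createProducer() gives a Producer for sending exchanges
        // to this endpoint.
        Producer producer = endpoint.createProducer();
        producer.start();

        Exchange exchange = endpoint.createExchange();
        exchange.getIn().setBody("hello");
        producer.process(exchange);

        producer.stop();
        context.stop();
    }
}
```

createConsumer() and createPollingConsumer() are used the same way when you need to consume from the endpoint programmatically rather than through a route.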
