Chapter 3. Configuring Zookeeper

Kafka uses Zookeeper to store configuration data and for cluster coordination. It is strongly recommended to run a cluster of replicated Zookeeper instances.

3.1. Basic configuration

The most important Zookeeper configuration options are:

tickTime
Zookeeper’s basic time unit in milliseconds. It is used for heartbeats and session timeouts. For example, minimum session timeout will be two ticks.
dataDir
The directory where Zookeeper stores its transaction log and snapshots of its in-memory database. This should be set to the /var/lib/zookeeper/ directory created during installation.
clientPort
Port number where clients can connect. Defaults to 2181.

An example zookeeper configuration file config/zookeeper.properties is located in the AMQ Streams installation directory. It is recommended to place the dataDir directory on a separate disk device to minimize the latency in Zookeeper.

Zookeeper configuration file should be located in /opt/kafka/config/zookeeper.properties. A basic example of the configuration file can be found below. The configuration file has to be readable by the kafka user.

timeTick=2000
dataDir=/var/lib/zookeeper/
clientPort=2181

3.2. Zookeeper cluster configuration

For reliable ZooKeeper service, you should deploy ZooKeeper in a cluster. Hence, for production use cases, you must run a cluster of replicated Zookeeper instances. Zookeeper clusters are also referred as ensembles.

Zookeeper clusters usually consist of an odd number of nodes. Zookeeper requires a majority of the nodes in the cluster should be up and running. For example, a cluster with three nodes, at least two of them must be up and running. This means it can tolerate one node being down. A cluster consisting of five nodes, at least three nodes must be available. This means it can tolerate two nodes being down. A cluster consisting of seven nodes, at least four nodes must be available. This means it can tolerate three nodes being down. Having more nodes in the Zookeeper cluster delivers better resiliency and reliability of the whole cluster.

Zookeeper can run in clusters with an even number of nodes. The additional node, however, does not increase the resiliency of the cluster. A cluster with four nodes requires at least three nodes to be available and can tolerate only one node being down. Therefore it has exactly the same resiliency as a cluster with only three nodes.

The different Zookeeper nodes should be ideally placed into different data centers or network segments. Increasing the number of Zookeeper nodes increases the workload spent on cluster synchronization. For most Kafka use cases Zookeeper cluster with 3, 5 or 7 nodes should be fully sufficient.

Warning

Zookeeper cluster with 3 nodes can tolerate only 1 unavailable node. This means that when a cluster node crashes while you are doing maintenance on another node your Zookeeper cluster will be unavailable.

Replicated Zookeeper configuration supports all configuration options supported by the standalone configuration. Additional options are added for the clustering configuration:

initLimit
Amount of time to allow followers to connect and sync to the cluster leader. The time is specified as a number of ticks (see the timeTick option for more details).
syncLimit
Amount of time for which followers can be behind the leader. The time is specified as a number of ticks (see the timeTick option for more details).

In addition to the options above, every configuration file should contain a list of servers which should be members of the Zookeeper cluster. The server records should be specified in the format server.id=hostname:port1:port2, where:

id
The ID of the Zookeeper cluster node.
hostname
The hostname or IP address where the node listens for connections.
port1
The port number used for intra-cluster communication.
port2
The port number used for leader election.

The following is an example configuration file of a Zookeeper cluster with three nodes:

timeTick=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2

server.1=172.17.0.1:2888:3888
server.2=172.17.0.2:2888:3888
server.3=172.17.0.3:2888:3888

Each node in the Zookeeper cluster has to be assigned with a unique ID. Each node’s ID has to be configured in myid file and stored in the dataDir folder like /var/lib/zookeeper/. The myid files should contain only a single line with the written ID as text. The ID can be any integer from 1 to 255. You must manually create this file on each cluster node. Using this file, each Zookeeper instance will use the configuration from the corresponding server. line in the configuration file to configure its listeners. It will also use all other server. lines to identify other cluster members.

In the above example, there are three nodes, so each one will have a different myid with values 1, 2, and 3 respectively.

3.3. Running multi-node Zookeeper cluster

This procedure will show you how to configure and run Zookeeper as a multi-node cluster.

Prerequisites

  • AMQ Streams is installed on all hosts which will be used as Zookeeper cluster nodes.

Running the cluster

  1. Create the myid file in /var/lib/zookeeper/. Enter ID 1 for the first Zookeeper node, 2 for the second Zookeeper node, and so on.

    su - kafka
    echo "_<NodeID>_" > /var/lib/zookeeper/myid

    For example:

    su - kafka
    echo "1" > /var/lib/zookeeper/myid
  2. Edit the Zookeeper /opt/kafka/config/zookeeper.properties configuration file for the following:

    • Set the option dataDir to /var/lib/zookeeper/. + Configure the initLimit and syncLimit options.
    • Add a list of all Zookeeper nodes. The list should include also the current node.

      Example configuration for a node of Zookeeper cluster with five members

      timeTick=2000
      dataDir=/var/lib/zookeeper/
      clientPort=2181
      initLimit=5
      syncLimit=2
      
      server.1=172.17.0.1:2888:3888
      server.2=172.17.0.2:2888:3888
      server.3=172.17.0.3:2888:3888
      server.4=172.17.0.4:2888:3888
      server.5=172.17.0.5:2888:3888

  3. Start Zookeeper with the default configuration file.

    su - kafka
    /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
  4. Verify that the Zookeeper is running.

    jcmd | grep zookeeper
  5. Repeat this procedure on all the nodes of the cluster.
  6. Once all nodes of the clusters are up and running, verify that all nodes are members of the cluster by sending a stat command to each of the nodes using ncat utility.

    Use ncat stat to check the node status

    echo stat | ncat localhost 2181

    In the output you should see information that the node is either leader or follower.

    Example output from the ncat command

    Zookeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT
    Clients:
     /0:0:0:0:0:0:0:1:59726[0](queued=0,recved=1,sent=0)
    
    Latency min/avg/max: 0/0/0
    Received: 2
    Sent: 1
    Connections: 1
    Outstanding: 0
    Zxid: 0x200000000
    Mode: follower
    Node count: 4

Additional resources

3.4. Authentication

By default, Zookeeper does not use any form of authentication and allows anonymous connections. However, it supports Java Authentication and Authorization Service (JAAS) which can be used to set up authentication using Simple Authentication and Security Layer (SASL). Zookeeper supports authentication using the DIGEST-MD5 SASL mechanism with locally stored credentials.

3.4.1. Authentication with SASL

JAAS is configured using a separate configuration file. It is recommended to place the JAAS configuration file in the same directory as the Zookeeper configuration (/opt/kafka/config/). The recommended file name is zookeeper-jaas.conf. When using a Zookeeper cluster with multiple nodes, the JAAS configuration file has to be created on all cluster nodes.

JAAS is configured using contexts. Separate parts such as the server and client are always configured with a separate context. The context is a configuration option and has the following format:

ContextName {
       param1
       param2;
};

SASL Authentication is configured separately for server-to-server communication (communication between Zookeeper instances) and client-to-server communication (communication between Kafka and Zookeeper). Server-to-server authentication is relevant only for Zookeeper clusters with multiple nodes.

Server-to-Server authentication

For server-to-server authentication, the JAAS configuration file contains two parts:

  • The server configuration
  • The client configuration

When using DIGEST-MD5 SASL mechanism, the QuorumServer context is used to configure the authentication server. It must contain all the usernames to be allowed to connect together with their passwords in an unencrypted form. The second context, QuorumLearner, has to be configured for the client which is built into Zookeeper. It also contains the password in an unencrypted form. An example of the JAAS configuration file for DIGEST-MD5 mechanism can be found below:

QuorumServer {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       user_zookeeper="123456";
};

QuorumLearner {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="zookeeper"
       password="123456";
};

In addition to the JAAS configuration file, you must enable the server-to-server authentication in the regular Zookeeper configuration file by specifying the following options:

quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.cnxn.threads.size=20

Use the EXTRA_ARGS environment variable to pass the JAAS configuration file to the Zookeeper server as a Java property:

su - kafka
export EXTRA_ARGS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

For more information about server-to-server authentication, see Zookeeper wiki.

Client-to-Server authentication

Client-to-server authentication is configured in the same JAAS file as the server-to-server authentication. However, unlike the server-to-server authentication, it contains only the server configuration. The client part of the configuration has to be done in the client. For information on how to configure a Kafka broker to connect to Zookeeper using authentication, see the Kafka installation section.

Add the Server context to the JAAS configuration file to configure client-to-server authentication. For DIGEST-MD5 mechanism it configures all usernames and passwords:

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_super="123456"
    user_kafka="123456"
    user_someoneelse="123456";
};

After configuring the JAAS context, enable the client-to-server authentication in the Zookeeper configuration file by adding the following line:

requireClientAuthScheme=sasl
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

You must add the authProvider.<ID> property for every server that is part of the Zookeeper cluster.

Use the EXTRA_ARGS environment variable to pass the JAAS configuration file to the Zookeeper server as a Java property:

su - kafka
export EXTRA_ARGS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

For more information about configuring Zookeeper authentication in Kafka brokers, see Section 4.6, “Zookeeper authentication”.

3.4.2. Enabling Server-to-server authentication using DIGEST-MD5

This procedure describes how to enable authentication using the SASL DIGEST-MD5 mechanism between the nodes of the Zookeeper cluster.

Prerequisites

  • AMQ Streams is installed on the host
  • Zookeeper cluster is configured with multiple nodes.

Enabling SASL DIGEST-MD5 authentication

  1. On all Zookeeper nodes, create or edit the /opt/kafka/config/zookeeper-jaas.conf JAAS configuration file and add the following contexts:

    QuorumServer {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           user__<Username>_="<Password>";
    };
    
    QuorumLearner {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           username="<Username>"
           password="<Password>";
    };

    The username and password must be the same in both JAAS contexts. For example:

    QuorumServer {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           user_zookeeper="123456";
    };
    
    QuorumLearner {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           username="zookeeper"
           password="123456";
    };
  2. On all Zookeeper nodes, edit the /opt/kafka/config/zookeeper.properties Zookeeper configuration file and set the following options:

    quorum.auth.enableSasl=true
    quorum.auth.learnerRequireSasl=true
    quorum.auth.serverRequireSasl=true
    quorum.auth.learner.loginContext=QuorumLearner
    quorum.auth.server.loginContext=QuorumServer
    quorum.cnxn.threads.size=20
  3. Restart all Zookeeper nodes one by one. To pass the JAAS configuration to Zookeeper, use the EXTRA_ARGS environment variable.

    su - kafka
    export EXTRA_ARGS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

Additional resources

3.4.3. Enabling Client-to-server authentication using DIGEST-MD5

This procedure describes how to enable authentication using the SASL DIGEST-MD5 mechanism between Zookeeper clients and Zookeeper.

Prerequisites

Enabling SASL DIGEST-MD5 authentication

  1. On all Zookeeper nodes, create or edit the /opt/kafka/config/zookeeper-jaas.conf JAAS configuration file and add the following context:

    Server {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        user_super="<SuperUserPassword>"
        user<Username1>_="<Password1>" user<USername2>_="<Password2>";
    };

    The super will have automatically administrator priviledges. The file can contain multiple users, but only one additional user is required by the Kafka brokers. The recommended name for the Kafka user is kafka.

    The following example shows the Server context for client-to-server authentication:

    Server {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        user_super="123456"
        user_kafka="123456";
    };
  2. On all Zookeeper nodes, edit the /opt/kafka/config/zookeeper.properties Zookeeper configuration file and set the following options:

    requireClientAuthScheme=sasl
    authProvider.<IdOfBroker1>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    authProvider.<IdOfBroker2>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    authProvider.<IdOfBroker3>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

    The authProvider.<ID> property has to be added for every node which is part of the Zookeeper cluster. An example three-node Zookeeper cluster configuration must look like the following:

    requireClientAuthScheme=sasl
    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  3. Restart all Zookeeper nodes one by one. To pass the JAAS configuration to Zookeeper, use the EXTRA_ARGS environment variable.

    su - kafka
    export EXTRA_ARGS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

Additional resources

3.5. Authorization

Zookeeper supports access control lists (ACLs) to protect data stored inside it. Kafka brokers can automatically configure the ACL rights for all Zookeeper records they create so no other Zookeeper user can modify them.

For more information about enabling Zookeeper ACLs in Kafka brokers, see Section 4.7, “Zookeeper authorization”.

3.6. TLS

The version of Zookeeper which is part of AMQ Streams currently does not support TLS for encryption or authentication.

3.7. Additional configuration options

You can set the following options based on your use case:

maxClientCnxns
The maximum number of concurrent client connections to a single member of the ZooKeeper cluster.
autopurge.snapRetainCount
Number of snapshots of Zookeeper’s in-memory database which will be retained. Default value is 3.
autopurge.purgeInterval
The time interval in hours for purging snapshots. The default value is 0 and this option is disabled.

All available configuration options can be found in Zookeeper documentation.

3.8. Logging

Zookeeper is using log4j as their logging infrastructure. Logging configuration is by default read from the log4j.propeties configuration file which should be placed either in the /opt/kafka/config/ directory or in the classpath. The location and name of the configuration file can be changed using the Java property log4j.configuration which can be passed to Zookeeper using the KAFKA_LOG4J_OPTS environment variable:

su - kafka
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/my/path/to/log4j.properties"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

For more information about Log4j configurations, see Log4j documentation.