Red Hat Training

A Red Hat training course is available for Red Hat AMQ

Chapter 4. Configuring Kafka

Kafka uses a properties file to store static configuration. The recommended location for the configuration file is /opt/kafka/config/server.properties. The configuration file has to be readable by the kafka user.

AMQ Streams ships an example configuration file that highlights various basic and advanced features of the product. It can be found under config/server.properties in the AMQ Streams installation directory.

This chapter explains the most important configuration options. For a complete list of supported Kafka broker configuration options, see Appendix A, Broker configuration parameters.

4.1. Zookeeper

Kafka brokers need Zookeeper to store some parts of their configuration as well as to coordinate the cluster (for example to decide which node is a leader for which partition). Connection details for the Zookeeper cluster are stored in the configuration file. The field zookeeper.connect contains a comma-separated list of hostnames and ports of members of the zookeeper cluster.

For example:

zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181

Kafka will use these addresses to connect to the Zookeeper cluster. With this configuration, all Kafka znodes will be created directly in the root of Zookeeper database. Therefore, such a Zookeeper cluster could be used only for a single Kafka cluster. To configure multiple Kafka clusters to use single Zookeeper cluster, specify a base (prefix) path at the end of the Zookeeper connection string in the Kafka configuration file:

zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-1

4.2. Listeners

Kafka brokers can be configured to use multiple listeners. Each listener can be used to listen on a different port or network interface and can have different configuration. Listeners are configured in the listeners property in the configuration file. The listeners property contains a list of listeners with each listener configured as <listenerName>://<hostname>:_<port>_. When the hostname value is empty, Kafka will use java.net.InetAddress.getCanonicalHostName() as hostname. The following example shows how multiple listeners might be configured:

listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094

When a Kafka client wants to connect to a Kafka cluster, it first connects to a bootstrap server. The bootstrap server is one of the cluster nodes. It will provide the client with a list of all other brokers which are part of the cluster and the client will connect to them individually. By default the bootstrap server will provide the client with a list of nodes based on the listeners field.

Advertised listeners

It is possible to give the client a different set of addresses than given in the listeners property. It is useful in situations when additional network infrastructure, such as a proxy, is between the client and the broker, or when an external DNS name should be used instead of an IP address. Here, the broker allows defining the advertised addresses of the listeners in the advertised.listeners configuration property. This property has the same format as the listeners property. The following example shows how to configure advertised listeners:

listeners=INT1://:9092,INT2://:9093
advertised.listeners=INT1://my-broker-1.my-domain.com:1234,INT2://my-broker-1.my-domain.com:1234:9093
Note

The names of the listeners have to match the names of the listeners from the listeners property.

Inter-broker listeners

When the cluster has replicated topics, the brokers responsible for such topics need to communicate with each other in order to replicate the messages in those topics. When multiple listeners are configured, the configuration field inter.broker.listener.name can be used to specify the name of the listener which should be used for replication between brokers. For example:

inter.broker.listener.name=REPLICATION

4.3. Commit logs

Apache Kafka stores all records it receives from producers in commit logs. The commit logs contain the actual data, in the form of records, that Kafka needs to deliver. These are not the application log files which record what the broker is doing.

Log directories

You can configure log directories using the log.dirs property filed to store commit logs in one or multiple log directories. It should be set to /var/lib/kafka directory created during installation:

log.dirs=/var/lib/kafka

For performance reasons, you can configure log.dirs to multiple directories and place each of them on a different physical device to improve disk I/O performance. For example:

log.dirs=/var/lib/kafka1,/var/lib/kafka2,/var/lib/kafka3

4.4. Broker ID

Broker ID is a unique identifier for each broker in the cluster. You can assign an integer greater than or equal to 0 as broker ID. The broker ID is used to identify the brokers after restarts or crashes and it is therefore important that the id is stable and does not change over time. The broker ID is configured in the broker properties file:

broker.id=1

4.5. Running a multi-node Kafka cluster

This procedure will show you how to configure and run Kafka as a multi-node cluster.

Prerequisites

Running the cluster

  1. Edit the /opt/kafka/config/server.properties Kafka configuration file for the following:

    • Set the broker.id field to 0 for the first broker, 1 for the second broker, and so on. + Configure the details for connecting to Zookeeper in the zookeeper.connect option. + Configure the Kafka listeners. + Set the directories where the commit logs should be stored in the logs.dir directory.

      The following example shows the configuration for a Kafka broker:

      broker.id=0
      zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
      listeners=REPLICATION://:9091,PLAINTEXT://:9092
      inter.broker.listener.name=REPLICATION
      log.dirs=/var/lib/kafka

      In a typical installation where each Kafka broker is running on identical hardware, only the broker.id configuration property will differ between each broker config.

  2. Start the Kafka broker with the default configuration file.

    su - kafka
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  3. Verify that the Kafka broker is running.

    jcmd | grep Kafka
  4. Repeat this procedure on all the nodes of the Kafka cluster.
  5. Once all nodes of the clusters are up and running, verify that all nodes are members of the Kafka cluster by sending a dump command to one of the Zookeeper nodes using the ncat utility. The command will print all Kafka brokers registered in Zookeeper.

    Use ncat stat to check the node status

    echo dump | ncat zoo1.my-domain.com 2181

    The output should contain all Kafka brokers you just configured and started.

    Example output from the ncat command for Kafka cluster with 3 nodes

    SessionTracker dump:
    org.apache.zookeeper.server.quorum.LearnerSessionTracker@28848ab9
    ephemeral nodes dump:
    Sessions with Ephemerals (3):
    0x20000015dd00000:
            /brokers/ids/1
    0x10000015dc70000:
            /controller
            /brokers/ids/0
    0x10000015dc70001:
            /brokers/ids/2

Additional resources

4.6. Zookeeper authentication

By default, connections between Zookeeper and Kafka are not authenticated. However, Kafka and Zookeeper support Java Authentication and Authorization Service (JAAS) which can be used to set up authentication using Simple Authentication and Security Layer (SASL). Zookeeper supports authentication using the DIGEST-MD5 SASL mechanism with locally stored credentials.

4.6.1. JAAS Configuration

SASL authentication for Zookeeper connections has to be configured in the JAAS configuration file. By default, Kafka will use the JAAS context named Client for connecting to Zookeeper. The Client context should be configured in the /opt/kafka/config/jass.conf file. The context has to enable the PLAIN SASL authentication, as in the following example:

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="123456";
};

4.6.2. Enabling Zookeeper authentication

This procedure describes how to enable authentication using the SASL DIGEST-MD5 mechanism when connecting to Zookeeper.

Prerequisites

  • Client-to-server authentication is enabled in Zookeeper

Enabling SASL DIGEST-MD5 authentication

  1. On all Kafka broker nodes, create or edit the /opt/kafka/config/jaas.conf JAAS configuration file and add the following context:

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="<Username>"
        password="<Password>";
    };

    The username and password should be the same as configured in Zookeeper.

    Following example shows the Client context:

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="123456";
    };
  2. Restart all Kafka broker nodes one by one. To pass the JAAS configuration to Kafka brokers, use the KAFKA_OPTS environment variable.

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

Additional resources

4.7. Zookeeper authorization

When authentication is enabled between Kafka and Zookeeper, Kafka can be configured to automatically protect all its records with Access Control List (ACL) rules which will allow only the Kafka user to change the data. All other users will have read-only access.

4.7.1. ACL Configuration

Enforcement of ACL rules is controlled by the zookeeper.set.acl property in the config/server.properties Kafka configuration file and is disabled by default. To enabled the ACL protection set zookeeper.set.acl to true:

zookeeper.set.acl=true

Kafka will set the ACL rules only for newly created Zookeeper znodes. When the ACLs are only enabled after the first start of the cluster, the tool zookeeper-security-migration.sh can be used to set ACLs on all existing znodes.

The data stored in Zookeeper includes information such as topic names and their configuration. The Zookeeper database also contains the salted and hashed user credentials when SASL SCRAM authentication is used. But it does not include any records sent and received using Kafka. Kafka, in general, considers the data stored in Zookeeper as non-confidential. In case these data are considered confidential (for example because topic names contain customer identification) the only way how to protect them is by isolating Zookeeper on the network level and allowing access only to Kafka brokers.

4.7.2. Enabling Zookeeper ACLs for a new Kafka cluster

This procedure describes how to enable Zookeeper ACLs in Kafka configuration for a new Kafka cluster. Use this procedure only before the first start of the Kafka cluster. For enabling Zookeeper ACLs in already running cluster, see Section 4.7.3, “Enabling Zookeeper ACLs in an existing Kafka cluster”.

Prerequisites

  • AMQ Streams is installed on all hosts which will be used as Kafka brokers.
  • Zookeeper cluster is configured and running.
  • Client-to-server authentication is enabled in Zookeeper.
  • Zookeeper authentication is enabled in the Kafka brokers.
  • Kafka broker have not yet been started.

Procedure

  1. Edit the /opt/kafka/config/server.properties Kafka configuration file to set the zookeeper.set.acl field to true on all cluster nodes.

    zookeeper.set.acl=true
  2. Start the Kafka brokers.

Additional resources

4.7.3. Enabling Zookeeper ACLs in an existing Kafka cluster

The zookeeper-security-migration.sh tool can to be used to set Zookeeper ACLs on all existing znodes. The zookeeper-security-migration.sh is available as part of AMQ Streams and can be found in the bin directory.

Prerequisites

Enabling the Zookeeper ACLs

  1. Edit the /opt/kafka/config/server.properties Kafka configuration file to set the zookeeper.set.acl field to true on all cluster nodes.

    zookeeper.set.acl=true
  2. Restart all Kafka brokers one by one
  3. Set the ACLs on all existing Zookeeper znodes using the zookeeper-security-migration.sh tool.

    su - kafka
    cd /opt/kafka
    KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=_<ZookeeperURL>_
    exit

    For example:

    su - kafka
    cd /opt/kafka
    KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=zoo1.my-domain.com:2181
    exit

4.8. Encryption and authentication

Kafka supports TLS for encrypting the communication with Kafka clients. Additionally, it supports two types of authentication:

  • TLS client authentication based on X.509 certificates
  • SASL Authentication based on a username and password

4.8.1. Listener configuration

Encryption and authentication in Kafka brokers is configured per listener. For more information about Kafka listener configuration, see Section 4.2, “Listeners”.

Each listener in the Kafka broker is configured with its own security protocol. The configuration property listener.security.protocol.map defines which listener uses which security protocol. It maps each listener name to its security protocol. Supported security protocols are:

PLAINTEXT
Listener without any encryption or authentication.
SSL
Listener using TLS encryption and, optionally, authentication using TLS client certificates.
SASL_PLAINTEXT
Listener without encryption but with SASL-based authentication.
SASL_SSL
Listener with TLS-based encryption and SASL-based authentication.

Given the following listeners configuration:

listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094

the listener.security.protocol.map might look like this:

listener.security.protocol.map=INT1:SASL_PLAINTEXT,INT2:SASL_SSL,REPLICATION:SSL

This would configure the listener INT1 to use unencrypted connections with SASL authentication, the listener INT2 to use encrypted connections with SASL authentication and the REPLICATION interface to use TLS encryption (possibly with TLS client authentication). The same security protocol can be used multiple times. The following example is also a valid configuration:

listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL

Such a configuration would use TLS encryption and TLS authentication for all interfaces. The following chapters will explain in more detail how to configure TLS and SASL.

4.8.2. TLS Encryption

In order to use TLS encryption and server authentication, a keystore containing private and public keys has to be provided. This is usually done using a file in the Java Keystore (JKS) format. A path to this file is set in the ssl.keystore.location property. The ssl.keystore.password property should be used to set the password protecting the keystore. For example:

ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456

In some cases, an additional password is used to protect the private key. Any such password can be set using the ssl.key.password property.

Kafka is able to use keys signed by certification authorities as well as self-signed keys. Using keys signed by certification authorities should always be the preferred method. In order to allow clients to verify the identity of the Kafka broker they are connecting to, the certificate should always contain the advertised hostname(s) as its Common Name (CN) or in the Subject Alternative Names (SAN).

It is possible to use different SSL configurations for different listeners. All options starting with ssl. can be prefixed with listener.name.<NameOfTheListener>., where the name of the listener has to be always in lower case. This will override the default SSL configuration for that specific listener. The following example shows how to use different SSL configurations for different listeners:

listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL

# Default configuration - will be used for listeners INT1 and INT2
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456

# Different configuration for listener REPLICATION
listener.name.replication.ssl.keystore.location=/path/to/keystore/server-1.jks
listener.name.replication.ssl.keystore.password=123456

Additional TLS configuration options

In addition to the main TLS configuration options described above, Kafka supports many options for fine-tuning the TLS configuration. For example, to enable or disable TLS / SSL protocols or cipher suites:

ssl.cipher.suites
List of enabled cipher suites. Each cipher suite is a combination of authentication, encryption, MAC and key exchange algorithms used for the TLS connection. By default, all available cipher suites are enabled.
ssl.enabled.protocols
List of enabled TLS / SSL protocols. Defaults to TLSv1.2,TLSv1.1,TLSv1.

For a complete list of supported Kafka broker configuration options, see Appendix A, Broker configuration parameters.

4.8.3. Enabling TLS encryption

This procedure describes how to enable encryption in Kafka brokers.

Prerequisites

  • AMQ Streams is installed on all hosts which will be used as Kafka brokers.

Procedure

  1. Generate TLS certificates for all Kafka brokers in your cluster. The certificates should have their advertised and bootstrap addresses in their Common Name or Subject Alternative Name.
  2. Edit the /opt/kafka/config/server.properties Kafka configuration file on all cluster nodes for the following:

    • Change the listener.security.protocol.map field to specify the SSL protocol for the listener where you want to use TLS encryption.
    • Set the ssl.keystore.location option to the path to the JKS keystore with the broker certificate.
    • Set the ssl.keystore.password option to the password you used to protect the keystore.

      For example:

      listeners=UNENCRYPTED://:9092,ENCRYPTED://:9093,REPLICATION://:9094
      listener.security.protocol.map=UNENCRYPTED:PLAINTEXT,ENCRYPTED:SSL,REPLICATION:PLAINTEXT
      ssl.keystore.location=/path/to/keystore/server-1.jks
      ssl.keystore.password=123456
  3. (Re)start the Kafka brokers

Additional resources

4.8.4. Authentication

Kafka supports two methods of authentication. On all connections, authentication using one of the supported SASL (Simple Authentication and Security Layer) mechanisms can be used. On encrypted connections, TLS client authentication based on X.509 certificates can be used.

4.8.4.1. TLS client authentication

TLS client authentication can be used only on connections which are already using TLS encryption. To use TLS client authentication, a truststore with public keys can be provided to the broker. These keys can be used to authenticate clients connecting to the broker. The truststore should be provided in Java Keystore (JKS) format and should contain public keys of the certification authorities. All clients with public and private keys signed by one of the certification authorities included in the truststore will be authenticated. The location of the truststore is set using field ssl.truststore.location. In case the truststore is password protected, the password should be set in the ssl.truststore.password property. For example:

ssl.truststore.location=/path/to/keystore/server-1.jks
ssl.truststore.password=123456

Once the truststore is configured, TLS client authentication has to be enabled using the ssl.client.auth property. This property can be set to one of three different values:

none
TLS client authentication is switched off. (Default value)
requested
TLS client authentication is optional. Clients will be asked to authenticate using TLS client certificate but they can choose not to.
required
Clients are required to authenticate using TLS client certificate.

When a client authenticates using TLS client authentication, the authenticated principal name is the distinguished name from the authenticated client certificate. For example, a user with a certificate which has a distinguished name CN=someuser will be authenticated with the following principal CN=someuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown. When TLS client authentication is not used and SASL is disabled, the principal name will be ANONYMOUS.

4.8.4.2. SASL authentication

SASL authentication is configured using Java Authentication and Authorization Service (JAAS). JAAS is also used for authentication of connections between Kafka and Zookeeper. JAAS uses its own configuration file. The recommended location for this file is /opt/kafka/config/jaas.conf. The file has to be readable by the kafka user. When running Kafka, the location of this file is specified using Java system property java.security.auth.login.config. This property has to be passed to Kafka when starting the broker nodes:

KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/my/jaas.config"; bin/kafka-server-start.sh

SASL authentication is supported both through plain unencrypted connections as well as through TLS connections. SASL can be enabled individually for each listener. To enable it, the security protocol in listener.security.protocol.map has to be either SASL_PLAINTEXT or SASL_SSL.

SASL authentication in Kafka supports several different mechanisms:

PLAIN
Implements authentication based on username and passwords. Usernames and passwords are stored locally in Kafka configuration.
SCRAM-SHA-256 and SCRAM-SHA-512
Implements authentication using Salted Challenge Response Authentication Mechanism (SCRAM). SCRAM credentials are stored centrally in Zookeeper. SCRAM can be used in situations where Zookeeper cluster nodes are running isolated in a private network.
GSSAPI
Implements authentication against a Kerberos server.
Warning

The PLAIN mechanism sends the username and password over the network in an unencrypted format. It should be therefore only be used in combination with TLS encryption.

The SASL mechanisms are configured via the JAAS configuration file. Kafka uses the JAAS context named KafkaServer. After they are configured in JAAS, the SASL mechanisms have to be enabled in the Kafka configuration. This is done using the sasl.enabled.mechanisms property. This property contains a comma-separated list of enabled mechanisms:

sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512

In case the listener used for inter-broker communication is using SASL, the property sasl.mechanism.inter.broker.protocol has to be used to specify the SASL mechanism which it should use. For example:

sasl.mechanism.inter.broker.protocol=PLAIN

The username and password which will be used for the inter-broker communication has to be specified in the KafkaServer JAAS context using the field username and password.

SASL PLAIN

To use the PLAIN mechanism, the usernames and password which are allowed to connect are specified directly in the JAAS context. The following example shows the context configured for SASL PLAIN authentication. The example configures three different users:

  • admin
  • user1
  • user2
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";
};

The JAAS configuration file with the user database should be kept in sync on all Kafka brokers.

When SASL PLAIN is also used for inter-broker authentication, the username and password properties should be included in the JAAS context:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";
};

SASL SCRAM

SCRAM authentication in Kafka consists of two mechanisms: SCRAM-SHA-256 and SCRAM-SHA-512. These mechanisms differ only in the hashing algorithm used - SHA-256 versus stronger SHA-512. To enable SCRAM authentication, the JAAS configuration file has to include the following configuration:

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required;
};

When enabling SASL authentication in the Kafka configuration file, both SCRAM mechanisms can be listed. However, only one of them can be chosen for the inter-broker communication. For example:

sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

User credentials for the SCRAM mechanism are stored in Zookeeper. The kafka-configs.sh tool can be used to manage them. For example, run the following command to add user user1 with password 123456:

bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1

To delete a user credential use:

bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1

SASL GSSAPI

The SASL mechanism used for authentication using Kerberos is called GSSAPI. To configure Kerberos SASL authentication, the following configuration should be added to the JAAS configuration file:

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};

The domain name in the Kerberos principal has to be always in upper case.

In addition to the JAAS configuration, the Kerberos service name needs to be specified in the sasl.kerberos.service.name property in the Kafka configuration:

sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka

Multiple SASL mechanisms

Kafka can use multiple SASL mechanisms at the same time. The different JAAS configurations can be all added to the same context:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";

    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";

    org.apache.kafka.common.security.scram.ScramLoginModule required;
};

When multiple mechanisms are enabled, clients will be able to choose the mechanism which they want to use.

4.8.5. Enabling TLS client authentication

This procedure describes how to enable TLS client authentication in Kafka brokers.

Prerequisites

  • AMQ Streams is installed on all hosts which will be used as Kafka brokers.
  • TLS encryption is enabled.

Procedure

  1. Prepare a JKS truststore containing the public key of the certification authority used to sign the user certificates.
  2. Edit the /opt/kafka/config/server.properties Kafka configuration file on all cluster nodes for the following:

    • Set the ssl.truststore.location option to the path to the JKS truststore with the certification authority of the user certificates.
    • Set the ssl.truststore.password option to the password you used to protect the truststore.
    • Set the ssl.client.auth option to required.

      For example:

      ssl.truststore.location=/path/to/truststore.jks
      ssl.truststore.password=123456
      ssl.client.auth=required
  3. (Re)start the Kafka brokers

Additional resources

4.8.6. Enabling SASL PLAIN authentication

This procedure describes how to enable SASL PLAIN authentication in Kafka brokers.

Prerequisites

  • AMQ Streams is installed on all hosts which will be used as Kafka brokers.

Procedure

  1. Edit or create the /opt/kafka/config/jaas.conf JAAS configuration file. This file should contain all your users and their passwords. Make sure this file is the same on all Kafka brokers.

    For example:

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        user_admin="123456"
        user_user1="123456"
        user_user2="123456";
    };
  2. Edit the /opt/kafka/config/server.properties Kafka configuration file on all cluster nodes for the following:

    • Change the listener.security.protocol.map field to specify the SASL_PLAINTEXT or SASL_SSL protocol for the listener where you want to use SASL PLAIN authentication.
    • Set the sasl.enabled.mechanisms option to PLAIN.

      For example:

      listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
      listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
      sasl.enabled.mechanisms=PLAIN
  3. (Re)start the Kafka brokers using the KAFKA_OPTS environment variable to pass the JAAS configuration to Kafka brokers.

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

Additional resources

4.8.7. Enabling SASL SCRAM authentication

This procedure describes how to enable SASL SCRAM authentication in Kafka brokers.

Prerequisites

  • AMQ Streams is installed on all hosts which will be used as Kafka brokers.

Procedure

  1. Edit or create the /opt/kafka/config/jaas.conf JAAS configuration file. Enable the ScramLoginModule for the KafkaServer context. Make sure this file is the same on all Kafka brokers.

    For example:

    KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required;
    };
  2. Edit the /opt/kafka/config/server.properties Kafka configuration file on all cluster nodes for the following:

    • Change the listener.security.protocol.map field to specify the SASL_PLAINTEXT or SASL_SSL protocol for the listener where you want to use SASL SCRAM authentication.
    • Set the sasl.enabled.mechanisms option to SCRAM-SHA-256 or SCRAM-SHA-512.

      For example:

      listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
      listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
      sasl.enabled.mechanisms=SCRAM-SHA-512
  3. (Re)start the Kafka brokers using the KAFKA_OPTS environment variable to pass the JAAS configuration to Kafka brokers.

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

Additional resources

4.8.8. Adding SASL SCRAM users

This procedure describes how to add new users for authentication using SASL SCRAM.

Prerequisites

  • AMQ Streams is installed on all hosts which will be used as Kafka brokers.
  • SASL SCRAM authentication is enabled.

Procedure

  • Use the kafka-configs.sh tool to add new SASL SCRAM users.

    bin/kafka-configs.sh --zookeeper <ZookeeperAddress> --alter --add-config 'SCRAM-SHA-512=[password=<Password>]' --entity-type users --entity-name <Username>

    For example:

    bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --alter --add-config 'SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1

Additional resources

4.8.9. Deleting SASL SCRAM users

This procedure describes how to remove users when using SASL SCRAM authentication.

Prerequisites

  • AMQ Streams is installed on all hosts which will be used as Kafka brokers.
  • SASL SCRAM authentication is enabled.

Procedure

  • Use the kafka-configs.sh tool to delete SASL SCRAM users.

    bin/kafka-configs.sh --zookeeper <ZookeeperAddress> --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name <Username>

    For example:

    bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1

Additional resources

4.9. Logging

Kafka brokers use Log4j as their logging infrastructure. Logging configuration is by default read from the log4j.propeties configuration file which should be placed either in the /opt/kafka/config/ directory or on the classpath. The location and name of the configuration file can be changed using the Java property log4j.configuration which can be passed to Kafka using the KAFKA_LOG4J_OPTS environment variable:

su - kafka
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/my/path/to/log4j.config"; /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

For more information about Log4j configurations, see Log4j manual.