Chapter 4. Managing secure access to Kafka

You can secure your Kafka cluster by managing the access each client has to the Kafka brokers.

A secure connection between Kafka brokers and clients can encompass:

  • Encryption for data exchange
  • Authentication to prove identity
  • Authorization to allow or decline actions executed by users

This chapter explains how to set up secure connections between Kafka brokers and clients, with sections describing:

  • Security options for Kafka clusters and clients
  • How to secure Kafka brokers
  • How to use an authorization server for OAuth 2.0 token-based authentication and authorization

4.1. Security options for Kafka

Use the Kafka resource to configure the mechanisms used for Kafka authentication and authorization.

4.1.1. Listener authentication

For clients inside the OpenShift cluster, you can create plain (without encryption) or tls internal listeners.

For clients outside the OpenShift cluster, you create external listeners and specify a connection mechanism, which can be nodeport, loadbalancer, ingress or route (on OpenShift).

For more information on the configuration options for connecting an external client, see Configuring external listeners.

Supported authentication options:

  1. Mutual TLS authentication (only on the listeners with TLS enabled encryption)
  2. SCRAM-SHA-512 authentication
  3. OAuth 2.0 token based authentication

The authentication option you choose depends on how you wish to authenticate client access to Kafka brokers.

Figure 4.1. Kafka listener authentication options

options for listener authentication configuration

The listener authentication property is used to specify an authentication mechanism specific to that listener.

If no authentication property is specified then the listener does not authenticate clients which connect through that listener. The listener will accept all connections without authentication.

Authentication must be configured when using the User Operator to manage KafkaUsers.

The following example shows:

  • A plain listener configured for SCRAM-SHA-512 authentication
  • A tls listener with mutual TLS authentication
  • An external listener with mutual TLS authentication

Each listener is configured with a unique name and port within a Kafka cluster.

Note

Listeners cannot be configured to use the ports set aside for interbroker communication (9091) and metrics (9404).

An example showing listener authentication configuration

# ...
listeners:
  - name: plain
    port: 9092
    type: internal
    tls: true
    authentication:
      type: scram-sha-512
  - name: tls
    port: 9093
    type: internal
    tls: true
    authentication:
      type: tls
  - name: external
    port: 9094
    type: loadbalancer
    tls: true
    authentication:
      type: tls
# ...

4.1.1.1. Mutual TLS authentication

Mutual TLS authentication is always used for the communication between Kafka brokers and ZooKeeper pods.

AMQ Streams can configure Kafka to use TLS (Transport Layer Security) to provide encrypted communication between Kafka brokers and clients either with or without mutual authentication. For mutual, or two-way, authentication, both the server and the client present certificates. When you configure mutual authentication, the broker authenticates the client (client authentication) and the client authenticates the broker (server authentication).

Note

TLS authentication is more commonly one-way, with one party authenticating the identity of another. For example, when HTTPS is used between a web browser and a web server, the browser obtains proof of the identity of the web server.

4.1.1.2. SCRAM-SHA-512 authentication

SCRAM (Salted Challenge Response Authentication Mechanism) is an authentication protocol that can establish mutual authentication using passwords. AMQ Streams can configure Kafka to use SASL (Simple Authentication and Security Layer) SCRAM-SHA-512 to provide authentication on both unencrypted and encrypted client connections.

When SCRAM-SHA-512 authentication is used with a TLS client connection, the TLS protocol provides the encryption, but is not used for authentication.

The following properties of SCRAM make it safe to use SCRAM-SHA-512 even on unencrypted connections:

  • The passwords are not sent in the clear over the communication channel. Instead the client and the server are each challenged by the other to offer proof that they know the password of the authenticating user.
  • The server and client each generate a new challenge for each authentication exchange. This means that the exchange is resilient against replay attacks.

When a KafkaUser.spec.authentication.type is configured with scram-sha-512 the User Operator will generate a random 12-character password consisting of upper and lowercase ASCII letters and numbers.

4.1.1.3. Network policies

AMQ Streams automatically creates a NetworkPolicy resource for every listener that is enabled on a Kafka broker. By default, a NetworkPolicy grants access to a listener to all applications and namespaces.

If you want to restrict access to a listener at the network level to only selected applications or namespaces, use the networkPolicyPeers property.

Use network policies as part of the listener authentication configuration. Each listener can have a different networkPolicyPeers configuration.

For more information, refer to the Listener network policies section and the NetworkPolicyPeer API reference.

Note

Your configuration of OpenShift must support ingress NetworkPolicies in order to use network policies in AMQ Streams.

4.1.1.4. Additional listener configuration options

You can use the properties of the GenericKafkaListenerConfiguration schema to add further configuration to listeners.

4.1.2. Kafka authorization

You can configure authorization for Kafka brokers using the authorization property in the Kafka.spec.kafka resource. If the authorization property is missing, no authorization is enabled and clients have no restrictions. When enabled, authorization is applied to all enabled listeners. The authorization method is defined in the type field.

Supported authorization options:

Figure 4.2. Kafka cluster authorization options

options for kafks authorization configuration

4.1.2.1. Super users

Super users can access all resources in your Kafka cluster regardless of any access restrictions, and are supported by all authorization mechanisms.

To designate super users for a Kafka cluster, add a list of user principals to the superUsers property. If a user uses TLS client authentication, their username is the common name from their certificate subject prefixed with CN=.

An example configuration with super users

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: myproject
spec:
  kafka:
    # ...
    authorization:
      type: simple
      superUsers:
        - CN=client_1
        - user_2
        - CN=client_3
    # ...

4.2. Security options for Kafka clients

Use the KafkaUser resource to configure the authentication mechanism, authorization mechanism, and access rights for Kafka clients. In terms of configuring security, clients are represented as users.

You can authenticate and authorize user access to Kafka brokers. Authentication permits access, and authorization constrains the access to permissible actions.

You can also create super users that have unconstrained access to Kafka brokers.

The authentication and authorization mechanisms must match the specification for the listener used to access the Kafka brokers.

4.2.1. Identifying a Kafka cluster for user handling

A KafkaUser resource includes a label that defines the appropriate name of the Kafka cluster (derived from the name of the Kafka resource) to which it belongs.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster

The label is used by the User Operator to identify the KafkaUser resource and create a new user, and also in subsequent handling of the user.

If the label does not match the Kafka cluster, the User Operator cannot identify the KafkaUser and the user is not created.

If the status of the KafkaUser resource remains empty, check your label.

4.2.2. User authentication

User authentication is configured using the authentication property in KafkaUser.spec. The authentication mechanism enabled for the user is specified using the type field.

Supported authentication mechanisms:

  • TLS client authentication
  • SCRAM-SHA-512 authentication

When no authentication mechanism is specified, the User Operator does not create the user or its credentials.

4.2.2.1. TLS Client Authentication

To use TLS client authentication, you set the type field to tls.

An example KafkaUser with TLS client authentication enabled

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  # ...

When the user is created by the User Operator, it creates a new Secret with the same name as the KafkaUser resource. The Secret contains a private and public key for TLS client authentication. The public key is contained in a user certificate, which is signed by the client Certificate Authority (CA).

All keys are in X.509 format.

Secrets provide private keys and certificates in PEM and PKCS #12 formats.

For more information on securing Kafka communication with Secrets, see Chapter 11, Managing TLS certificates.

An example Secret with user credentials

apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  ca.crt: # Public key of the client CA
  user.crt: # User certificate that contains the public key of the user
  user.key: # Private key of the user
  user.p12: # PKCS #12 archive file for storing certificates and keys
  user.password: # Password for protecting the PKCS #12 archive file

4.2.2.2. SCRAM-SHA-512 Authentication

To use the SCRAM-SHA-512 authentication mechanism, you set the type field to scram-sha-512.

An example KafkaUser with SCRAM-SHA-512 authentication enabled

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
  # ...

When the user is created by the User Operator, it creates a new secret with the same name as the KafkaUser resource. The secret contains the generated password in the password key, which is encoded with base64. In order to use the password, it must be decoded.

An example Secret with user credentials

apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  password: Z2VuZXJhdGVkcGFzc3dvcmQ= 1
  sasl.jaas.config: b3JnLmFwYWNoZS5rYWZrYS5jb21tb24uc2VjdXJpdHkuc2NyYW0uU2NyYW1Mb2dpbk1vZHVsZSByZXF1aXJlZCB1c2VybmFtZT0ibXktdXNlciIgcGFzc3dvcmQ9ImdlbmVyYXRlZHBhc3N3b3JkIjsK 2

1
The generated password, base64 encoded.
2
The JAAS configuration string for SASL SCRAM-SHA-512 authentication, base64 encoded.

Decoding the generated password:

echo "Z2VuZXJhdGVkcGFzc3dvcmQ=" | base64 --decode

4.2.3. User authorization

User authorization is configured using the authorization property in KafkaUser.spec. The authorization type enabled for a user is specified using the type field.

To use simple authorization, you set the type property to simple in KafkaUser.spec.authorization. Simple authorization uses the default Kafka authorization plugin, AclAuthorizer.

Alternatively, you can use OPA authorization, or if you are already using OAuth 2.0 token based authentication, you can also use OAuth 2.0 authorization.

If no authorization is specified, the User Operator does not provision any access rights for the user. Whether such a KafkaUser can still access resources depends on the authorizer being used. For example, for the AclAuthorizer this is determined by its allow.everyone.if.no.acl.found configuration.

4.2.3.1. ACL rules

AclAuthorizer uses ACL rules to manage access to Kafka brokers.

ACL rules grant access rights to the user, which you specify in the acls property.

For more information about the AclRule object, see the AclRule schema reference.

4.2.3.2. Super user access to Kafka brokers

If a user is added to a list of super users in a Kafka broker configuration, the user is allowed unlimited access to the cluster regardless of any authorization constraints defined in ACLs in KafkaUser.

For more information on configuring super user access to brokers, see Kafka authorization.

4.2.3.3. User quotas

You can configure the spec for the KafkaUser resource to enforce quotas so that a user does not exceed access to Kafka brokers based on a byte threshold or a time limit of CPU utilization.

An example KafkaUser with user quotas

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # ...
  quotas:
    producerByteRate: 1048576 1
    consumerByteRate: 2097152 2
    requestPercentage: 55 3

1
Byte-per-second quota on the amount of data the user can push to a Kafka broker
2
Byte-per-second quota on the amount of data the user can fetch from a Kafka broker
3
CPU utilization limit as a percentage of time for a client group

For more information on these properties, see the KafkaUserQuotas schema reference.

4.3. Securing access to Kafka brokers

To establish secure access to Kafka brokers, you configure and apply:

  • A Kafka resource to:

    • Create listeners with a specified authentication type
    • Configure authorization for the whole Kafka cluster
  • A KafkaUser resource to access the Kafka brokers securely through the listeners

Configure the Kafka resource to set up:

  • Listener authentication
  • Network policies that restrict access to Kafka listeners
  • Kafka authorization
  • Super users for unconstrained access to brokers

Authentication is configured independently for each listener. Authorization is always configured for the whole Kafka cluster.

The Cluster Operator creates the listeners and sets up the cluster and client certificate authority (CA) certificates to enable authentication within the Kafka cluster.

You can replace the certificates generated by the Cluster Operator by installing your own certificates. You can also configure your listener to use a Kafka listener certificate managed by an external Certificate Authority. Certificates are available in PKCS #12 format (.p12) and PEM (.crt) formats.

Use KafkaUser to enable the authentication and authorization mechanisms that a specific client uses to access Kafka.

Configure the KafkaUser resource to set up:

  • Authentication to match the enabled listener authentication
  • Authorization to match the enabled Kafka authorization
  • Quotas to control the use of resources by clients

The User Operator creates the user representing the client and the security credentials used for client authentication, based on the chosen authentication type.

Additional resources

For more information about the schema for:

4.3.1. Securing Kafka brokers

This procedure shows the steps involved in securing Kafka brokers when running AMQ Streams.

The security implemented for Kafka brokers must be compatible with the security implemented for the clients requiring access.

  • Kafka.spec.kafka.listeners[*].authentication matches KafkaUser.spec.authentication
  • Kafka.spec.kafka.authorization matches KafkaUser.spec.authorization

The steps show the configuration for simple authorization and a listener using TLS authentication. For more information on listener configuration, see GenericKafkaListener schema reference.

Alternatively, you can use SCRAM-SHA or OAuth 2.0 for listener authentication, and OAuth 2.0 or OPA for Kafka authorization.

Procedure

  1. Configure the Kafka resource.

    1. Configure the authorization property for authorization.
    2. Configure the listeners property to create a listener with authentication.

      For example:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      spec:
        kafka:
          # ...
          authorization: 1
            type: simple
            superUsers: 2
              - CN=client_1
              - user_2
              - CN=client_3
          listeners:
            - name: tls
              port: 9093
              type: internal
              tls: true
              authentication:
                type: tls 3
          # ...
        zookeeper:
          # ...
      1
      2
      List of user principals with unlimited access to Kafka. CN is the common name from the client certificate when TLS authentication is used.
      3
      Listener authentication mechanisms may be configured for each listener, and specified as mutual TLS, SCRAM-SHA-512 or token-based OAuth 2.0.

      If you are configuring an external listener, the configuration is dependent on the chosen connection mechanism.

  2. Create or update the Kafka resource.

    oc apply -f KAFKA-CONFIG-FILE

    The Kafka cluster is configured with a Kafka broker listener using TLS authentication.

    A service is created for each Kafka broker pod.

    A service is created to serve as the bootstrap address for connection to the Kafka cluster.

    The cluster CA certificate to verify the identity of the kafka brokers is also created with the same name as the Kafka resource.

4.3.2. Securing user access to Kafka

Use the properties of the KafkaUser resource to configure a Kafka user.

You can use oc apply to create or modify users, and oc delete to delete existing users.

For example:

  • oc apply -f USER-CONFIG-FILE
  • oc delete KafkaUser USER-NAME

When you configure the KafkaUser authentication and authorization mechanisms, ensure they match the equivalent Kafka configuration:

  • KafkaUser.spec.authentication matches Kafka.spec.kafka.listeners[*].authentication
  • KafkaUser.spec.authorization matches Kafka.spec.kafka.authorization

This procedure shows how a user is created with TLS authentication. You can also create a user with SCRAM-SHA authentication.

The authentication required depends on the type of authentication configured for the Kafka broker listener.

Note

Authentication between Kafka users and Kafka brokers depends on the authentication settings for each. For example, it is not possible to authenticate a user with TLS if it is not also enabled in the Kafka configuration.

Prerequisites

The authentication type in KafkaUser should match the authentication configured in Kafka brokers.

Procedure

  1. Configure the KafkaUser resource.

    For example:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      authentication: 1
        type: tls
      authorization:
        type: simple 2
        acls:
          - resource:
              type: topic
              name: my-topic
              patternType: literal
            operation: Read
          - resource:
              type: topic
              name: my-topic
              patternType: literal
            operation: Describe
          - resource:
              type: group
              name: my-group
              patternType: literal
            operation: Read
    1
    User authentication mechanism, defined as mutual tls or scram-sha-512.
    2
    Simple authorization, which requires an accompanying list of ACL rules.
  2. Create or update the KafkaUser resource.

    oc apply -f USER-CONFIG-FILE

    The user is created, as well as a Secret with the same name as the KafkaUser resource. The Secret contains a private and public key for TLS client authentication.

For information on configuring a Kafka client with properties for secure connection to Kafka brokers, see Setting up access for clients outside of OpenShift in the Deploying and Upgrading AMQ Streams on OpenShift guide.

4.3.3. Restricting access to Kafka listeners using network policies

You can restrict access to a listener to only selected applications by using the networkPolicyPeers property.

Prerequisites

  • An OpenShift cluster with support for Ingress NetworkPolicies.
  • The Cluster Operator is running.

Procedure

  1. Open the Kafka resource.
  2. In the networkPolicyPeers property, define the application pods or namespaces that will be allowed to access the Kafka cluster.

    For example, to configure a tls listener to allow connections only from application pods with the label app set to kafka-client:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
            networkPolicyPeers:
              - podSelector:
                  matchLabels:
                    app: kafka-client
        # ...
      zookeeper:
        # ...
  3. Create or update the resource.

    Use oc apply:

    oc apply -f your-file

Additional resources

4.4. Using OAuth 2.0 token-based authentication

AMQ Streams supports the use of OAuth 2.0 authentication using the OAUTHBEARER and PLAIN mechanisms.

OAuth 2.0 enables standardized token-based authentication and authorization between applications, using a central authorization server to issue tokens that grant limited access to resources.

Kafka brokers and clients both need to be configured to use OAuth 2.0. You can configure OAuth 2.0 authentication, then OAuth 2.0 authorization.

Note

OAuth 2.0 authentication can be used in conjunction with Kafka authorization.

Using OAuth 2.0 authentication, application clients can access resources on application servers (called resource servers) without exposing account credentials.

The application client passes an access token as a means of authenticating, which application servers can also use to determine the level of access to grant. The authorization server handles the granting of access and inquiries about access.

In the context of AMQ Streams:

  • Kafka brokers act as OAuth 2.0 resource servers
  • Kafka clients act as OAuth 2.0 application clients

Kafka clients authenticate to Kafka brokers. The brokers and clients communicate with the OAuth 2.0 authorization server, as necessary, to obtain or validate access tokens.

For a deployment of AMQ Streams, OAuth 2.0 integration provides:

  • Server-side OAuth 2.0 support for Kafka brokers
  • Client-side OAuth 2.0 support for Kafka MirrorMaker, Kafka Connect and the Kafka Bridge

Additional resources

4.4.1. OAuth 2.0 authentication mechanisms

AMQ Streams supports the OAUTHBEARER and PLAIN mechanisms for OAuth 2.0 authentication. Both mechanisms allow Kafka clients to establish authenticated sessions with Kafka brokers. The authentication flow between clients, the authorization server, and Kafka brokers is different for each mechanism.

We recommend that you configure clients to use OAUTHBEARER whenever possible. OAUTHBEARER provides a higher level of security than PLAIN because client credentials are never shared with Kafka brokers. Consider using PLAIN only with Kafka clients that do not support OAUTHBEARER.

If necessary, OAUTHBEARER and PLAIN can be enabled together, on the same oauth listener.

OAUTHBEARER overview

Kafka supports the OAUTHBEARER authentication mechanism, however it must be explicitly configured. Also, many Kafka client tools use libraries that provide basic support for OAUTHBEARER at the protocol level.

To ease application development, AMQ Streams provides an OAuth callback handler for the upstream Kafka Client Java libraries (but not for other libraries). Therefore, you do not need to write your own callback handlers for such clients. An application client can use the callback handler to provide the access token. Clients written in other languages, such as Go, must use custom code to connect to the authorization server and obtain the access token.

With OAUTHBEARER, the client initiates a session with the Kafka broker for credentials exchange, where credentials take the form of a bearer token provided by the callback handler. Using the callbacks, you can configure token provision in one of three ways:

  • Client ID and Secret (by using the OAuth 2.0 client credentials mechanism)
  • A long-lived access token, obtained manually at configuration time
  • A long-lived refresh token, obtained manually at configuration time

OAUTHBEARER is automatically enabled in the oauth listener configuration for the Kafka broker. You can set the enableOauthBearer property to true, though this is not required.

  # ...
  authentication:
    type: oauth
    # ...
    enableOauthBearer: true
Note

OAUTHBEARER authentication can only be used by Kafka clients that support the OAUTHBEARER mechanism at the protocol level.

PLAIN overview

PLAIN is a simple authentication mechanism used by all Kafka client tools (including developer tools such as kafkacat). To enable PLAIN to be used together with OAuth 2.0 authentication, AMQ Streams includes server-side callbacks and calls this OAuth 2.0 over PLAIN.

With the AMQ Streams implementation of PLAIN, the client credentials are not stored in ZooKeeper. Instead, client credentials are handled centrally behind a compliant authorization server, similar to when OAUTHBEARER authentication is used.

When used with the OAuth 2.0 over PLAIN callbacks, Kafka clients authenticate with Kafka brokers using either of the following methods:

  • Client ID and secret (by using the OAuth 2.0 client credentials mechanism)
  • A long-lived access token, obtained manually at configuration time

The client must be enabled to use PLAIN authentication, and provide a username and password. If the password is prefixed with $accessToken: followed by the value of the access token, the Kafka broker will interpret the password as the access token. Otherwise, the Kafka broker will interpret the username as the client ID and the password as the client secret.

If the password is set as the access token, the username must be set to the same principal name that the Kafka broker obtains from the access token. The process depends on how you configure username extraction using userNameClaim, fallbackUserNameClaim, fallbackUsernamePrefix, or userInfoEndpointUri. It also depends on your authorization server; in particular, how it maps client IDs to account names.

To use PLAIN, you must enable it in the oauth listener configuration for the Kafka broker.

In the following example, PLAIN is enabled in addition to OAUTHBEARER, which is enabled by default. If you want to use PLAIN only, you can disable OAUTHBEARER by setting enableOauthBearer to false.

  # ...
  authentication:
    type: oauth
    # ...
    enablePlain: true
    tokenEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/token

4.4.2. OAuth 2.0 Kafka broker configuration

Kafka broker configuration for OAuth 2.0 involves:

  • Creating the OAuth 2.0 client in the authorization server
  • Configuring OAuth 2.0 authentication in the Kafka custom resource
Note

In relation to the authorization server, Kafka brokers and Kafka clients are both regarded as OAuth 2.0 clients.

4.4.2.1. OAuth 2.0 client configuration on an authorization server

To configure a Kafka broker to validate the token received during session initiation, the recommended approach is to create an OAuth 2.0 client definition in an authorization server, configured as confidential, with the following client credentials enabled:

  • Client ID of kafka (for example)
  • Client ID and Secret as the authentication mechanism
Note

You only need to use a client ID and secret when using a non-public introspection endpoint of the authorization server. The credentials are not typically required when using public authorization server endpoints, as with fast local JWT token validation.

4.4.2.2. OAuth 2.0 authentication configuration in the Kafka cluster

To use OAuth 2.0 authentication in the Kafka cluster, you specify, for example, a TLS listener configuration for your Kafka cluster custom resource with the authentication method oauth:

Assigining the authentication method type for OAuth 2.0

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
      #...

You can configure plain, tls and external listeners, but it is recommended not to use plain listeners or external listeners with disabled TLS encryption with OAuth 2.0 as this creates a vulnerability to network eavesdropping and unauthorized access through token theft.

You configure an external listener with type: oauth for a secure transport layer to communicate with the client.

Using OAuth 2.0 with an external listener

# ...
listeners:
  - name: external
    port: 9094
    type: loadbalancer
    tls: true
    authentication:
      type: oauth
    #...

The tls property is false by default, so it must be enabled.

When you have defined the type of authentication as OAuth 2.0, you add configuration based on the type of validation, either as fast local JWT validation or token validation using an introspection endpoint.

The procedure to configure OAuth 2.0 for listeners, with descriptions and examples, is described in Configuring OAuth 2.0 support for Kafka brokers.

4.4.2.3. Fast local JWT token validation configuration

Fast local JWT token validation checks a JWT token signature locally.

The local check ensures that a token:

  • Conforms to type by containing a (typ) claim value of Bearer for an access token
  • Is valid (not expired)
  • Has an issuer that matches a validIssuerURI

You specify a validIssuerURI attribute when you configure the listener, so that any tokens not issued by the authorization server are rejected.

The authorization server does not need to be contacted during fast local JWT token validation. You activate fast local JWT token validation by specifying a jwksEndpointUri attribute, the endpoint exposed by the OAuth 2.0 authorization server. The endpoint contains the public keys used to validate signed JWT tokens, which are sent as credentials by Kafka clients.

Note

All communication with the authorization server should be performed using TLS encryption.

You can configure a certificate truststore as an OpenShift Secret in your AMQ Streams project namespace, and use a tlsTrustedCertificates attribute to point to the OpenShift Secret containing the truststore file.

You might want to configure a userNameClaim to properly extract a username from the JWT token. If you want to use Kafka ACL authorization, you need to identify the user by their username during authentication. (The sub claim in JWT tokens is typically a unique ID, not a username.)

Example configuration for fast local JWT token validation

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    #...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
          validIssuerUri: <https://<auth-server-address>/auth/realms/tls>
          jwksEndpointUri: <https://<auth-server-address>/auth/realms/tls/protocol/openid-connect/certs>
          userNameClaim: preferred_username
          maxSecondsWithoutReauthentication: 3600
          tlsTrustedCertificates:
          - secretName: oauth-server-cert
            certificate: ca.crt
    #...

4.4.2.4. OAuth 2.0 introspection endpoint configuration

Token validation using an OAuth 2.0 introspection endpoint treats a received access token as opaque. The Kafka broker sends an access token to the introspection endpoint, which responds with the token information necessary for validation. Importantly, it returns up-to-date information if the specific access token is valid, and also information about when the token expires.

To configure OAuth 2.0 introspection-based validation, you specify an introspectionEndpointUri attribute rather than the jwksEndpointUri attribute specified for fast local JWT token validation. Depending on the authorization server, you typically have to specify a clientId and clientSecret, because the introspection endpoint is usually protected.

Example configuration for an introspection endpoint

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
          clientId: kafka-broker
          clientSecret:
            secretName: my-cluster-oauth
            key: clientSecret
          validIssuerUri: <https://<auth-server-address>/auth/realms/tls>
          introspectionEndpointUri: <https://<auth-server-address>/auth/realms/tls/protocol/openid-connect/token/introspect>
          userNameClaim: preferred_username
          maxSecondsWithoutReauthentication: 3600
          tlsTrustedCertificates:
          - secretName: oauth-server-cert
            certificate: ca.crt

4.4.3. Session re-authentication for Kafka brokers

You can configure oauth listeners to use Kafka session re-authentication for OAuth 2.0 sessions between Kafka clients and Kafka brokers. This mechanism enforces the expiry of an authenticated session between the client and the broker after a defined period of time. When a session expires, the client immediately starts a new session by reusing the existing connection rather than dropping it.

Session re-authentication is disabled by default. To enable it, you set a time value for maxSecondsWithoutReauthentication in the oauth listener configuration. The same property is used to configure session re-authentication for OAUTHBEARER and PLAIN authentication. For an example configuration, see Section 4.4.6.2, “Configuring OAuth 2.0 support for Kafka brokers”.

Session re-authentication must be supported by the Kafka client libraries used by the client.

Session re-authentication can be used with fast local JWT or introspection endpoint token validation.

Client re-authentication

When the broker’s authenticated session expires, the client must re-authenticate to the existing session by sending a new, valid access token to the broker, without dropping the connection.

If token validation is successful, a new client session is started using the existing connection. If the client fails to re-authenticate, the broker will close the connection if further attempts are made to send or receive messages. Java clients that use Kafka client library 2.2 or later automatically re-authenticate if the re-authentication mechanism is enabled on the broker.

Session re-authentication also applies to refresh tokens, if used. When the session expires, the client refreshes the access token by using its refresh token. The client then uses the new access token to re-authenticate to the existing session.

Session expiry for OAUTHBEARER and PLAIN

When session re-authentication is configured, session expiry works differently for OAUTHBEARER and PLAIN authentication.

For OAUTHBEARER and PLAIN, using the client ID and secret method:

  • The broker’s authenticated session will expire at the configured maxSecondsWithoutReauthentication.
  • The session will expire earlier if the access token expires before the configured time.

For PLAIN using the long-lived access token method:

  • The broker’s authenticated session will expire at the configured maxSecondsWithoutReauthentication.
  • Re-authentication will fail if the access token expires before the configured time. Although session re-authentication is attempted, PLAIN has no mechanism for refreshing tokens.

If maxSecondsWithoutReauthentication is not configured, OAUTHBEARER and PLAIN clients can remain connected to brokers indefinitely, without needing to re-authenticate. Authenticated sessions do not end with access token expiry. However, this can be considered when configuring authorization, for example, by using keycloak authorization or installing a custom authorizer.

4.4.4. OAuth 2.0 Kafka client configuration

A Kafka client is configured with either:

  • The credentials required to obtain a valid access token from an authorization server (client ID and Secret)
  • A valid long-lived access token or refresh token, obtained using tools provided by an authorization server

The only information ever sent to the Kafka broker is an access token. The credentials used to authenticate with the authorization server to obtain the access token are never sent to the broker.

When a client obtains an access token, no further communication with the authorization server is needed.

The simplest mechanism is authentication with a client ID and Secret. Using a long-lived access token, or a long-lived refresh token, adds more complexity because there is an additional dependency on authorization server tools.

Note

If you are using long-lived access tokens, you may need to configure the client in the authorization server to increase the maximum lifetime of the token.

If the Kafka client is not configured with an access token directly, the client exchanges credentials for an access token during Kafka session initiation by contacting the authorization server. The Kafka client exchanges either:

  • Client ID and Secret
  • Client ID, refresh token, and (optionally) a Secret

4.4.5. OAuth 2.0 client authentication flow

In this section, we explain and visualize the communication flow between Kafka client, Kafka broker, and authorization server during Kafka session initiation. The flow depends on the client and server configuration.

When a Kafka client sends an access token as credentials to a Kafka broker, the token needs to be validated.

Depending on the authorization server used, and the configuration options available, you may prefer to use:

  • Fast local token validation based on JWT signature checking and local token introspection, without contacting the authorization server
  • An OAuth 2.0 introspection endpoint provided by the authorization server

Using fast local token validation requires the authorization server to provide a JWKS endpoint with public certificates that are used to validate signatures on the tokens.

Another option is to use an OAuth 2.0 introspection endpoint on the authorization server. Each time a new Kafka broker connection is established, the broker passes the access token received from the client to the authorization server, and checks the response to confirm whether or not the token is valid.

Kafka client credentials can also be configured for:

  • Direct local access using a previously generated long-lived access token
  • Contact with the authorization server for a new access token to be issued
Note

An authorization server might only allow the use of opaque access tokens, which means that local token validation is not possible.

4.4.5.1. Example client authentication flows

Here you can see the communication flows, for different configurations of Kafka clients and brokers, during Kafka session authentication.

Client using client ID and secret, with broker delegating validation to authorization server

Client using client ID and secret with broker delegating validation to authorization server

  1. Kafka client requests access token from authorization server, using client ID and secret, and optionally a refresh token.
  2. Authorization server generates a new access token.
  3. Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
  4. Kafka broker validates the access token by calling a token introspection endpoint on authorization server, using its own client ID and secret.
  5. Kafka client session is established if the token is valid.

Client using client ID and secret, with broker performing fast local token validation

Client using client ID and secret with broker performing fast local token validation

  1. Kafka client authenticates with authorization server from the token endpoint, using a client ID and secret, and optionally a refresh token.
  2. Authorization server generates a new access token.
  3. Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
  4. Kafka broker validates the access token locally using a JWT token signature check, and local token introspection.

Client using long-lived access token, with broker delegating validation to authorization server

Client using long-lived access token with broker delegating validation to authorization server

  1. Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
  2. Kafka broker validates the access token by calling a token introspection endpoint on authorization server, using its own client ID and secret.
  3. Kafka client session is established if the token is valid.

Client using long-lived access token, with broker performing fast local validation

Client using long-lived access token with broker performing fast local validation

  1. Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
  2. Kafka broker validates the access token locally using JWT token signature check, and local token introspection.
Warning

Fast local JWT token signature validation is suitable only for short-lived tokens as there is no check with the authorization server if a token has been revoked. Token expiration is written into the token, but revocation can happen at any time, so cannot be accounted for without contacting the authorization server. Any issued token would be considered valid until it expires.

4.4.6. Configuring OAuth 2.0 authentication

OAuth 2.0 is used for interaction between Kafka clients and AMQ Streams components.

In order to use OAuth 2.0 for AMQ Streams, you must:

4.4.6.1. Configuring Red Hat Single Sign-On as an OAuth 2.0 authorization server

This procedure describes how to deploy Red Hat Single Sign-On as an authorization server and configure it for integration with AMQ Streams.

The authorization server provides a central point for authentication and authorization, and management of users, clients, and permissions. Red Hat Single Sign-On has a concept of realms where a realm represents a separate set of users, clients, permissions, and other configuration. You can use a default master realm, or create a new one. Each realm exposes its own OAuth 2.0 endpoints, which means that application clients and application servers all need to use the same realm.

To use OAuth 2.0 with AMQ Streams, you use a deployment of Red Hat Single Sign-On to create and manage authentication realms.

Note

If you already have Red Hat Single Sign-On deployed, you can skip the deployment step and use your current deployment.

Before you begin

You will need to be familiar with using Red Hat Single Sign-On.

For deployment and administration instructions, see:

Prerequisites

  • AMQ Streams and Kafka is running

For the Red Hat Single Sign-On deployment:

Procedure

  1. Deploy Red Hat Single Sign-On to your OpenShift cluster.

    Check the progress of the deployment in your OpenShift web console.

  2. Log in to the Red Hat Single Sign-On Admin Console to create the OAuth 2.0 policies for AMQ Streams.

    Login details are provided when you deploy Red Hat Single Sign-On.

  3. Create and enable a realm.

    You can use an existing master realm.

  4. Adjust the session and token timeouts for the realm, if required.
  5. Create a client called kafka-broker.
  6. From the Settings tab, set:

    • Access Type to Confidential
    • Standard Flow Enabled to OFF to disable web login for this client
    • Service Accounts Enabled to ON to allow this client to authenticate in its own name
  7. Click Save before continuing.
  8. From the Credentials tab, take a note of the secret for using in your AMQ Streams Kafka cluster configuration.
  9. Repeat the client creation steps for any application client that will connect to your Kafka brokers.

    Create a definition for each new client.

    You will use the names as client IDs in your configuration.

What to do next

After deploying and configuring the authorization server, configure the Kafka brokers to use OAuth 2.0.

4.4.6.2. Configuring OAuth 2.0 support for Kafka brokers

This procedure describes how to configure Kafka brokers so that the broker listeners are enabled to use OAuth 2.0 authentication using an authorization server.

We advise use of OAuth 2.0 over an encrypted interface through configuration of TLS listeners. Plain listeners are not recommended.

If the authorization server is using certificates signed by the trusted CA and matching the OAuth 2.0 server hostname, TLS connection works using the default settings. Otherwise, you may need to configure the truststore with prober certificates or disable the certificate hostname validation.

When configuring the Kafka broker you have two options for the mechanism used to validate the access token during OAuth 2.0 authentication of the newly connected Kafka client:

Before you start

For more information on the configuration of OAuth 2.0 authentication for Kafka broker listeners, see:

Prerequisites

  • AMQ Streams and Kafka are running
  • An OAuth 2.0 authorization server is deployed

Procedure

  1. Update the Kafka broker configuration (Kafka.spec.kafka) of your Kafka resource in an editor.

    oc edit kafka my-cluster
  2. Configure the Kafka broker listeners configuration.

    The configuration for each type of listener does not have to be the same, as they are independent.

    The examples here show the configuration options as configured for external listeners.

    Example 1: Configuring fast local JWT token validation

    #...
    - name: external
      port: 9094
      type: loadbalancer
      tls: true
      authentication:
        type: oauth 1
        validIssuerUri: <https://<auth-server-address>/auth/realms/external> 2
        jwksEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/certs> 3
        userNameClaim: preferred_username 4
        maxSecondsWithoutReauthentication: 3600 5
        tlsTrustedCertificates: 6
        - secretName: oauth-server-cert
          certificate: ca.crt
        disableTlsHostnameVerification: true 7
        jwksExpirySeconds: 360 8
        jwksRefreshSeconds: 300 9
        jwksMinRefreshPauseSeconds: 1 10
        enableECDSA: "true" 11

    1
    Listener type set to oauth.
    2
    URI of the token issuer used for authentication.
    3
    URI of the JWKS certificate endpoint used for local JWT validation.
    4
    The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The userNameClaim value will depend on the authentication flow and the authorization server used.
    5
    (Optional) Activates the Kafka re-authentication mechanism that enforces session expiry to the same length of time as the access token. If the specified value is less than the time left for the access token to expire, then the client will have to re-authenticate before the actual token expiry. By default, the session does not expire when the access token expires, and the client does not attempt re-authentication.
    6
    (Optional) Trusted certificates for TLS connection to the authorization server.
    7
    (Optional) Disable TLS hostname verification. Default is false.
    8
    The duration the JWKS certificates are considered valid before they expire. Default is 360 seconds. If you specify a longer time, consider the risk of allowing access to revoked certificates.
    9
    The period between refreshes of JWKS certificates. The interval must be at least 60 seconds shorter than the expiry interval. Default is 300 seconds.
    10
    The minimum pause in seconds between consecutive attempts to refresh JWKS public keys. When an unknown signing key is encountered, the JWKS keys refresh is scheduled outside the regular periodic schedule with at least the specified pause since the last refresh attempt. The refreshing of keys follows the rule of exponential backoff, retrying on unsuccessful refreshes with ever increasing pause, until it reaches jwksRefreshSeconds. The default value is 1.
    11
    (Optional) If ECDSA is used for signing JWT tokens on authorization server, then this needs to be enabled. It installs additional crypto providers using BouncyCastle crypto library. Default is false.

    Example 2: Configuring token validation using an introspection endpoint

    - name: external
      port: 9094
      type: loadbalancer
      tls: true
      authentication:
        type: oauth
        validIssuerUri: <https://<auth-server-address>/auth/realms/external>
        introspectionEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token/introspect> 1
        clientId: kafka-broker 2
        clientSecret: 3
          secretName: my-cluster-oauth
          key: clientSecret
        userNameClaim: preferred_username 4
        maxSecondsWithoutReauthentication: 3600 5

    1
    URI of the token introspection endpoint.
    2
    Client ID to identify the client.
    3
    Client Secret and client ID is used for authentication.
    4
    The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The userNameClaim value will depend on the authorization server used.
    5
    (Optional) Activates the Kafka re-authentication mechanism that enforces session expiry to the same length of time as the access token. If the specified value is less than the time left for the access token to expire, then the client will have to re-authenticate before the actual token expiry. By default, the session does not expire when the access token expires, and the client does not attempt re-authentication.

    Depending on how you apply OAuth 2.0 authentication, and the type of authorization server, there are additional (optional) configuration settings you can use:

      # ...
      authentication:
        type: oauth
        # ...
        checkIssuer: false 1
        checkAudience: true 2
        fallbackUserNameClaim: client_id 3
        fallbackUserNamePrefix: client-account- 4
        validTokenType: bearer 5
        userInfoEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/userinfo 6
        enableOauthBearer: false 7
        enablePlain: true 8
        tokenEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/token 9
        customClaimCheck: "@.custom == 'custom-value'" 10
    1
    If your authorization server does not provide an iss claim, it is not possible to perform an issuer check. In this situation, set checkIssuer to false and do not specify a validIssuerUri. Default is true.
    2
    If your authorization server provides an aud (audience) claim, and you want to enforce an audience check, set checkAudience to true. Audience checks identify the intended recipients of tokens. As a result, the Kafka broker will reject tokens that do not have its clientId in their aud claim. Default is false.
    3
    An authorization server may not provide a single attribute to identify both regular users and clients. When a client authenticates in its own name, the server might provide a client ID. When a user authenticates using a username and password, to obtain a refresh token or an access token, the server might provide a username attribute in addition to a client ID. Use this fallback option to specify the username claim (attribute) to use if a primary user ID attribute is not available.
    4
    In situations where fallbackUserNameClaim is applicable, it may also be necessary to prevent name collisions between the values of the username claim, and those of the fallback username claim. Consider a situation where a client called producer exists, but also a regular user called producer exists. In order to differentiate between the two, you can use this property to add a prefix to the user ID of the client.
    5
    (Only applicable when using introspectionEndpointUri) Depending on the authorization server you are using, the introspection endpoint may or may not return the token type attribute, or it may contain different values. You can specify a valid token type value that the response from the introspection endpoint has to contain.
    6
    (Only applicable when using introspectionEndpointUri) The authorization server may be configured or implemented in such a way to not provide any identifiable information in an Introspection Endpoint response. In order to obtain the user ID, you can configure the URI of the userinfo endpoint as a fallback. The userNameClaim, fallbackUserNameClaim, and fallbackUserNamePrefix settings are applied to the response of userinfo endpoint.
    7
    Set this to false`to disable the OAUTHBEARER mechanism on the listener. At least one of PLAIN or OAUTHBEARER has to be enabled. Default is `true.
    8
    Set this to true to enable the PLAIN mechanism on the listener, which is supported by all clients on all platforms. The Kafka client has to enable the PLAIN mechanism and set the username and the password. This mechanism can be used to authenticate either by using the OAuth access token, or by using the OAuth client id and secret (client credentials). If the client sets password to start with the string $accessToken:, the password is interpreted as the access token on the server, and username as the account username, otherwise the user is interpreted as the client id, and password as the client secret. Default is false.
    9
    This has to be set to support the client credentials authentication when enablePlain is set to true, as described in previous point.
    10
    Additional custom rules can be imposed on the JWT access token during validation by setting this to a JsonPath filter query. If the access token does not contain the necessary data, it is rejected. When using the introspectionEndpointUri, the custom check is applied to the introspection endpoint response JSON.
  3. Save and exit the editor, then wait for rolling updates to complete.
  4. Check the update in the logs or by watching the pod state transitions:

    oc logs -f ${POD_NAME} -c ${CONTAINER_NAME}
    oc get pod -w

    The rolling update configures the brokers to use OAuth 2.0 authentication.

4.4.6.3. Configuring Kafka Java clients to use OAuth 2.0

This procedure describes how to configure Kafka producer and consumer APIs to use OAuth 2.0 for interaction with Kafka brokers.

Add a client callback plugin to your pom.xml file, and configure the system properties.

Prerequisites

  • AMQ Streams and Kafka are running
  • An OAuth 2.0 authorization server is deployed and configured for OAuth access to Kafka brokers
  • Kafka brokers are configured for OAuth 2.0

Procedure

  1. Add the client library with OAuth 2.0 support to the pom.xml file for the Kafka client:

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.7.1.redhat-00003</version>
    </dependency>
  2. Configure the system properties for the callback:

    For example:

    System.setProperty(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, “https://<auth-server-address>/auth/realms/master/protocol/openid-connect/token”); 1
    System.setProperty(ClientConfig.OAUTH_CLIENT_ID, "<client-name>"); 2
    System.setProperty(ClientConfig.OAUTH_CLIENT_SECRET, "<client-secret>"); 3
    1
    URI of the authorization server token endpoint.
    2
    Client ID, which is the name used when creating the client in the authorization server.
    3
    Client secret created when creating the client in the authorization server.
  3. Enable the SASL OAUTHBEARER mechanism on a TLS encrypted connection in the Kafka client configuration:

    For example:

    props.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
    props.put("security.protocol", "SASL_SSL"); 1
    props.put("sasl.mechanism", "OAUTHBEARER");
    props.put("sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
    1
    Here we use SASL_SSL for use over TLS connections. Use SASL_PLAINTEXT over unencrypted connections.
  4. Verify that the Kafka client can access the Kafka brokers.

4.4.6.4. Configuring OAuth 2.0 for Kafka components

This procedure describes how to configure Kafka components to use OAuth 2.0 authentication using an authorization server.

You can configure authentication for:

  • Kafka Connect
  • Kafka MirrorMaker
  • Kafka Bridge

In this scenario, the Kafka component and the authorization server are running in the same cluster.

Before you start

For more information on the configuration of OAuth 2.0 authentication for Kafka components, see:

Prerequisites

  • AMQ Streams and Kafka are running
  • An OAuth 2.0 authorization server is deployed and configured for OAuth access to Kafka brokers
  • Kafka brokers are configured for OAuth 2.0

Procedure

  1. Create a client secret and mount it to the component as an environment variable.

    For example, here we are creating a client Secret for the Kafka Bridge:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Secret
    metadata:
     name: my-bridge-oauth
    type: Opaque
    data:
     clientSecret: MGQ1OTRmMzYtZTllZS00MDY2LWI5OGEtMTM5MzM2NjdlZjQw 1
    1
    The clientSecret key must be in base64 format.
  2. Create or edit the resource for the Kafka component so that OAuth 2.0 authentication is configured for the authentication property.

    For OAuth 2.0 authentication, you can use:

    • Client ID and secret
    • Client ID and refresh token
    • Access token
    • TLS

    KafkaClientAuthenticationOAuth schema reference provides examples of each.

    For example, here OAuth 2.0 is assigned to the Kafka Bridge client using a client ID and secret, and TLS:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      # ...
      authentication:
        type: oauth 1
        tokenEndpointUri: https://<auth-server-address>/auth/realms/master/protocol/openid-connect/token 2
        clientId: kafka-bridge
        clientSecret:
          secretName: my-bridge-oauth
          key: clientSecret
        tlsTrustedCertificates: 3
        - secretName: oauth-server-cert
          certificate: tls.crt
    1
    Authentication type set to oauth.
    2
    URI of the token endpoint for authentication.
    3
    Trusted certificates for TLS connection to the authorization server.

    Depending on how you apply OAuth 2.0 authentication, and the type of authorization server, there are additional configuration options you can use:

    # ...
    spec:
      # ...
      authentication:
        # ...
        disableTlsHostnameVerification: true 1
        checkAccessTokenType: false 2
        accessTokenIsJwt: false 3
        scope: any 4
    1
    (Optional) Disable TLS hostname verification. Default is false.
    2
    If the authorization server does not return a typ (type) claim inside the JWT token, you can apply checkAccessTokenType: false to skip the token type check. Default is true.
    3
    If you are using opaque tokens, you can apply accessTokenIsJwt: false so that access tokens are not treated as JWT tokens.
    4
    (Optional) The scope for requesting the token from the token endpoint. An authorization server may require a client to specify the scope. In this case it is any.
  3. Apply the changes to the deployment of your Kafka resource.

    oc apply -f your-file
  4. Check the update in the logs or by watching the pod state transitions:

    oc logs -f ${POD_NAME} -c ${CONTAINER_NAME}
    oc get pod -w

    The rolling updates configure the component for interaction with Kafka brokers using OAuth 2.0 authentication.

4.5. Using OAuth 2.0 token-based authorization

If you are using OAuth 2.0 with Red Hat Single Sign-On for token-based authentication, you can also use Red Hat Single Sign-On to configure authorization rules to constrain client access to Kafka brokers. Authentication establishes the identity of a user. Authorization decides the level of access for that user.

AMQ Streams supports the use of OAuth 2.0 token-based authorization through Red Hat Single Sign-On Authorization Services, which allows you to manage security policies and permissions centrally.

Security policies and permissions defined in Red Hat Single Sign-On are used to grant access to resources on Kafka brokers. Users and clients are matched against policies that permit access to perform specific actions on Kafka brokers.

Kafka allows all users full access to brokers by default, and also provides the AclAuthorizer plugin to configure authorization based on Access Control Lists (ACLs).

ZooKeeper stores ACL rules that grant or deny access to resources based on username. However, OAuth 2.0 token-based authorization with Red Hat Single Sign-On offers far greater flexibility on how you wish to implement access control to Kafka brokers. In addition, you can configure your Kafka brokers to use OAuth 2.0 authorization and ACLs.

4.5.1. OAuth 2.0 authorization mechanism

OAuth 2.0 authorization in AMQ Streams uses Red Hat Single Sign-On server Authorization Services REST endpoints to extend token-based authentication with Red Hat Single Sign-On by applying defined security policies on a particular user, and providing a list of permissions granted on different resources for that user. Policies use roles and groups to match permissions to users. OAuth 2.0 authorization enforces permissions locally based on the received list of grants for the user from Red Hat Single Sign-On Authorization Services.

4.5.1.1. Kafka broker custom authorizer

A Red Hat Single Sign-On authorizer (KeycloakRBACAuthorizer) is provided with AMQ Streams. To be able to use the Red Hat Single Sign-On REST endpoints for Authorization Services provided by Red Hat Single Sign-On, you configure a custom authorizer on the Kafka broker.

The authorizer fetches a list of granted permissions from the authorization server as needed, and enforces authorization locally on the Kafka Broker, making rapid authorization decisions for each client request.

4.5.2. Configuring OAuth 2.0 authorization support

This procedure describes how to configure Kafka brokers to use OAuth 2.0 authorization using Red Hat Single Sign-On Authorization Services.

Before you begin

Consider the access you require or want to limit for certain users. You can use a combination of Red Hat Single Sign-On groups, roles, clients, and users to configure access in Red Hat Single Sign-On.

Typically, groups are used to match users based on organizational departments or geographical locations. And roles are used to match users based on their function.

With Red Hat Single Sign-On, you can store users and groups in LDAP, whereas clients and roles cannot be stored this way. Storage and access to user data may be a factor in how you choose to configure authorization policies.

Note

Super users always have unconstrained access to a Kafka broker regardless of the authorization implemented on the Kafka broker.

Prerequisites

  • AMQ Streams must be configured to use OAuth 2.0 with Red Hat Single Sign-On for token-based authentication. You use the same Red Hat Single Sign-On server endpoint when you set up authorization.
  • OAuth 2.0 authentication must be configured with the maxSecondsWithoutReauthentication option to enable re-authentication.

Procedure

  1. Access the Red Hat Single Sign-On Admin Console or use the Red Hat Single Sign-On Admin CLI to enable Authorization Services for the Kafka broker client you created when setting up OAuth 2.0 authentication.
  2. Use Authorization Services to define resources, authorization scopes, policies, and permissions for the client.
  3. Bind the permissions to users and clients by assigning them roles and groups.
  4. Configure the Kafka brokers to use Red Hat Single Sign-On authorization by updating the Kafka broker configuration (Kafka.spec.kafka) of your Kafka resource in an editor.

    oc edit kafka my-cluster
  5. Configure the Kafka broker kafka configuration to use keycloak authorization, and to be able to access the authorization server and Authorization Services.

    For example:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        authorization:
          type: keycloak 1
          tokenEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token> 2
          clientId: kafka 3
          delegateToKafkaAcls: false 4
          disableTlsHostnameVerification: false 5
          superUsers: 6
          - CN=fred
          - sam
          - CN=edward
          tlsTrustedCertificates: 7
          - secretName: oauth-server-cert
            certificate: ca.crt
          grantsRefreshPeriodSeconds: 60 8
          grantsRefreshPoolSize: 5 9
        #...
    1
    Type keycloak enables Red Hat Single Sign-On authorization.
    2
    URI of the Red Hat Single Sign-On token endpoint. For production, always use HTTPs.
    3
    The client ID of the OAuth 2.0 client definition in Red Hat Single Sign-On that has Authorization Services enabled. Typically, kafka is used as the ID.
    4
    (Optional) Delegate authorization to Kafka AclAuthorizer if access is denied by Red Hat Single Sign-On Authorization Services policies. Default is false.
    5
    (Optional) Disable TLS hostname verification. Default is false.
    6
    (Optional) Designated super users.
    7
    (Optional) Trusted certificates for TLS connection to the authorization server.
    8
    (Optional) The time between two consecutive grants refresh runs. That is the maximum time for active sessions to detect any permissions changes for the user on Red Hat Single Sign-On. The default value is 60.
    9
    (Optional) The number of threads to use to refresh (in parallel) the grants for the active sessions. The default value is 5.
  6. Save and exit the editor, then wait for rolling updates to complete.
  7. Check the update in the logs or by watching the pod state transitions:

    oc logs -f ${POD_NAME} -c kafka
    oc get pod -w

    The rolling update configures the brokers to use OAuth 2.0 authorization.

  8. Verify the configured permissions by accessing Kafka brokers as clients or users with specific roles, making sure they have the necessary access, or do not have the access they are not supposed to have.