Chapter 6. Setting up client access to the Kafka cluster

After you have deployed AMQ Streams, the procedures in this section explain how to:

  • Deploy example producer and consumer clients, which you can use to verify your deployment
  • Set up external client access to the Kafka cluster

    The steps to set up access to the Kafka cluster for a client outside OpenShift are more complex, and require familiarity with the Kafka component configuration procedures described in the Using AMQ Streams on OpenShift guide.

6.1. Deploying example clients

This procedure shows how to deploy example producer and consumer clients that use the Kafka cluster you created to send and receive messages.

Prerequisites

  • The Kafka cluster is available for the clients.

Procedure

  1. Deploy a Kafka producer.

    oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.4 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
  2. Type a message into the console where the producer is running.
  3. Press Enter to send the message.
  4. Deploy a Kafka consumer.

    oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.4 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
  5. Confirm that you see the incoming messages in the consumer console.
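
If you prefer to verify the deployment programmatically, the following Java sketch performs the same round trip as the console clients: it sends one message to my-topic and reads it back. It assumes the code runs inside the OpenShift cluster with the kafka-clients library on the classpath; the internal bootstrap address (cluster-name-kafka-bootstrap:9092) is the same one used above, and the class name VerifyClient is illustrative.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class VerifyClient {
        public static void main(String[] args) throws Exception {
            String bootstrap = "cluster-name-kafka-bootstrap:9092"; // internal bootstrap service

            // Send one test message to my-topic and wait for the acknowledgement
            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                producer.send(new ProducerRecord<>("my-topic", "hello")).get();
            }

            // Read the message back from the beginning of the topic
            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "verify-group");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(List.of("my-topic"));
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(10))) {
                    System.out.println("Received: " + record.value());
                }
            }
        }
    }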

6.2. Setting up access for clients outside of OpenShift

This procedure shows how to configure client access to a Kafka cluster from outside OpenShift.

Using the address of the Kafka cluster, you can provide external access to a client in a different OpenShift namespace or outside OpenShift entirely.

You configure an external Kafka listener to provide the access.

The following external listener types are supported:

  • route to use OpenShift Route and the default HAProxy router
  • loadbalancer to use loadbalancer services
  • nodeport to use ports on OpenShift nodes
  • ingress to use OpenShift Ingress and the NGINX Ingress Controller for Kubernetes

The type you choose depends on your requirements, environment, and infrastructure. For example, loadbalancers might not be suitable for certain infrastructure, such as bare metal, where node ports provide a better option.

In this procedure:

  1. An external listener is configured for the Kafka cluster, with TLS encryption and authentication, and Kafka simple authorization is enabled.
  2. A KafkaUser is created for the client, with TLS authentication and Access Control Lists (ACLs) defined for simple authorization.

You can configure your listener to use TLS, SCRAM-SHA-512 or OAuth 2.0 authentication. TLS always uses encryption, but it is recommended to also use encryption with SCRAM-SHA-512 and OAuth 2.0 authentication.

You can configure simple, OAuth 2.0, OPA or custom authorization for Kafka brokers. When enabled, authorization is applied to all enabled listeners.

When you configure the KafkaUser authentication and authorization mechanisms, ensure they match the equivalent Kafka configuration:

  • KafkaUser.spec.authentication matches Kafka.spec.kafka.listeners[*].authentication
  • KafkaUser.spec.authorization matches Kafka.spec.kafka.authorization

You should have at least one listener supporting the authentication you want to use for the KafkaUser.

Note

Authentication between Kafka users and Kafka brokers depends on the authentication settings for each. For example, it is not possible to authenticate a user with TLS if TLS authentication is not also enabled in the Kafka listener configuration.

AMQ Streams operators automate the configuration process:

  • The Cluster Operator creates the listeners and sets up the cluster and client certificate authority (CA) certificates to enable authentication within the Kafka cluster.
  • The User Operator creates the user representing the client and the security credentials used for client authentication, based on the chosen authentication type.

In this procedure, the certificates generated by the Cluster Operator are used, but you can replace them by installing your own certificates. You can also configure your listener to use a Kafka listener certificate managed by an external Certificate Authority.

Certificates are available in PKCS #12 (.p12) and PEM (.crt) formats. This procedure shows PKCS #12 certificates.
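
If you prefer to work with the PEM files directly, note that Kafka clients from version 2.7 can load PEM trust material without converting it to a keystore. A minimal sketch of the corresponding client properties, assuming the ca.crt file has been extracted from the cluster CA Secret in the same way as the PKCS #12 archive is extracted later in this procedure:

    properties.put("security.protocol", "SSL");
    properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");                  // PEM instead of PKCS12
    properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/ca.crt");  // no truststore password is needed for PEM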

Prerequisites

  • The Kafka cluster is available for the client
  • The Cluster Operator and User Operator are running in the cluster
  • A client outside the OpenShift cluster to connect to the Kafka cluster

Procedure

  1. Configure the Kafka cluster with an external Kafka listener.

    • Define the authentication required to access the Kafka broker through the listener
    • Enable authorization on the Kafka broker

      For example:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
        namespace: myproject
      spec:
        kafka:
          # ...
          listeners: 1
          - name: external 2
            port: 9094 3
            type: LISTENER-TYPE 4
            tls: true 5
            authentication:
              type: tls 6
            configuration:
              preferredNodePortAddressType: InternalDNS 7
              bootstrap and broker service overrides 8
              #...
          authorization: 9
            type: simple
            superUsers:
              - super-user-name 10
        # ...
      1. Configuration options for enabling external listeners are described in the Generic Kafka listener schema reference.
      2. Name to identify the listener. The name must be unique within the Kafka cluster.
      3. Port number used by the listener inside Kafka. The port number must be unique within a given Kafka cluster. Allowed port numbers are 9092 and higher, with the exception of ports 9404 and 9999, which are already used for Prometheus and JMX. Depending on the listener type, the port number might not be the same as the port number that connects Kafka clients.
      4. External listener type specified as route, loadbalancer, nodeport or ingress. An internal listener is specified as internal.
      5. Enables TLS encryption on the listener. Default is false. TLS encryption is required for route and ingress listeners.
      6. Authentication specified as tls.
      7. (Optional, for nodeport listeners only) Configuration to specify a preference for the first address type used by AMQ Streams as the node address.
      8. (Optional) AMQ Streams automatically determines the addresses to advertise to clients. The addresses are automatically assigned by OpenShift. You can override bootstrap and broker service addresses if the infrastructure on which you are running AMQ Streams does not provide the right address. Validation is not performed on the overrides. The override configuration differs according to the listener type. For example, you can override hosts for route, DNS names or IP addresses for loadbalancer, and node ports for nodeport.
      9. Authorization specified as simple, which uses the AclAuthorizer Kafka plugin.
      10. (Optional) Super users can access all brokers regardless of any access restrictions defined in ACLs.
      Warning

      An OpenShift Route address comprises the name of the Kafka cluster, the name of the listener, and the name of the namespace it is created in. For example, my-cluster-kafka-listener1-bootstrap-myproject (CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE). If you are using a route listener type, ensure that the whole length of the address does not exceed the maximum of 63 characters.

  2. Create or update the Kafka resource.

    oc apply -f KAFKA-CONFIG-FILE

    The Kafka cluster is configured with a Kafka broker listener using TLS authentication.

    A service is created for each Kafka broker pod.

    A service is created to serve as the bootstrap address for connection to the Kafka cluster.

    A service is also created to serve as the external bootstrap address for connections from outside the OpenShift cluster when you use nodeport listeners.

    The cluster CA certificate used to verify the identity of the Kafka brokers is also created, in a Secret named KAFKA-CLUSTER-NAME-cluster-ca-cert.

  3. Find the bootstrap address and port from the status of the Kafka resource.

    oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}'

    Use the bootstrap address in your Kafka client to connect to the Kafka cluster.

  4. Create or modify a user representing the client that requires access to the Kafka cluster.

    • Specify the same authentication type as the Kafka listener.
    • Specify the authorization ACLs for simple authorization.

      For example:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaUser
      metadata:
        name: my-user
        labels:
          strimzi.io/cluster: my-cluster 1
      spec:
        authentication:
          type: tls 2
        authorization:
          type: simple
          acls: 3
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operation: Read
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operation: Describe
            - resource:
                type: group
                name: my-group
                patternType: literal
              operation: Read
      1. The label must match the label of the Kafka cluster for the user to be created.
      2. Authentication specified as tls.
      3. Simple authorization requires an accompanying list of ACL rules to apply to the user. The rules define the operations allowed on Kafka resources based on the username (my-user); the sketch after this example shows the equivalent rules in Kafka's own Admin API.
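
      For reference, the three ACL rules above correspond to the following bindings in Kafka's Admin API. You do not create these yourself (the User Operator applies them when it reconciles the KafkaUser); this sketch, which assumes an already configured Admin instance named admin and the default TLS principal name for my-user, only illustrates what simple authorization stores in the broker:

      // Allow my-user to read and describe my-topic, and to read the my-group consumer group
      AclBinding readTopic = new AclBinding(
          new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL),
          new AccessControlEntry("User:CN=my-user", "*", AclOperation.READ, AclPermissionType.ALLOW));
      AclBinding describeTopic = new AclBinding(
          new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL),
          new AccessControlEntry("User:CN=my-user", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
      AclBinding readGroup = new AclBinding(
          new ResourcePattern(ResourceType.GROUP, "my-group", PatternType.LITERAL),
          new AccessControlEntry("User:CN=my-user", "*", AclOperation.READ, AclPermissionType.ALLOW));

      admin.createAcls(List.of(readTopic, describeTopic, readGroup)).all().get();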
  5. Create or modify the KafkaUser resource.

    oc apply -f USER-CONFIG-FILE

    The user is created, as well as a Secret with the same name as the KafkaUser resource. The Secret contains a private and public key for TLS client authentication.

    For example:

    apiVersion: v1
    kind: Secret
    metadata:
      name: my-user
      labels:
        strimzi.io/kind: KafkaUser
        strimzi.io/cluster: my-cluster
    type: Opaque
    data:
      ca.crt: PUBLIC-KEY-OF-THE-CLIENT-CA
      user.crt: USER-CERTIFICATE-CONTAINING-PUBLIC-KEY-OF-USER
      user.key: PRIVATE-KEY-OF-USER
      user.p12: P12-ARCHIVE-FILE-STORING-CERTIFICATES-AND-KEYS
      user.password: PASSWORD-PROTECTING-P12-ARCHIVE
  6. Extract the public cluster CA certificate to the desired certificate format:

    oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
  7. Extract the password that protects the PKCS #12 truststore:

    oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
  8. Configure your client with the truststore details for the public cluster CA certificate:

    Sample client code

    properties.put("security.protocol","SSL"); 1
    properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/path/to/ca.p12"); 2
    properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,CA-PASSWORD); 3
    properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"PKCS12"); 4

    1. Enables TLS encryption (with or without TLS client authentication).
    2. Specifies the truststore location where the certificates were imported.
    3. Specifies the password for accessing the truststore. This property can be omitted if it is not needed by the truststore.
    4. Identifies the truststore type.
    Note

    Use security.protocol: SASL_SSL when using SCRAM-SHA authentication over TLS.
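
    If your listener is configured with scram-sha-512 authentication instead of tls, the client uses SASL over the encrypted connection rather than a keystore. A minimal sketch of the corresponding client properties, assuming the username and password have been taken from the KafkaUser Secret (the values shown are placeholders):

    properties.put("security.protocol","SASL_SSL");              // TLS encryption plus SASL authentication
    properties.put(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-512");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.scram.ScramLoginModule required " +
        "username=\"my-user\" password=\"MY-USER-PASSWORD\";");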

  9. Extract the user certificate and private key from the user Secret in the desired format:

    oc get secret USER-NAME -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
  10. Extract the password that protects the keystore:

    oc get secret USER-NAME -o jsonpath='{.data.user\.password}' | base64 -d > user.password
  11. Configure your client with the keystore details for the user certificate and private key:

    Sample client code

    properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/path/to/user.p12"); 1
    properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"<user.password>"); 2
    properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"PKCS12"); 3

    1. Specifies the keystore location where the certificates were imported.
    2. Specifies the password for accessing the keystore. This property can be omitted if it is not needed by the keystore. The public user certificate is signed by the client CA when it is created.
    3. Identifies the keystore type.
  12. Add the bootstrap address and port for connecting to the Kafka cluster:

    bootstrap.servers: BOOTSTRAP-ADDRESS:PORT
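
    Putting the steps together, a complete TLS client configuration looks like the following sketch. The bootstrap address is the value returned in step 3, the truststore and keystore are the files extracted in steps 6 and 9, and the passwords are the values extracted in steps 7 and 10; the class name ExternalClient and the final producer send are illustrative only.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.config.SslConfigs;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ExternalClient {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "BOOTSTRAP-ADDRESS:PORT");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // TLS encryption: trust the cluster CA certificate (steps 6 and 7)
            properties.put("security.protocol", "SSL");
            properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/ca.p12");
            properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "CA-PASSWORD");
            properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");

            // TLS client authentication: present the user certificate (steps 9 and 10)
            properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/user.p12");
            properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "USER-PASSWORD");
            properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");

            // Send a single message to confirm end-to-end connectivity
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
                producer.send(new ProducerRecord<>("my-topic", "connectivity check")).get();
            }
        }
    }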

Additional resources