6.2. 为 OpenShift 外部的客户端设置访问权限

此流程演示了如何从 OpenShift 外部配置对 Kafka 集群的客户端访问。

使用 Kafka 集群的地址,您可以提供到不同 OpenShift 命名空间或整个 OpenShift 外部客户端的外部访问。

您可以配置外部 Kafka 侦听程序以提供访问权限。

支持以下外部监听程序类型:

选择的类型取决于您的要求,以及您的环境和基础架构。例如,负载均衡器可能不适用于某些基础架构,如裸机,其中节点端口提供了一个更好的选项。

在这个过程中:

  1. 为 Kafka 集群配置了一个外部监听程序,它带有 TLS 加密和身份验证,并启用了 Kafka 简单授权
  2. 为客户端创建一个 KafkaUser,并为 简单授权 定义了 TLS 身份验证和访问控制列表(ACL)。

您可以将侦听器配置为使用 TLS、SCRAM-SHA-512 或 OAuth 2.0 身份验证。TLS 始终使用加密,但建议您还将加密与 SCRAM-SHA-512 和 OAuth 2.0 身份验证一起使用。

您可以为 Kafka 代理配置简单、OAuth 2.0、OPA 或自定义授权。启用后,授权将应用到所有启用的监听程序。

当您配置 KafkaUser 身份验证和授权机制时,请确保它们与等同的 Kafka 配置匹配:

  • KafkaUser.spec.authentication matches Kafka.spec.kafka.listeners[*].authentication
  • KafkaUser.spec.authorizationKafka.spec.kafka.authorization匹配

您应至少有一个监听器支持您要用于 KafkaUser 的身份验证。

注意

Kafka 用户和 Kafka 代理之间的身份验证取决于每个用户的身份验证设置。例如,如果 Kafka 配置中没有启用 TLS 的用户,则无法验证它。

AMQ Streams Operator 自动配置过程:

  • Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便在 Kafka 集群中启用身份验证。
  • User Operator 根据所选的身份验证类型,创建代表客户端以及用于客户端身份验证的安全凭证的用户。

在这一流程中,使用了 Cluster Operator 生成的证书,但您可以通过 安装您自己的证书 来替换它们。您还可以将监听程序配置为使用 由外部证书颁发机构管理的 Kafka 侦听器证书。

证书以 PKCS #12(.p12)和 PEM(.crt)格式提供。此流程显示 PKCS #12 证书。

先决条件

  • Kafka 集群可用于客户端
  • Cluster Operator 和 User Operator 在集群中运行
  • OpenShift 集群外的客户端连接到 Kafka 集群

流程

  1. 使用外部 Kafka 侦听程序配置 Kafka 集群。

    • 定义通过监听程序访问 Kafka 代理所需的身份验证
    • 在 Kafka 代理上启用授权

      例如:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
        namespace: myproject
      spec:
        kafka:
          # ...
          listeners: 1
          - name: external 2
            port: 9094 3
            type: LISTENER-TYPE 4
            tls: true 5
            authentication:
              type: tls 6
            configuration:
              preferredNodePortAddressType: InternalDNS 7
              bootstrap and broker service overrides 8
              #...
          authorization: 9
            type: simple
            superUsers:
              - super-user-name 10
        # ...
      1
      2
      用于标识侦听器的名称。在 Kafka 集群中必须是唯一的。
      3
      Kafka 内侦听器使用的端口号。在给定 Kafka 集群中,端口号必须是唯一的。允许的端口号是 9092 及以上,但端口 9404 和 9999 除外,它们已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
      4
      外部监听程序类型,指定为 routeloadbalancernodeportingress。内部监听程序指定为 内部 监听程序。
      5
      在监听器上启用 TLS 加密。默认为 false路由 监听程序不需要 TLS 加密。
      6
      指定为 tls 的身份验证。
      7
      (可选,仅适用于 节点端口 侦听器)配置 ,以指定 AMQ Streams 作为节点地址使用的第一个地址类型的首选
      8
      (可选)AMQ Streams 自动决定要公告给客户端的地址。这些地址由 OpenShift 自动分配。如果您运行 AMQ Streams 的基础架构不提供正确的 地址,您可以覆盖 bootstrap 和代理服务 地址。验证不会在覆盖上执行。覆盖配置根据监听器类型而有所不同。例如,您可以覆盖用于 路由、DNS 名称或 负载均衡器 IP 地址的主机, 以及 nodeport 的节点端口。
      9
      指定为 simple 的授权,它使用 AclAuthorizer Kafka 插件。
      10
      (可选)超级用户可以访问所有代理,无论 ACL 中定义的任何访问权限限制。
      警告

      OpenShift Route 地址包含 Kafka 集群的名称、侦听器的名称以及在其中创建的命名空间的名称。例如,my -cluster-kafka-listener1-bootstrap-myproject (CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE)。如果您使用的是 路由 监听程序类型,请注意地址的长度不超过 63 个字符的最大限制。

  2. 创建或更新 Kafka 资源。

    oc apply -f KAFKA-CONFIG-FILE

    使用 TLS 身份验证配置 Kafka 代理监听程序。

    为每个 Kafka 代理 pod 创建一个服务。

    创建一个服务作为连接到 Kafka 集群的 bootstrap 地址

    服务也作为使用 nodeport 侦听器到 Kafka 集群 的外部 bootstrap 地址 创建。

    也使用与 Kafka 资源相同的名称来创建用于验证 kafka 代理身份的集群 CA 证书。

  3. Kafka 资源的状态查找 bootstrap 地址和端口。

    oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}'

    使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。

  4. 创建或修改代表需要访问 Kafka 集群的客户端的用户。

    • 指定与 Kafka 侦听器相同的验证类型。
    • 指定简单授权的授权 ACL。

      例如:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaUser
      metadata:
        name: my-user
        labels:
          strimzi.io/cluster: my-cluster 1
      spec:
        authentication:
          type: tls 2
        authorization:
          type: simple
          acls: 3
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operation: Read
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operation: Describe
            - resource:
                type: group
                name: my-group
                patternType: literal
              operation: Read
      1
      该标签必须与 Kafka 集群的标签匹配才能创建用户。
      2
      指定为 tls 的身份验证。
      3
      简单授权需要附带的 ACL 规则列表才能应用到用户。规则根据 用户名 (my-user)定义 Kafka 资源允许的操作。
  5. 创建或修改 KafkaUser 资源。

    oc apply -f USER-CONFIG-FILE

    已创建用户,以及名称与 KafkaUser 资源相同的 Secret。Secret 包含用于 TLS 客户端身份验证的私钥和公钥。

    例如:

    apiVersion: v1
    kind: Secret
    metadata:
      name: my-user
      labels:
        strimzi.io/kind: KafkaUser
        strimzi.io/cluster: my-cluster
    type: Opaque
    data:
      ca.crt: PUBLIC-KEY-OF-THE-CLIENT-CA
      user.crt: USER-CERTIFICATE-CONTAINING-PUBLIC-KEY-OF-USER
      user.key: PRIVATE-KEY-OF-USER
      user.p12: P12-ARCHIVE-FILE-STORING-CERTIFICATES-AND-KEYS
      user.password: PASSWORD-PROTECTING-P12-ARCHIVE
  6. 将公共集群 CA 证书提取到所需的证书格式:

    oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
  7. 从密码文件中提取密码:

    oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
  8. 使用公共集群证书的身份验证详情配置客户端:

    客户端代码示例

    properties.put("security.protocol","SSL"); 1
    properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/path/to/ca.p12"); 2
    properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,CA-PASSWORD); 3
    properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"PKCS12"); 4

    1
    启用 TLS 加密(带有或不进行 TLS 客户端身份验证)。
    2
    指定导入证书的信任存储位置。
    3
    指定用于访问信任存储的密码。如果信任存储不需要此属性,则可以省略它。
    4
    标识 truststore 类型。
    注意

    在通过 TLS 使用 SCRAM-SHA 身份验证时,请使用 security.protocol: SASL_SSL

  9. 将用户 CA 证书从用户 Secret 提取到所需的证书格式:

    oc get secret USER-NAME -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
  10. 从密码文件中提取密码:

    oc get secret USER-NAME -o jsonpath='{.data.user\.password}' | base64 -d > user.password
  11. 使用用户 CA 证书的身份验证详情配置您的客户端:

    客户端代码示例

    properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/path/to/user.p12"); 1
    properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"<user.password>"); 2
    properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"PKCS12"); 3

    1
    指定导入证书的密钥存储位置。
    2
    指定用于访问密钥存储的密码。如果密钥存储不需要此属性,则可以省略它。公共用户证书在创建时由客户端 CA 签名。
    3
    标识密钥存储类型。
  12. 添加用于连接到 Kafka 集群的 bootstrap 地址和端口:

    bootstrap.servers: BOOTSTRAP-ADDRESS:PORT

其它资源