0

我们在 application.YAML 中配置了两个 Kafka 代理,一个使用 SASL KERBEROS,另一个使用 SASL SCRAM。在启动服务时,它使用 SASL KERBEROS 连接到代理,并低于其他代理 (SASL SCRAM) 的错误。当我们在应用程序 YAML 中使用 SALS SCRAM 连接到一个代理时,它正在连接且没有任何错误

==================================================== ============================================= 将 SASL 客户端状态设置为 RECEIVE_APIVERSIONS_RESPONSE main] OakcsaSaslClientAuthenticator 将 SASL 客户端状态设置为 SEND_HANDSHAKE_REQUEST main] OakcsaSaslClientAuthenticator 将 SASL 客户端状态设置为 RECEIVE_HANDSHAKE_RESPONSE main] OakcsaSaslClientAuthenticator 将 SASL 客户端状态设置为 INITIAL main] o.apache.kafka.common.network.Selector 来自 100.76.140.194 的意外错误;关闭连接

java.lang.NullPointerException:在 org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java) 的 org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:389) 处为空:296) 在 org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:237)

应用程序.YAML

  binders:
   binder1:
    type: kafka
    environment:
     spring:
      cloud:
       stream:
        kafka:
         binder:
          replication-factor: 1
          brokers: ${eventhub.broker.hosts2}
          zkNodes: ${eventhub.zookeper.hosts2}
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: GSSAPI
            ssl:
              truststore:
                location: ${eventhub.broker.cert.location2}
                password: ${eventhub.broker.cert.password2}

          jaas:
           options:
            useKeyTab: true
            storeKey: true
            keyTab: /scratch/kafka/kafka2/krb5.keytab
            serviceName: kafka
            principal: kafka/XXXXXXXXXXXXXXXX.COM
         default:
          consumer:
           autoCommitOffset: false

   binder2:
    type: kafka
    environment:
     spring:
      cloud:
       stream:
        kafka:
         binder:
          brokers: ${eventhub.broker.hosts} # 10.40.158.93:9093
          zkNodes: ${eventhub.zookeper.hosts} #10.40.158.93:2181
          autoCreateTopics: false
          zkConnectionTimeout: 36000
          headers: 
           - event
           - sourceSystem
           - userId
           - branchCode
           - kafka_messageKey
          jaas:
            loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
            options:
              username: ${eventhub.broker.user}
              password: ${eventhub.broker.password}
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256
            ssl:
              enabled:
              truststore:
                location: ${eventhub.broker.cert.location}
                password: ${eventhub.broker.cert.password}
4

1 回答 1

0

当您在单个应用程序中有多个具有不同安全上下文的集群时,您需要使用KIP-85中提到的方法,而不是依赖于JAAS通过 binder 设置配置或设置属性。基本上,您需要设置优先于其他方法的属性。通过使用,您可以覆盖使用 JVM 范围的静态安全上下文的 JVM 设置的限制,从而忽略在第一个配置之后发现的任何后续 JAAS 配置。java.security.auth.login.configsasl.jaas.configsasl.jaas.config

这是一个示例应用程序,它演示了如何将具有不同安全上下文的多个 Kafka 集群连接为一个多绑定器应用程序。

于 2021-07-07T20:45:09.660 回答