0

我在本地使用 wurstmeister 的 kafka 和 zookeeper Docker 镜像来测试 Kafka 中的 SASL 和 ACL。

我的 docker-compose.yml 如下 -

version: '3'

services:
  # ZooKeeper node; the SASL-related JVM flags are passed through KAFKA_OPTS.
  zookeeper:
    image: wurstmeister/zookeeper
    hostname: zookeeper
    container_name: zookeeper
    volumes:
      # JAAS file that is referenced by java.security.auth.login.config below.
      - ./zookeeper/zookeeper.sasl.jaas.config:/etc/kafka/zookeeper_server_jaas.conf
      - ./zk/data:/var/lib/zookeeper/data
    environment:
      # NOTE(review): the ZOOKEEPER_* / KAFKA_OPTS names look like the
      # Confluent image convention - confirm that wurstmeister/zookeeper
      # actually reads them, otherwise SASL is not enforced on this node.
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_TICK_TIME: '2000'
      ZOOKEEPER_SET_ACL: 'true'
      # Folded scalar: the four flags are joined with single spaces.
      KAFKA_OPTS: >-
        -Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf
        -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
        -Dzookeeper.allowSaslFailedClients=false
        -Dzookeeper.requireClientAuthScheme=sasl

  # Single Kafka broker: SASL/PLAIN authentication on the EXTERNAL listener
  # plus ACL-based authorization.
  broker:
    image: wurstmeister/kafka:2.13-2.6.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    volumes:
      # NOTE(review): this JAAS file is mounted, but no KAFKA_OPTS in this
      # service points the JVM at it - confirm whether it is still needed.
      - ./kafka/kafka.jaas.conf:/etc/kafka/kafka_server_jaas.conf
      - ./kfk/data:/kafka
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:SASL_PLAINTEXT
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      # The broker authenticates to itself as "broker" (inter-broker listener
      # is EXTERNAL). Once an authorizer is enabled that principal needs
      # cluster-level permissions too; without them inter-broker requests are
      # denied and partitions never get a leader. Making it a super user
      # exempts it from ACL checks.
      KAFKA_SUPER_USERS: User:broker
      # Maps to the broker property auto.create.topics.enable.
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_LISTENERS: EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:9092
      KAFKA_ADVERTISED_PORT: '9092'
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_LISTENER_NAME_EXTERNAL_SASL_ENABLED_MECHANISMS: PLAIN
      # Literal block: the trailing backslashes are kept verbatim in the value.
      # username/password are the broker's own credentials; each user_<name>
      # entry declares an account the listener accepts.
      KAFKA_LISTENER_NAME_EXTERNAL_PLAIN_SASL_JAAS_CONFIG: |
        org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="broker" \
        password="broker" \
        user_broker="broker" \
        user_client="client-secret" \
        user_alice="alice-secret";
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_INTER_BROKER_LISTENER_NAME: EXTERNAL

以下是 zookeeper 和 kafka 的 jaas 文件 -

zookeeper.sasl.jaas.config -

// Login context named "Server", backed by ZooKeeper's DigestLoginModule.
// The user_kafka entry declares an account "kafka" with password "kafka".
Server {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   user_kafka="kafka";
};

kafka.jaas.config -

// Login context named "Client": DIGEST credentials (kafka / kafka).
// NOTE(review): presumably meant for the broker's ZooKeeper client - confirm
// that the broker JVM is started with java.security.auth.login.config
// pointing at this file, otherwise this section is never read.
Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="kafka"
   password="kafka";
};

我创建了 zookeeper 和 kafka 容器,并在 kafka 容器中运行了命令 -

/opt/kafka_2.13-2.6.0/bin # ./kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper:2181 --add --allow-principal User:alice --producer --topic testtopic
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=testtopic, patternType=LITERAL)`: 
    (principal=User:alice, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=testtopic, patternType=LITERAL)`: 
    (principal=User:alice, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW) 

但是,当我尝试在 Go 代码中(使用 sarama)向该主题发送消息时,它返回了如下错误:

kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes.

我的代码是 -

package main

import "github.com/Shopify/sarama"

// brokers is the list of broker addresses handed to sarama.NewSyncProducer.
var brokers = []string{"127.0.0.1:9092"}

// newProducer builds a synchronous sarama producer for the addresses in
// brokers. It authenticates over SASL as user "alice", waits for all in-sync
// replicas to acknowledge each message, and picks partitions at random.
func newProducer() (sarama.SyncProducer, error) {
    cfg := sarama.NewConfig()

    // SASL credentials and handshake.
    cfg.Net.SASL.Enable = true
    cfg.Net.SASL.Handshake = true
    cfg.Net.SASL.User = "alice"
    cfg.Net.SASL.Password = "alice-secret"

    // Producer behavior; Return.Successes is set to true for the SyncProducer.
    cfg.Producer.Return.Successes = true
    cfg.Producer.RequiredAcks = sarama.WaitForAll
    cfg.Producer.Partitioner = sarama.NewRandomPartitioner

    return sarama.NewSyncProducer(brokers, cfg)
}

// prepareMessage wraps message as a string-encoded producer message for
// topic. Partition is set to -1.
func prepareMessage(topic, message string) *sarama.ProducerMessage {
    return &sarama.ProducerMessage{
        Topic:     topic,
        Partition: -1,
        Value:     sarama.StringEncoder(message),
    }
}

// panicOnError panics with err when err is non-nil and does nothing otherwise.
func panicOnError(err error) {
    if err == nil {
        return
    }
    panic(err)
}

// main creates a producer, sends one JSON message to "testtopic", and panics
// if either step fails.
func main() {
    producer, err := newProducer()
    panicOnError(err)
    // Close the producer on the way out (deferred calls also run when a later
    // panicOnError fires) so its connections and goroutines are released.
    defer func() {
        // Best-effort shutdown: a Close error is deliberately ignored here so
        // it cannot mask the original send error.
        _ = producer.Close()
    }()

    msg := prepareMessage("testtopic", `{"key":"value"}`)
    _, _, err = producer.SendMessage(msg)
    panicOnError(err)
}

我也尝试过给 kafka-acls.sh 加上 --bootstrap-server 参数(命令:./kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:alice --producer --topic testtopic),但脚本会卡住,并且可以在 kafka 容器的日志中看到身份验证错误 -

[2021-05-29 16:27:46,288] INFO [SocketServer brokerId=1002] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

PS:如果我只使用 SASL(没有 ACL),一切都可以正常工作

现在我被困在 acl 部分。任何人都知道我缺少什么(可能在 zookeeper 或 kafka 配置中)?

任何帮助表示赞赏。提前致谢。

4

1 回答 1

0

对于您的第一个问题,我建议先尝试这里给出的建议:https://github.com/Shopify/sarama/issues/272

对于第二个问题,您应该在命令行中加上 --command-config /path/cmd.cfg 参数,

用它来指定管理客户端(admin client)连接代理所需的属性,例如 SASL 机制(mechanism)等。

另外,需要通过 KAFKA_OPTS 指定 jaas 文件的位置,

并且该 jaas 文件应包含 KafkaClient 段以及用户名和密码,以便使用 PLAIN 身份验证方式连接到您的代理。

于 2021-05-29T21:59:21.560 回答