0

我们创建了一个新的 AWS MSK (kafka) 集群

我们创建了一个新配置并分配给该集群:

auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000

使用 python 脚本,我们想发送一条新消息:

client = boto3.client('kafka')
response = client.get_bootstrap_brokers(ClusterArn=os.environ.get('MSKARN'))
bootstrap_servers = response['BootstrapBrokerStringTls']

instance = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    security_protocol='SSL',
    ssl_keyfile='kafka.client.truststore.jsk',
    api_version=(2, 2, 1)
)
    
instance.send("TEST", value=message)

它不起作用并说:

kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

如果我们在发送消息之前创建主题,它可以正常工作。

4

1 回答 1

1

找到了解决方案。

问题与这一行有关:

default.replication.factor=3

它必须是经纪人的数量,我们有 2 个经纪人并重置该值。

于 2021-02-23T18:26:00.867 回答