我的 Scs 应用程序有两个具有此配置的 Kafka 生产者:
spring:
cloud:
function:
definition: myProducer1;myProducer2
stream:
bindings:
myproducer1-out-0:
destination: topic1
producer:
useNativeEncoding: true
myproducer2-out-0:
destination: topic2
producer:
useNativeEncoding: true
kafka:
binder:
brokers: ${kafka.brokers:localhost}
min-partition-count: 3
replication-factor: 3
producerProperties:
enable:
idempotence: false
retries: 10000
acks: all
key:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
schema:
registry:
url: ${schema-registry.url:http://localhost:8081}
它在大约 10 秒内开始:
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MyAppApplicationKt : Started MyAppApplicationKt in 11.288 seconds (JVM running for 11.868)
我需要我的生产者是幂等的,所以我设置了enabled.idempotence: true
. 通过此更改,启动时间会慢 7 倍(有时甚至超过 10 倍):
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MyAppApplicationKt : Started MyAppApplicationKt in 71.489 seconds (JVM running for 72.127)
如何加快启动速度?
更新:
我在启动过程中发现了Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms.
一个问题(当它没有出现时,启动速度和以前一样快。
在以下日志中,它仅发生在一个生产者中:
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586864007183
org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: lkc-nvqmv
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 32029 with epoch 0
然后在被卡住 30 秒后ProducerId set to 32029 with epoch 0
,它记录了第二个生产者的信息消息Proceeding to force close...
并初始化了第二个生产者,没有任何问题:
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Proceeding to force close the producer since pending
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer1-out-0' has 1 subscriber(s).
o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: topic2
o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
...
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Instantiated an idempotent producer.
o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586864038612
org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Cluster ID: lkc-nvqmv
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 30000 ms.
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] ProducerId set to 32030 with epoch 0
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Proceeding to force close the producer since pending
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MetricsIngestorApplicationKt : Started MetricsIngestorApplicationKt in 66.834 seconds (JVM running for 67.544)
更新 2:
我已经调试了这背后的逻辑,它发生在doBindProducer()
方法期间。它获取主题的分区,并为此在KafkaMessageChannelBinder
.
@Override
protected MessageHandler createProducerMessageHandler(
final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
MessageChannel channel, MessageChannel errorChannel) throws Exception {
/*
* IMPORTANT: With a transactional binder, individual producer properties for
* Kafka are ignored; the global binder
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
* instead, for all producers. A binder is transactional when
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
*/
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
? this.transactionManager.getProducerFactory()
: getProducerFactory(null, producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(), false, () -> {
Producer<byte[], byte[]> producer = producerFB.createProducer();
List<PartitionInfo> partitionsFor = producer
.partitionsFor(destination.getName());
producer.close();
if (this.transactionManager == null) {
((DisposableBean) producerFB).destroy();
}
return partitionsFor;
}, destination.getName());
正确检索此列表后List<PartitionInfo> partitionsFor
,它会卡在 KafkaProducer.destroy() 中,直到 30 秒超时到期:
为什么会在那里阻塞?会不会是活页夹的bug?