该框架不提供任何KinesisClientLibConfiguration
. 公开这样一个 bean 以及您需要的任何选项是您的项目责任:https ://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream -binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties
从版本 2.0.1 开始,KinesisClientLibConfiguration
可以在应用程序上下文中提供类型的 bean,以完全控制 Kinesis 客户端库配置选项。
生产者端确实被KinesisProducerConfiguration
bean覆盖KinesisBinderConfiguration
:
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled")
public KinesisProducerConfiguration kinesisProducerConfiguration() {
KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration();
kinesisProducerConfiguration.setCredentialsProvider(this.awsCredentialsProvider);
kinesisProducerConfiguration.setRegion(this.region);
return kinesisProducerConfiguration;
}
从这里我看不出有什么大问题,在您自己的配置中声明这样一个 bean 并带有您希望拥有的任何其他属性,包括提到的指标。
如果这仍然不适合您,您可以将这种 bean 注入到您自己的 bean 中,并以您想要的任何方式对其进行变异:
@Bean
String configurerBean(KinesisProducerConfiguration kinesisProducerConfiguration) {
kinesisProducerConfiguration.setMetricsLevel();
return null;
}
更新
消费者部分:
这是一个基于我们在内部创建的 KCL 的默认配置实例的 bean:
@Bean
KinesisClientLibConfiguration kinesisClientLibConfiguration() {
return new KinesisClientLibConfiguration(this.consumerGroup,
this.stream,
null,
null,
this.streamInitialSequence,
this.kinesisProxyCredentialsProvider,
null,
null,
KinesisClientLibConfiguration.DEFAULT_FAILOVER_TIME_MILLIS,
this.workerId,
KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS,
this.idleBetweenPolls,
false,
KinesisClientLibConfiguration.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
KinesisClientLibConfiguration.DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
KinesisClientLibConfiguration.DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
this.consumerBackoff,
KinesisClientLibConfiguration.DEFAULT_METRICS_BUFFER_TIME_MILLIS,
KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE,
KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
null,
KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS,
KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
new SimpleRecordsFetcherFactory(),
DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS,
DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
}
无论您看到什么,this.
都必须用您环境中的相应值替换。在这种情况下,这可能KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE
就是您要寻找的东西。
this.consumerGroup
和this.stream
必须与您要为其配置使用者的绑定相同。