1

我正在使用启用了 KPL/KCL 的用于 Kinesis 的 Spring Cloud Stream Binder。我们希望禁用 Cloudwatch 指标,而不必自己管理 KPL 和 KCL 的配置(完全覆盖 bean)。我们希望对除和属性之外的KinesisProducerConfiguration每个属性使用相同的 bean 定义。KinesisClientLibConfigurationKinesisProducerConfiguration.setMetricsLevel()KinesisClientLibConfiguration.withMetricsLevel(...)

作为参考,这里是在 Spring Cloud Stream Kinesis Binder 中定义 AWS bean 的位置:KinesisBinderConfiguration.java

最有效的方法是什么?

任何帮助表示赞赏!谢谢。

4

1 回答 1

1

该框架不提供任何KinesisClientLibConfiguration. 公开这样一个 bean 以及您需要的任何选项是您的项目责任:https ://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream -binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties

从版本 2.0.1 开始,KinesisClientLibConfiguration可以在应用程序上下文中提供类型的 bean,以完全控制 Kinesis 客户端库配置选项。

生产者端确实被KinesisProducerConfigurationbean覆盖KinesisBinderConfiguration

@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled")
public KinesisProducerConfiguration kinesisProducerConfiguration() {
    KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration();
    kinesisProducerConfiguration.setCredentialsProvider(this.awsCredentialsProvider);
    kinesisProducerConfiguration.setRegion(this.region);
    return kinesisProducerConfiguration;
}

从这里我看不出有什么大问题,在您自己的配置中声明这样一个 bean 并带有您希望拥有的任何其他属性,包括提到的指标。

如果这仍然不适合您,您可以将这种 bean 注入到您自己的 bean 中,并以您想要的任何方式对其进行变异:

@Bean
String configurerBean(KinesisProducerConfiguration kinesisProducerConfiguration)  {
   kinesisProducerConfiguration.setMetricsLevel();
   return null;
}

更新

消费者部分:

这是一个基于我们在内部创建的 KCL 的默认配置实例的 bean:

@Bean
KinesisClientLibConfiguration kinesisClientLibConfiguration() {
    return new KinesisClientLibConfiguration(this.consumerGroup,
                            this.stream,
                            null,
                            null,
                            this.streamInitialSequence,
                            this.kinesisProxyCredentialsProvider,
                            null,
                            null,
                            KinesisClientLibConfiguration.DEFAULT_FAILOVER_TIME_MILLIS,
                            this.workerId,
                            KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS,
                            this.idleBetweenPolls,
                            false,
                            KinesisClientLibConfiguration.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
                            KinesisClientLibConfiguration.DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
                            KinesisClientLibConfiguration.DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
                            new ClientConfiguration(),
                            new ClientConfiguration(),
                            new ClientConfiguration(),
                            this.consumerBackoff,
                            KinesisClientLibConfiguration.DEFAULT_METRICS_BUFFER_TIME_MILLIS,
                            KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE,
                            KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
                            null,
                            KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS,
                            KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
                            new SimpleRecordsFetcherFactory(),
                            DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS,
                            DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
                            DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
}

无论您看到什么,this.都必须用您环境中的相应值替换。在这种情况下,这可能KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE就是您要寻找的东西。

this.consumerGroupthis.stream必须与您要为其配置使用者的绑定相同。

于 2021-12-07T15:29:28.400 回答