I want to set up some transformation with Spring Cloud Stream (consume from one topic, produce to another). I also want it to be
- reliable - let's say "at least once"
- fast at producing - one producer.flush() per batch, not one per message
- fast at committing offsets - one commit per partition per batch
With the raw kafka client, or with spring-kafka, I would do the following, sketched below (assuming the producer is configured properly: acks=all, linger.ms > 0, a sufficiently large batch-size, and so on):
- take the consumed messages (say, from consumer.poll())
- apply my custom transformation to each message
- producer.send(message).addCallback(...) for each message
- producer.flush()
- check that no send callback reported an error
- consumer.commit() the highest offsets
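For reference, a minimal sketch of that loop against the plain kafka-clients API; transform(...) and resolveTopic(...) are hypothetical placeholders, and waiting on the returned futures after flush() stands in for the per-send callbacks:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BatchForwarder {

    private final Consumer<byte[], byte[]> consumer; // enable.auto.commit=false
    private final Producer<byte[], byte[]> producer; // acks=all, linger.ms > 0, ...

    BatchForwarder(Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer) {
        this.consumer = consumer;
        this.producer = producer;
    }

    void forwardOneBatch() throws Exception {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        List<Future<RecordMetadata>> sends = new ArrayList<>();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            byte[] transformed = transform(record.value()); // custom transformation
            sends.add(producer.send(new ProducerRecord<>(resolveTopic(record), transformed)));
        }
        producer.flush();      // one flush per batch, not per message
        for (Future<RecordMetadata> send : sends) {
            send.get();        // throws if any send in the batch failed -> no commit
        }
        consumer.commitSync(); // commits the highest polled offset per partition
    }

    private byte[] transform(byte[] payload) { return payload; } // placeholder
    private String resolveTopic(ConsumerRecord<byte[], byte[]> rec) { return rec.topic() + ".out"; } // placeholder
}
```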
The question is: how do I achieve something similar with spring-cloud-stream? I know about consumer.batch-mode = true and producer.sync = false, but I'm not sure what the correct way is to tie reliable producing to the offset commit.
UPDATE I came up with a straightforward solution (note that on top of the initial requirements I also needed dynamic routing and dynamic topic creation):
Configuration
```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Configuration
public class DynamicRouterCfg {

    @Autowired
    private BinderAwareChannelResolver outputResolver;
    @Autowired
    private DynamicRouterProducerListener dynamicRouterProducerListener;

    // serializes batches so only one CountDownLatch is in flight at a time
    private final Lock lock = new ReentrantLock();

    // exposed as a bean so the Kafka binder wires it into its producers
    // (in place of the default LoggingProducerListener)
    @Bean
    public DynamicRouterProducerListener dynamicRouterProducerListener() {
        return new DynamicRouterProducerListener();
    }

    @Bean
    public Consumer<Message<List<byte[]>>> dynamicRouter() {
        return msgs -> {
            lock.lock();
            try {
                dynamicRouterProducerListener.setBatchSize(msgs.getPayload().size());
                for (byte[] payload : msgs.getPayload()) {
                    doBusinessLogic(payload);
                    outputResolver.resolveDestination(resolveDestination(payload))
                            .send(MessageBuilder.withPayload(payload).build());
                }
                // acknowledge (commit offsets) only once every send has succeeded
                if (dynamicRouterProducerListener.waitForBatchSuccess()) {
                    Acknowledgment ack = (Acknowledgment) msgs.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT);
                    Objects.requireNonNull(ack).acknowledge();
                }
            } finally {
                lock.unlock();
            }
        };
    }

    private void doBusinessLogic(byte[] payload) {
        // placeholder for the business transformation
    }

    private String resolveDestination(byte[] payload) {
        // some business logic for destination resolving
        return null;
    }

    static class DynamicRouterProducerListener implements ProducerListener<Object, Object> {

        private volatile CountDownLatch batchLatch;

        public void setBatchSize(int batchSize) {
            this.batchLatch = new CountDownLatch(batchSize);
        }

        public boolean waitForBatchSuccess() {
            try {
                return batchLatch.await(10000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return false;
            }
        }

        @Override
        public void onSuccess(ProducerRecord<Object, Object> producerRecord, RecordMetadata recordMetadata) {
            batchLatch.countDown();
        }
    }
}
```
application.yml
```yaml
spring:
  cloud:
    function:
      definition: dynamicRouter
    stream:
      bindings:
        dynamicRouter-in-0:
          destination: consumed.topic
          group: test.group
          consumer:
            batch-mode: true
            concurrency: 1
            header-mode: none
            use-native-decoding: true
      kafka:
        binder:
          brokers: broker:9092
          auto-create-topics: true
          required-acks: all
        bindings:
          dynamicRouter-in-0:
            consumer:
              auto-rebalance-enabled: false
              auto-commit-offset: false
              auto-commit-on-error: false
              configuration:
                fetch.max.bytes: 5024000
                isolation.level: read_committed
        default:
          producer:
            sync: false
            batch-timeout: 10 # as far as I can see, this maps to linger.ms
            compression: gzip
            configuration:
              max.in.flight.requests.per.connection: 1
              max.request.size: 5024000
            # I need to create topics with a custom configuration
            topic:
              replication-factor: 2
              properties:
                cleanup.policy: compact
                min.cleanable.dirty.ratio: 0.1
```
Drawbacks I can see here:
- it uses the deprecated BinderAwareChannelResolver: I don't know how spring.cloud.stream.sendto.destination could be used in my case (but see the StreamBridge sketch after this list)
- after each batch it waits out an extra ${batch-timeout}
- I'm not sure what guarantees the producer.onSuccess callback comes with (whether "duplicate" success calls are possible), especially when messages from a single incoming batch are routed to different topics
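On the first point, here is a minimal sketch, assuming Spring Cloud Stream 3.x, of the same routing loop on top of StreamBridge, which the reference documentation names as the replacement for the deprecated BinderAwareChannelResolver. The batch latch and the manual acknowledgment are elided; they would stay exactly as in the listener-based solution above:

```java
import java.util.List;
import java.util.function.Consumer;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Configuration
public class StreamBridgeRouterCfg {

    private final StreamBridge streamBridge;

    public StreamBridgeRouterCfg(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Bean
    public Consumer<Message<List<byte[]>>> dynamicRouter() {
        return msgs -> {
            for (byte[] payload : msgs.getPayload()) {
                // resolves (and caches) an output binding for the destination
                // name at runtime, like resolveDestination(...).send(...) did
                streamBridge.send(resolveDestination(payload),
                        MessageBuilder.withPayload(payload).build());
            }
            // batch-success tracking and Acknowledgment handling omitted here
        };
    }

    private String resolveDestination(byte[] payload) {
        return null; // same placeholder as above
    }
}
```

As far as I can tell, output bindings created this way still pick up the default producer settings from the application.yml above.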