
I want to set up some transformations with Spring Cloud Stream (consume from one topic, produce to another). I also want it to be

  • Reliable - let's say "at least once"
  • Fast on the producing side - one producer.flush() per batch, not per message
  • Fast on offset commits - one commit per partition per batch

With the raw kafka client, or with spring-kafka, I would do the following (assuming the producer is configured properly: acks=all, linger.ms > 0, a large enough batch-size, and so on):

  1. Get the consumed messages (say, from consumer.poll())
  2. Apply my custom transformation to each message
  3. producer.send(message).addCallback(...) for each message
  4. producer.flush()
  5. Check that none of the send callbacks reported an error
  6. consumer.commit() - the highest offset
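
With the plain kafka-clients API the six steps above would look roughly like this. This is a sketch only: consumer/producer construction is omitted, `transform()` and the topic names are placeholders, and error handling is reduced to a single flag; it is not tested against a broker.

```java
// Sketch of the poll -> transform -> send -> flush -> commit loop (kafka-clients).
AtomicBoolean sendFailed = new AtomicBoolean(false);

while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500)); // step 1
    if (records.isEmpty()) continue;

    for (ConsumerRecord<byte[], byte[]> record : records) {
        byte[] transformed = transform(record.value());                              // step 2 (placeholder)
        producer.send(new ProducerRecord<>("out.topic", transformed), (meta, ex) -> {
            if (ex != null) sendFailed.set(true);                                    // step 3: callback per send
        });
    }

    producer.flush();                                                                // step 4: one flush per batch
    if (!sendFailed.getAndSet(false)) {                                              // step 5: no callback errors
        consumer.commitSync();                                                       // step 6: commits the offsets of the last poll
    }
}
```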

The question is: how do I achieve something similar with spring-cloud-stream? I know about consumer.batch-mode = true and producer.sync = false, but I'm not sure what the correct way is to tie reliable production to the offset commit.

UPDATE I came up with a simple solution (note that, besides the initial requirements, I also needed dynamic routing and dynamic topic creation):

Configuration

@Configuration
public class DynamicRouterCfg {

    @Autowired
    private BinderAwareChannelResolver outputResolver;

    @Autowired
    private DynamicRouterProducerListener dynamicRouterProducerListener;

    private final Lock lock = new ReentrantLock();

    @Bean
    public Consumer<Message<List<byte[]>>> dynamicRouter() {
        return msgs -> {
            lock.lock();
            try {
                dynamicRouterProducerListener.setBatchSize(msgs.getPayload().size());

                for (byte[] payload : msgs.getPayload()) {
                    doBusinessLogic(payload);
                    outputResolver.resolveDestination(resolveDestination(payload))
                            .send(MessageBuilder.withPayload(payload)
                            .build());
                }

                if (dynamicRouterProducerListener.waitForBatchSuccess()) {
                    Acknowledgment ack = (Acknowledgment) msgs.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT);
                    Objects.requireNonNull(ack).acknowledge();
                }

            } finally {
                lock.unlock();
            }
        };
    }

    private void doBusinessLogic(byte[] payload) {
        // placeholder for business transformation
    }

    private String resolveDestination(byte[] payload) {
        // some business logic for destination resolving
        return null;
    }

    private static class DynamicRouterProducerListener implements ProducerListener<Object, Object> {

        private volatile CountDownLatch batchLatch;

        public void setBatchSize(int batchSize) {
            this.batchLatch = new CountDownLatch(batchSize);
        }

        public boolean waitForBatchSuccess() {
            try {
                return batchLatch.await(10000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return false;
            }
        }

        @Override
        public void onSuccess(ProducerRecord<Object, Object> producerRecord, RecordMetadata recordMetadata) {
            batchLatch.countDown();
        }

    }

    @Bean
    public DynamicRouterProducerListener dynamicRouterProducerListener() {
        // expose the listener as a bean so the @Autowired field above resolves
        return new DynamicRouterProducerListener();
    }
}
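
The latch hand-off in DynamicRouterProducerListener above can be exercised in isolation with plain JDK classes. Below is a minimal sketch, with BatchAckLatch as a hypothetical stand-in for the listener: the "consumer thread" arms the latch for the batch size, simulated producer callbacks count it down from other threads, and acknowledging is gated on the await result.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BatchAckDemo {

    // Stand-in for DynamicRouterProducerListener: counts producer success callbacks.
    static class BatchAckLatch {
        private volatile CountDownLatch latch;

        // Arm the latch before sending a batch of the given size.
        void expect(int batchSize) {
            latch = new CountDownLatch(batchSize);
        }

        // Called from the producer's callback thread for each confirmed send.
        void onSuccess() {
            latch.countDown();
        }

        // Blocks the consumer thread until every send was confirmed, or times out.
        boolean await(long millis) {
            try {
                return latch.await(millis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        BatchAckLatch acks = new BatchAckLatch();
        acks.expect(3); // batch of 3 messages

        // Simulate async producer callbacks arriving on other threads.
        ExecutorService callbacks = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            callbacks.submit(acks::onSuccess);
        }

        boolean allConfirmed = acks.await(1000);
        System.out.println(allConfirmed); // prints "true"; only ack offsets when true
        callbacks.shutdown();
    }
}
```

Note that, like the listener above, this only counts successes: a failed send never counts down, so the batch is detected via the timeout rather than explicitly.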

application.yml

spring:
  cloud:
    function:
      definition: dynamicRouter
    stream:
      bindings:
        dynamicRouter-in-0:
          destination: consumed.topic
          group: test.group
          consumer:
            batch-mode: true
            concurrency: 1
            header-mode: none
            use-native-decoding: true
      kafka:
        binder:
          brokers: broker:9092
          auto-create-topics: true
          required-acks: all
        bindings:
          dynamicRouter-in-0:
            consumer:
              auto-rebalance-enabled: false
              auto-commit-offset: false
              auto-commit-on-error: false
              configuration.fetch.max.bytes: 5024000
              configuration.isolation.level: read_committed
        default:
          producer:
            sync: false
            batch-timeout: 10 # as I understand, this maps to linger.ms
            compression: gzip
            configuration:
              max.in.flight.requests.per.connection: 1
              max.request.size: 5024000
            # I need to create topics with a custom configuration
            topic.replication-factor: 2
            topic.properties:
              cleanup.policy: compact
              min.cleanable.dirty.ratio: 0.1

Downsides I can see here:

  1. It uses the deprecated BinderAwareChannelResolver: I don't know how to use spring.cloud.stream.sendto.destination in my case
  2. After every batch it waits an extra ${batch-timeout}
  3. I'm not sure about the guarantees of the producer's onSuccess method (whether "duplicate" success calls are possible), especially when messages from a single incoming batch are routed to different topics
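
Regarding downside 1: in Spring Cloud Stream 3.x, StreamBridge is the documented replacement for the deprecated BinderAwareChannelResolver and resolves output destinations by name at runtime (creating them when auto-create-topics is on). A hedged sketch of how the send loop inside dynamicRouter() could look with it, assuming a StreamBridge field is injected in place of outputResolver (not verified against a broker):

```java
// hypothetical replacement for the outputResolver lines in dynamicRouter()
for (byte[] payload : msgs.getPayload()) {
    doBusinessLogic(payload);
    // StreamBridge resolves/creates the output binding by destination name
    streamBridge.send(resolveDestination(payload),
            MessageBuilder.withPayload(payload).build());
}
```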