
Basically I am consuming messages from Spring Cloud Stream Kafka and inserting them into MongoDB. My code works fine while my Mongo cluster is up, but I have two problems when my Mongo instance goes down:

  1. Auto-commit is disabled on the cloud stream consumer (`autoCommitOffset` set to false), yet re-polling does not happen even though the message was never acknowledged.
  2. Checking the Mongo connection takes some time, and if two messages with the same ID arrive during that window, the message ends up duplicated once I bring the Mongo instance back up. Under normal conditions this works fine.

Is there a solution to these problems?

Here is my code:

    interface ResourceInventorySink {
        companion object {
            const val INPUT = "resourceInventoryInput"
        }

        @Input(INPUT)
        fun input(): SubscribableChannel
    }

    @EnableBinding(ResourceInventorySink::class)
    class InventoryEventListeners {

        val logger = LoggerFactory.getLogger(javaClass)

        @Autowired
        lateinit var resourceInventoryService: ResourceInventoryService

        @StreamListener(ResourceInventorySink.INPUT, condition = OperationConstants.INSERT)
        fun receiveInsert(event: Message<ResourceInventoryEvent>) {
            logger.info("received Insert message {}", event.payload.toString())
            val success = resourceInventoryService.insert(event.payload)
            success.subscribe({
                logger.info("Data Inserted {}", event.payload.toString())
                event.headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)?.acknowledge()
            }, {
                if (it !is DataAccessResourceFailureException) {
                    logger.error("Exception Occurred {} {}", it.message, it.cause.toString())
                    event.headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)?.acknowledge()
                } else {
                    logger.error("Error Inserting in Mongo DB {}", it.cause)
                }
            })
        }
    }
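On problem 1, a detail worth keeping in mind: with auto-commit disabled, Kafka does not re-deliver an unacknowledged record within the same consumer session; it only resumes from the last committed offset after a restart or rebalance (or an explicit seek). The toy model below (plain Kotlin, no Spring or Kafka classes; all names are illustrative) sketches that behavior: the offset is committed only after the sink succeeds, so a failed record is reprocessed only on the next "session".

```kotlin
// Toy model of manual offset commits: a record is acknowledged (committed)
// only after the sink succeeds; an unacknowledged record is NOT re-polled
// mid-session, it is re-read only when consumption restarts from the
// committed offset. Plain Kotlin, purely illustrative.
class SimpleConsumer(private val records: List<String>) {
    var committed = 0          // offset to resume from after a restart
        private set

    // Process records from the committed offset; commit only on success.
    fun run(sink: (String) -> Boolean) {
        var pos = committed
        while (pos < records.size) {
            if (!sink(records[pos])) return   // failure: stop, no in-session re-poll
            committed = ++pos                 // success: acknowledge (commit offset)
        }
    }
}

fun main() {
    val consumer = SimpleConsumer(listOf("a", "b", "c"))
    var mongoUp = false
    consumer.run { mongoUp }      // Mongo down: nothing is committed
    println(consumer.committed)   // prints 0 -> "a" will be re-read next session
    mongoUp = true
    consumer.run { mongoUp }      // "restart": resumes from committed offset
    println(consumer.committed)   // prints 3 -> all records processed
}
```

In the real binder the equivalent of the "restart" is a consumer restart, rebalance, or an explicit seek back; simply not calling `acknowledge()` does not trigger redelivery on its own.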

Here is my service class:

    @Service
    class ResourceInventoryService {

        val logger = LoggerFactory.getLogger(javaClass)

        @Autowired
        lateinit var resourceInventoryRepository: ResourceInventoryRepository

        fun insert(newResource: ResourceInventoryEvent) = resourceInventoryRepository
            .findByProductId(newResource.productId)
            .switchIfEmpty(newResource.convertTODocument().toMono())
            .flatMap { resourceInventoryRepository.save(it) }
            .onErrorResume { Mono.error(it) }
    }
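On problem 2, the duplication is inherent to the `findByProductId` → `save` sequence: it is a check-then-act pattern, so two messages with the same ID that arrive before the first write lands can both pass the "not found" check and both insert. The sketch below (plain Kotlin with a `ConcurrentHashMap` standing in for the collection; not Spring Data code) contrasts that race with an atomic operation, where only one writer can win for a given `productId`.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread

// Illustrative sketch: putIfAbsent is atomic, so even when two writers race
// on the same key, exactly one insert wins and the other observes the
// existing document instead of creating a duplicate.
fun main() {
    val store = ConcurrentHashMap<String, String>()
    var duplicatesDetected = 0

    val writers = (1..2).map { n ->
        thread {
            val previous = store.putIfAbsent("product-1", "doc-from-message-$n")
            if (previous != null) duplicatesDetected++  // the losing writer sees the winner's doc
        }
    }
    writers.forEach { it.join() }

    println(store.size)           // prints 1 -> exactly one document for product-1
    println(duplicatesDetected)   // prints 1 -> the second writer was rejected
}
```

In MongoDB terms the equivalent guarantee comes from a unique index on `productId` plus an upsert (or handling the duplicate-key error on insert), rather than a separate find followed by a save.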

Here is my application.yml:

    spring:
      cloud:
        stream:
          default:
            consumer:
              useNativeEncoding: true
          kafka:
            binder:
              brokers:
                - localhost:9092
              consumer-properties:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                schema.registry.url: http://localhost:8081
                enable.auto.commit: false
                specific.avro.reader: true
            bindings:
              resourceInventoryInput:
                consumer:
                  autoOffsetCommit: false
          default-binder: kafka
          bindings:
            resourceInventoryInput:
              binder: kafka
              destination: ${application.messaging.topic}
              content-type: application/*+avro
              group: ${application.messaging.group}
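One possible cause of the null acknowledgment: the yml above sets `autoOffsetCommit`, while the Kafka binder's documented consumer property is `autoCommitOffset`. An unrecognized property is silently ignored, the binder keeps auto-committing, and the `KafkaHeaders.ACKNOWLEDGMENT` header is then absent (null) in the listener. A sketch of the corrected binding, assuming the channel name stays `resourceInventoryInput`:

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          resourceInventoryInput:
            consumer:
              autoCommitOffset: false   # binder property; makes the Acknowledgment header available
```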

Edit 1: the acknowledgment is null.
