基本上我正在使用来自 spring cloud stream kafka 的消息并将其插入到 MongoDB 如果我的 mongo 集群启动,我的代码工作正常我有 2 个问题如果我的 Mongo 实例关闭
- 云流的自动提交已禁用(autoCommitOffset 设置为 false),即使尚未确认消息,也不会发生重新轮询
- 在检查 Mongo 连接时,如果它收到两个具有相同 ID 的消息,则在该时间段内需要一些时间,然后如果我启动 mongo 实例,它会复制在正常情况下工作正常的消息
我们有解决这些问题的办法吗?
这是我的代码,
interface ResourceInventorySink {
companion object {
const val INPUT = "resourceInventoryInput"
}
@Input(INPUT)
fun input(): SubscribableChannel
}
@EnableBinding(ResourceInventorySink::class)
class InventoryEventListeners {
val logger = LoggerFactory.getLogger(javaClass)
@Autowired
lateinit var resourceInventoryService : ResourceInventoryService
@StreamListener(ResourceInventorySink.INPUT, condition = OperationConstants.INSERT)
fun receiveInsert(event : Message<ResourceInventoryEvent>) {
logger.info("received Insert message {}", event.payload.toString())
val success = resourceInventoryService.insert(event.payload)
success.subscribe({
logger.info("Data Inserted", event.payload.toString())
event.headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)?.acknowledge()
},{
if(it !is DataAccessResourceFailureException) {
logger.error("Exception Occured {} {}", it.message , it.cause.toString())
event.headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)?.acknowledge()
}
else {
logger.error("Error Inserting in Mongo DB {}", it.cause)
}
})
}
这是我的服务班
@Service
class ResourceInventoryService() {
val logger = LoggerFactory.getLogger(javaClass)
@Autowired
lateinit var resourceInventoryRepository: ResourceInventoryRepository
fun insert(newResource: ResourceInventoryEvent) = resourceInventoryRepository
.findByProductId(newResource.productId)
.switchIfEmpty(newResource.convertTODocument().toMono())
.flatMap { resourceInventoryRepository.save(it) }
.onErrorResume { Mono.error(it) }
这是我的 application.yml
spring:
cloud:
stream:
default:
consumer:
useNativeEncoding: true
kafka:
binder:
brokers:
- localhost:9092
consumer-properties:
key.deserializer : org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
enable.auto.commit: false
specific.avro.reader: true
bindings:
resourceInventoryInput:
consumer:
autoOffsetCommit: false
default-binder: kafka
bindings:
resourceInventoryInput:
binder: kafka
destination: ${application.messaging.topic}
content-type: application/*+avro
group: ${application.messaging.group}
编辑 1. 确认为空