我想将一长串事件发布到使用 fs2.Stream 的 Kafka 中,该 fs2.Stream 对应于一个非常大的 DB 行列表,如果编译为 List,最终将导致 Out Of Memory 错误。
因此,假设我有一个非常大的 UUID 键列表,其中包含数百万条记录:
def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO, UUID]
并且我想使用这个 Publisher 将一个事件发布到 Kafka 中,该事件对应于 500 个键的块:
trait KeyPublisher {
def publish(event: ChunkOfKeys): IO[Long]
}
我想创建一个函数来将此流排入队列/发布到 Kafka:
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
getKeyStream(endDateTime)
.chunkN(500)
.evalMap(myChunk => ?????)
...
}
如何使用由数据库产生的流,将其拆分为恒定大小的块,然后将它们中的每一个发布到 Kafka 中?
显然,很难找到关于这个主题的好的文档或示例。你能指出我正确的方向吗?