0

我想将一长串事件发布到使用 fs2.Stream 的 Kafka 中,该 fs2.Stream 对应于一个非常大的 DB 行列表,如果编译为 List,最终将导致 Out Of Memory 错误。

因此,假设我有一个非常大的 UUID 键列表,其中包含数百万条记录:

def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO, UUID]

并且我想使用这个 Publisher 将一个事件发布到 Kafka 中,该事件对应于 500 个键的块:

trait KeyPublisher {
  def publish(event: ChunkOfKeys): IO[Long]
}

我想创建一个函数来将此流排入队列/发布到 Kafka:

def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
  getKeyStream(endDateTime)
     .chunkN(500)
     .evalMap(myChunk => ?????)
     ...
}

如何使用由数据库产生的流,将其拆分为恒定大小的块,然后将它们中的每一个发布到 Kafka 中?

显然,很难找到关于这个主题的好的文档或示例。你能指出我正确的方向吗?

4

1 回答 1

3

既然你没有说什么类型ChunkOfKeys,我会假设它是这样的Chunk[UUID]

def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime)(
    xa: Transactor[IO],
    publisher: KeyPublisher
): IO[Unit] =
  getKeyStream(endDateTime)
    .transact(xa) // Convert the ConnectionIO stream to Stream[IO, UUID]
    .chunkN(500)  // into Stream[IO, Chunk[UUID]]
    .evalMap(publisher.publish)  // Into Stream[IO, Long]
    .compile
    .drain // An IO[Unit] that describes the whole process
于 2021-02-25T14:51:23.790 回答