1

我试图解决的问题可以描述如下:

  1. k8s 调度程序每周一早上调用我的 Finagle/Finch 应用程序的 HTTP 端点
  2. HTTP 端点将处理请求并返回一个IO[Int](立即)
  3. 在生成 HTTP 响应之前,应该启动另一个函数的执行,因此它将异步运行
  4. 异步进程应该使用来自使用 Doobie 生成的 Postgres RDB 的数据流
  5. 该流应在恒定内存中进行操作,并且应将数据推送到 Kafka 主题中
  val myHTTPEndpoint: Endpoint[IO, Int] =
    get("k8s-trigger") { () =>
      keyService
        .processKeys(LocalDateTime.now)
        .map(Ok)
    }
class KeyService {
  def processKeys(endDateTime: LocalDateTime): IO[Int] =
    for {
      numberOfKeys <- reportCardRepository.countReportCardKeys(endDateTime) // Doobie Repo
      _ <- IO.shift *> enqueueKeys(endDateTime, numberOfKeys, keysAccumulator = 0) // To be executed asynchronously
    } yield numberOfKeys

  def enqueueKeys(endDateTime: LocalDateTime,
                  keysCounter: Int,
                  keysAccumulator: Int): IO[Int] = ???

所以本质上我想从数据库中获取要处理的键的数量,触发异步执行,enqueueKeys()它将获取 numberOfKeys 作为输入参数,并将这个数字返回给 HTTP 端点以便在之后立即构建响应,而不等待enqueueKeys()完成其计算。

我想知道是否有可能enqueueKeys()作为批处理过程异步执行,该批处理过程应该发生在另一个线程池 - 另一个 ContextShift 上。使用cats.effect.Async或者使用FS2 是否可行?

4

0 回答 0