我试图解决的问题可以描述如下:
- k8s 调度程序每周一早上调用我的 Finagle/Finch 应用程序的 HTTP 端点
- HTTP 端点将处理请求并返回一个
IO[Int]
(立即) - 在生成 HTTP 响应之前,应该启动另一个函数的执行,因此它将异步运行
- 异步进程应该使用来自使用 Doobie 生成的 Postgres RDB 的数据流
- 该流应在恒定内存中进行操作,并且应将数据推送到 Kafka 主题中
val myHTTPEndpoint: Endpoint[IO, Int] =
get("k8s-trigger") { () =>
keyService
.processKeys(LocalDateTime.now)
.map(Ok)
}
class KeyService {
def processKeys(endDateTime: LocalDateTime): IO[Int] =
for {
numberOfKeys <- reportCardRepository.countReportCardKeys(endDateTime) // Doobie Repo
_ <- IO.shift *> enqueueKeys(endDateTime, numberOfKeys, keysAccumulator = 0) // To be executed asynchronously
} yield numberOfKeys
def enqueueKeys(endDateTime: LocalDateTime,
keysCounter: Int,
keysAccumulator: Int): IO[Int] = ???
所以本质上我想从数据库中获取要处理的键的数量,触发异步执行,enqueueKeys()
它将获取 numberOfKeys 作为输入参数,并将这个数字返回给 HTTP 端点以便在之后立即构建响应,而不等待enqueueKeys()
完成其计算。
我想知道是否有可能enqueueKeys()
作为批处理过程异步执行,该批处理过程应该发生在另一个线程池 - 另一个 ContextShift 上。使用cats.effect.Async或者使用FS2 是否可行?