0

我正在开发一个使用 Spring Cloud Stream 构建的 Kafka Streams 应用程序。在这个应用程序中,我需要:

  1. 使用可在以后检索的连续消息流。
  2. 保留与某些条件匹配的消息 ID 列表。
  3. 在一个单独的线程中,运行一个调度程序,它会定期读取消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。
  4. 从列表中删除已处理的消息 ID,以免重复工作。

我考虑过如下实施:

  1. 将传入的消息流作为具体化的 KTable 使用,以便我以后可以通过键查找和检索消息。
  2. 在另一个状态存储中实现消息 ID 列表。
  3. InteractiveQueryService使用 Spring 的调度机制运行一个单独的线程,该线程通过bean从状态存储中读取。

我遇到的问题是InteractiveQueryService提供了对状态存储的只读访问权限,因此我无法删除另一个线程中的条目。我决定不使用 Kafka Stream 的 punctuate 功能,因为语义不同;我的调度线程必须始终定期运行,而不管入站消息的处理。

另一种选择可能是使用低级处理器 API,并将对可写状态存储的引用传递给我的调度程序线程。我需要同步写操作。但是我不确定这是否可行,或者在从单独的线程访问这样的状态存储时是否存在其他限制。

任何意见或建议将不胜感激!

4

1 回答 1

1

我的调度线程必须始终定期运行,无论入站消息的处理如何

好吧,基于的标点符号WALL_CLOCK_TIME完全符合您上面的描述。

我遇到的问题是 InteractiveQueryService 提供对状态存储的只读访问

使用处理器 API 和标点符号允许您访问 with 中的状态存储init()ProcessorContext#getStateStore()并从ProcessorContext#schedule(). 此解决方案的优点是,处理器和标点符号在同一个线程中运行,您不需要它们之间的任何同步。

于 2020-06-24T15:53:57.623 回答