我正在开发一个使用 Spring Cloud Stream 构建的 Kafka Streams 应用程序。在这个应用程序中,我需要:
- 使用可在以后检索的连续消息流。
- 保留与某些条件匹配的消息 ID 列表。
- 在一个单独的线程中,运行一个调度程序,它会定期读取消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。
- 从列表中删除已处理的消息 ID,以免重复工作。
我考虑过如下实施:
- 将传入的消息流作为具体化的 KTable 使用,以便我以后可以通过键查找和检索消息。
- 在另一个状态存储中实现消息 ID 列表。
InteractiveQueryService
使用 Spring 的调度机制运行一个单独的线程,该线程通过bean从状态存储中读取。
我遇到的问题是InteractiveQueryService
提供了对状态存储的只读访问权限,因此我无法删除另一个线程中的条目。我决定不使用 Kafka Stream 的 punctuate 功能,因为语义不同;我的调度线程必须始终定期运行,而不管入站消息的处理。
另一种选择可能是使用低级处理器 API,并将对可写状态存储的引用传递给我的调度程序线程。我需要同步写操作。但是我不确定这是否可行,或者在从单独的线程访问这样的状态存储时是否存在其他限制。
任何意见或建议将不胜感激!