我们正在尝试编写一个订阅特定主题的消费者,并在满足数据中的几个条件时进行一些处理。然而,处理中的一个操作只能执行一次,为了简单起见,让我们认为这是一个非幂等的 POST http 请求。
以下是其他一些注意事项:-
- 消费者需要在 node.js 中编码并使用 kafkajs 库,
- 消费者需要在 Kubernetes 上运行,
- 副本的数量将等于生产者副本的数量
- 我们将为我们的消费者使用一个消费者组
我们在想,为了让这个消费者幂等,我们也许可以做这样的事情:-
For Every Message
Check if message was processed
try:
If !processed
Do Processing (http POST included here)
catch for errors:
Do error processing
finally:
Mark message as processed
“将消息标记为已处理”基本上是将一些细节捕获到关系数据库(如 Postgres),偏移量、时间戳和其他一些细节,以确保我们捕获的键允许我们唯一地识别记录
- 以上看起来对使消费者幂等有好处吗?
- 您还能建议哪些其他替代方案效果更好?
除了上述之外,还有一些关于上述场景中数据库处理最佳实践的问题:-
假设我有 3 个 k8s 节点,每个节点都有 3 个消费者 pod 运行,本质上提供了 9 个单线程 kafka 消费者。这是正确的理解吗?
现在由于这些线程中的每一个都将执行数据库插入/读取,那么在池和客户端之间使用什么会更好(假设 node-postgres 库)?
似乎如果我们在一天的开始时打开一个客户端连接并让它一直存在到一天结束,它应该对我们有用。这是一个好方法还是一个非常糟糕的设计?
如果我们对这 9 个运行的消费者的每条消息进行处理,我们是否会通过使用池获得任何好处。
附加假设:-
- 交通时间:东部时间上午 7:00 开始,白天接载,美国晚上逐渐减少。东部时间凌晨 2:00 到东部时间早上 6:00 之间没有交通。
- 平均:- 美国白天每秒 1 条消息,
- 最大:- 在美国白天,每秒 5 条消息,持续时间短。
- 延迟容忍度:在正常情况下,消费者中的 POST 可以延迟不超过消息发布时间的 5 分钟。
感谢您耐心阅读并阅读此内容。为这篇文章的长度道歉。