1

我们正在尝试编写一个订阅特定主题的消费者,并在满足数据中的几个条件时进行一些处理。然而,处理中的一个操作只能执行一次,为了简单起见,让我们认为这是一个非幂等的 POST http 请求。

以下是其他一些注意事项:-

  • 消费者需要在 node.js 中编码并使用 kafkajs 库,
  • 消费者需要在 Kubernetes 上运行,
  • 副本的数量将等于生产者副本的数量
  • 我们将为我们的消费者使用一个消费者组

我们在想,为了让这个消费者幂等,我们也许可以做这样的事情:-

For Every Message
    Check if message was processed
    try:
        If !processed
            Do Processing (http POST included here)
    catch for errors:
            Do error processing
    finally:
        Mark message as processed

“将消息标记为已处理”基本上是将一些细节捕获到关系数据库(如 Postgres),偏移量、时间戳和其他一些细节,以确保我们捕获的键允许我们唯一地识别记录

  1. 以上看起来对使消费者幂等有好处吗?
  2. 您还能建议哪些其他替代方案效果更好?

除了上述之外,还有一些关于上述场景中数据库处理最佳实践的问题:-

  1. 假设我有 3 个 k8s 节点,每个节点都有 3 个消费者 pod 运行,本质上提供了 9 个单线程 kafka 消费者。这是正确的理解吗?

  2. 现在由于这些线程中的每一个都将执行数据库插入/读取,那么在池和客户端之间使用什么会更好(假设 node-postgres 库)?

  3. 似乎如果我们在一天的开始时打开一个客户端连接并让它一直存在到一天结束,它应该对我们有用。这是一个好方法还是一个非常糟糕的设计?

  4. 如果我们对这 9 个运行的消费者的每条消息进行处理,我们是否会通过使用池获得任何好处。

附加假设:-

  • 交通时间:东部时间上午 7:00 开始,白天接载,美国晚上逐渐减少。东部时间凌晨 2:00 到东部时间早上 6:00 之间没有交通。
  • 平均:- 美国白天每秒 1 条消息,
  • 最大:- 在美国白天,每秒 5 条消息,持续时间短。
  • 延迟容忍度:在正常情况下,消费者中的 POST 可以延迟不超过消息发布时间的 5 分钟。

感谢您耐心阅读并阅读此内容。为这篇文章的长度道歉。

4

1 回答 1

2
  1. 以上看起来对使消费者幂等有好处吗?

是的,从幂等性的 POV 来看,您的代码看起来不错。由于您使用的是 Kafka 消费者,因此您不需要用于消息处理的专用 for 循环。每次消息到达时都会调用消费者。您的伪代码应如下所示:

Check if message was processed
try:
    If !processed
        Do Processing (http POST included here)
catch for errors:
        Do error processing
finally:
    Mark message as processed
  1. 您还能建议哪些其他替代方案效果更好?

您的代码错过了一个重要方面,即并发重复消息。例如,生产者以某种方式同时生成了两条消息(这实际上是生产者端的错误),并且该消息应该只处理一次。消费者开始处理这两条消息。在这一点上If !processed,两个消费者都看到了相同的状态,not processed并且他们都继续前进Do Processing。您可以通过获取某个 id 的锁定来避免这种情况,通过该 id 可以判断消息是否重复。由于您已经在使用 Postgres,因此您可以查看pg_advisory_locks. 所以现在,你的伪代码现在看起来像:

Check if message was processed
try:
    acquire_lock(message.uniqId)    //pg_advisory_lock
    If !processed
        Do Processing (http POST included here)
catch for errors:
    if error is lock_already_acquired_by_other_process
        return as duplicate processor
    else
        Do error processing
finally:
    Mark message as processed
    release lock

我们仍然可以做一些改进。上面的代码不处理我们想要重试的失败场景。有多种方法可以实现这一目标。哦,等等,你正在使用 Kafka。为什么不在一段时间后在同一个 Kafka 主题中发布处理失败的消息(显然不是那些重复的消息),并在消息对象中设置一些计数器来检查该消息被处理了多少次。我们当然希望只重试有限的次数,所以每次处理消息时,我们都可以检查我们之前在消息对象中设置的计数器来控制重试次数。到目前为止一切顺利,但是即使在固定重试次数之后仍然失败的消息又如何呢?对于这种情况,你'

这听起来需要编写很多代码。我们还有另一个好消息。您可以利用一些可用的库来实现所有这些。一个这样的图书馆是公牛

  1. 假设我有 3 个 k8s 节点,每个节点都有 3 个消费者 pod 运行,本质上提供了 9 个单线程 kafka 消费者。这是正确的理解吗?

是的。据我了解。

  1. 现在,由于这些线程中的每一个都将执行数据库插入/读取,那么在池和客户端之间使用什么会更好(假设 node-postgres 库)?

建议使用池,因为您还希望实现更快的处理。使用连接池,您可以执行诸如同时触发多个查询而无需将它们排队,利用任何使用并行执行的底层库等操作。当然,我们不应该用连接填充我们的内存,因此调整数量的连接根据 pod 的内存在池中是可取的。

  1. 似乎如果我们在一天的开始时打开一个客户端连接并让它一直存在到一天结束,它应该对我们有用。这是一个好方法还是一个非常糟糕的设计?

我无法正确理解您在这里尝试做什么,但我希望连接池。

  1. 如果我们对这 9 个运行的消费者的每条消息进行处理,我们是否会通过使用池获得任何好处。

是的。除了第 4 点中已经提到的好处之外,您还可以更好地利用 k8s pod 的资源(同样,这取决于根据消息传入率是否有 9 个消费者是最优的)。

于 2021-08-07T07:00:42.657 回答