9

我们有一个应用程序,消费者读取消息,线程执行许多操作,包括在向另一个主题生成消息之前访问数据库。在线程上消费和产生消息之间的时间可能需要几分钟。一旦为新主题生成消息,就会完成提交以指示我们已完成对消费者队列消息的工作。由于这个原因,自动提交被禁用。

我正在使用高级消费者,我注意到 Zookeeper 和 kafka 会话超时,因为我们在消费者队列上做任何事情需要很长时间,所以每次线程返回以从消费者那里读取更多内容时,kafka 最终都会重新平衡队列,并且消费者在一段时间后读取新消息之前开始需要很长时间。

我可以将 Zookeeper 会话超时设置得非常高,以免造成问题,但随后我必须相应地调整重新平衡参数,并且 kafka 在一段时间内不会接收新的消费者以及其他副作用。

我有什么办法来解决这个问题?有没有办法让 kafka 和 zookeeper 心跳,让双方都开心?如果我使用一个简单的消费者,我还会遇到同样的问题吗?

4

2 回答 2

4

听起来您的问题归结为依靠高级消费者来管理上次读取的偏移量。使用简单的消费者可以解决这个问题,因为您控制了该偏移量的持久性。请注意,所有高级消费者提交所做的都是将上次读取的偏移量存储在 zookeeper 中。没有采取任何其他操作,您刚刚阅读的消息仍然存在于分区中,并且可供其他消费者阅读。

使用 kafka 简单消费者,您可以更好地控制偏移存储的发生时间和方式。您甚至可以在 Zookeeper 以外的其他地方(例如数据库)保留该偏移量。

坏消息是,虽然简单消费者本身比高级消费者更简单,但您还需要在代码方面做很多工作才能使其工作。您还必须编写代码来访问多个分区 - 高级消费者为您做得非常好。

于 2014-12-21T16:18:59.427 回答
0

我认为问题是消费者的轮询方法触发消费者的心跳请求。当你增加 session.timeout 时。消费者的心跳不会到达协调者。由于这个心跳跳过,协调器标记消费者死亡。而且消费者重新加入的速度非常慢,尤其是在单一消费者的情况下。

我遇到了类似的问题,为了解决我必须更改消费者配置属性中的以下参数

session.timeout.ms=request.timeout.ms=超过会话超时

您还必须在 kafka 代理节点的 server.properties 中添加以下属性。group.max.session.timeout.ms =

您可以查看以下链接以获取更多详细信息。 http://grokbase.com/t/kafka/users/16324waa50/session-timeout-ms-limit

于 2016-08-11T10:20:15.140 回答