2

我正在使用 kafka 0.8 和 spring-integration-kafka 1.2.0.RELEASE

我有 2 个名为主要和次要的主题。我需要从主要主题消费,经过一些处理后需要生成次要主题,以便稍后完成下一组处理。

虽然主要主题的消费工作正常,但几分钟后生产次要主题开始失败。问题从我设置的 500 毫秒后向 kafka 超时发送请求开始。以线程池耗尽结束。

如果我试图为另一个 kafka 集群的次要主题生成事件,它可以正常工作。

我有 4 个消费者运行两个主题,每个主题都有 200 个分区。

我对卡夫卡有点陌生,请原谅缺乏知识。请评论我应该提供的任何缺失信息。

4

2 回答 2

2

使用提供的信息有点难以了解,但我怀疑这个问题是您可以从第一个主题消费然后计算结果的速度比对第二个主题产生的速度要快。发生这种情况的原因可能有很多。例如,可能对次要主题的写入没有很好地分布在分区之间。同样,由于各种原因,包括更快的机器、更多的机器、更好的网络等,生产到不同的集群可能会成功。

基本问题并不是卡夫卡特有的:如果您从一个源消费并将该数据发送到第二个接收器,您通常不能假设第二个接收器总是比第一个源快。每当第二个水槽变慢时,即使是一点点,你最终都会遇到这样的问题。例如,假设您每秒可以从主接收器读取 100 个事件,但辅助接收器每秒只能消耗 99 个事件。这意味着每一秒您都会在内存中多出 1 个事件等待发送到您的接收器。如果您不采取任何措施来降低从主要来源读取的速度,您将耗尽 RAM、线程或其他资源。

一般的解决方案是某种节流。例如,您可以使用Semaphore以 500 个许可开头的 a:这意味着您永远无法从尚未成功发送到接收器的主要来源读取超过 500 个项目。在从主要来源阅读项目之前,您会减少 500 个项目,Semaphore以便如果您已经“领先”次要来源 500 个项目,您的阅读器将阻止。每次您成功地将一个项目发送到您的次要主题时,您都会释放一个允许另一个阅读继续进行的许可。

我会警告不要使用第二个 Kafka 集群或其他有效但不能真正解决核心问题的修复。例如,如果现在生产到不同的集群可以工作,那么当该集群由于节点丢失、大的重新平衡等而减速时它就不会了。这只是暂时隐藏了问题。

于 2016-12-27T20:47:41.737 回答
0

在尝试了所有可能的配置后终于发现了问题。

错误地忘记删除之前为消费者集成添加的以下依赖项。

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>

它在生产时引起了一些冲突,正在添加处于等待状态的线程。如果有人可以指导它可以增加什么冲突,那将是一个很好的学习。

谢谢。

于 2016-12-29T09:59:20.040 回答