3

我收到了最终出现在 Kafka 中的事件。从这些事件中,我使用 Kafka Streams 应用程序获取 id,并将其作为另一个主题中的一对 (id, 1) 发布回 Kafka。然后我想看看 id 是否已经存在于 ElasticSearch 中,如果存在则更新其计数器,否则在 ElasticSearch 中创建一条新记录,其 id 来自 Kafka,计数器设置为 1,即记录的更新 (id, 1)到 ES。

我希望为此使用 Kafka Connect to ElasticSearch,但如果可能的话,它似乎并不那么简单。我可以看到向 ​​ES 添加记录是可行的,但是与现有记录合并似乎是我还没有发现的事情。这是否已经可行,如果可以,如何实现,如果不可以,是否计划在附近的版本中实现?

4

1 回答 1

3

我分叉了datamountaineer ES sink 连接器以允许 Upsert。有了它,您可以指定一个 PK 并使用 docAsUpsert 将更新运行到 ES 中。您可以从我的 github fork获取项目并编译 Jar 。

于 2017-01-29T16:52:55.413 回答