0

我正在尝试上传大量文件——大约 700 万份。

我为要添加的每个文档创建了操作,并将它们拆分为大约 260 个文件,每个文件大约 30K 个文档。

下面是动作的格式:

a = someDocument with nesting

esActionFromFile = [{
'_index': 'mt-interval-test-9', 
'_type': 'doc', 
'_id': 5641254, 
'_source': a, 
'_op_type': 'create'}]

我曾尝试使用 helpers.bulk、helpers.parallel_bulk 和 helpers.streaming_bulk,并且使用 helpers.bulk 和 helpers.streaming_bulk 取得了部分成功。

每次运行测试时,我都会删除,然后使用以下命令重新创建索引:

# Refresh Index
es.indices.delete(index=index, ignore=[400, 404])
es.indices.create(index = index, body = mappings_request_body)

当我部分成功时 - 加载了许多文档,但最终我收到 409 版本冲突错误。

我知道,当 ES 没有足够的时间在通过查询删除后处理单个文档的删除时,可能会产生版本冲突。

起初,我以为这里发生了类似的事情。但是,我意识到我经常在第一次处理文件时从文件中获取错误(即,即使删除导致问题,这个特定文件也从未加载过,因此不会发生冲突)。

我使用的_id 值是我从中提取数据的原始数据库的主键 - 所以我确定它们是唯一的。此外,我检查了我的操作数组或我创建它们的文件中是否存在无意重复的记录,并且没有重复

我无法解释为什么会发生这种情况,并且很难找到上传数据的解决方案。

任何帮助将不胜感激!

4

1 回答 1

1

回复中应该附有信息409,可以准确地告诉您出了什么问题以及是哪个文件导致的。

可能导致这种情况的另一件事是retry- 当elasticsearch-py无法连接到集群时,它将再次将请求重新发送到不同的节点。在一些复杂的场景中,可能会发生请求因此被发送两次。如果您启用了retry_on_timeout选项,则尤其如此。

于 2018-04-27T09:17:15.253 回答