
I'm trying to bulk index a generator with the parallel_bulk method from the Python elasticsearch helpers, but the method doesn't appear to do anything. If I use the regular bulk method instead, ingestion into Elasticsearch works fine. I looked into this and came across this solution: https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498. I tried it (expecting the generator to be consumed), but it still doesn't seem to work: no errors are printed and the iterator is never consumed. Here is my code:

@staticmethod
def fetch_rows(cursor):
    frame = cursor.fetchone()
    while frame is not None:
        yield frame
        frame = cursor.fetchone()
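To rule out the SQL side, fetch_rows can be checked in isolation with a stand-in cursor (FakeCursor is hypothetical, it just mimics a DB-API cursor's fetchone):

```python
class FakeCursor:
    """Hypothetical stand-in for a DB cursor, so fetch_rows can be tested alone."""
    def __init__(self, rows):
        self._rows = iter(rows)

    def fetchone(self):
        # Return the next row, or None when exhausted, like a DB-API cursor.
        return next(self._rows, None)


def fetch_rows(cursor):
    frame = cursor.fetchone()
    while frame is not None:
        yield frame
        frame = cursor.fetchone()


rows = list(fetch_rows(FakeCursor([(1, 'a'), (2, 'b')])))
```

This yields the two rows and then stops, so the SQL-side generator behaves as expected.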

@staticmethod
def __generate_field(body):
    """
    Yields each item of the JSON body one at a time.
    :param body: iterable of JSON documents
    :return: item iterator
    """
    for item in body:
        yield item

def json_for_bulk_body_sql_list(self, body, index_name: str, name_of_docs: str):
    """
    :param body: iterable of documents, wrapped as a generator
    :param index_name: name of the index based on location
    :param name_of_docs: name of the doc type to use in the index
    :return: generator of structured JSON actions for bulking
    """

    # if not isinstance(body, list):
    #     raise TypeError('Body must be a list')
    if not isinstance(index_name, str):
        raise TypeError('index must be a string')
    structured_json_body = ({
        '_op_type': 'index',
        '_index': index_name,  # index name Twitter
        '_type': name_of_docs,  # type is tweet
        '_id': doc['tweet_id'],  # id of the tweet
        '_source': doc
    } for doc in self.__generate_field(body))
    return structured_json_body
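For clarity, here is a standalone version of the same action structure, with hypothetical sample docs and no Elasticsearch connection needed:

```python
def make_actions(docs, index_name, doc_type):
    # Standalone sketch of the action structure built above; sample docs are made up.
    for doc in docs:
        yield {
            '_op_type': 'index',
            '_index': index_name,
            '_type': doc_type,
            '_id': doc['tweet_id'],
            '_source': doc,
        }


sample = [{'tweet_id': 1, 'text': 'hello'}, {'tweet_id': 2, 'text': 'world'}]
actions = list(make_actions(sample, 'twitter', 'tweet'))
```

Each element is a plain dict in the shape the bulk helpers expect, keyed by `_id` from the doc.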


json_results = (dict(zip(column_names, row)) for row in self.fetch_rows(cursor))
actions = self.json_for_bulk_body_sql_list(json_results, index_name=index_, name_of_docs=doc_name)

for success, info in self.bulk_es_parallel(actions=actions):
    if not success:
        print('Doc failed: '.upper(), info)
    else:
        ingested += 1
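As far as I understand, parallel_bulk returns a lazy generator, so nothing runs until the result is iterated. That laziness can be shown with a plain generator standing in for parallel_bulk (fake_parallel_bulk is a made-up stand-in, not the real helper):

```python
from collections import deque

processed = []


def fake_parallel_bulk(actions):
    # Stand-in for helpers.parallel_bulk: lazily yields (success, info) pairs.
    for action in actions:
        processed.append(action)
        yield (True, action)


results = fake_parallel_bulk(['a', 'b', 'c'])
# Merely creating the generator runs nothing: processed is still [].
deque(results, maxlen=0)  # drain the generator, discarding its results
# Only now has every action been processed: processed == ['a', 'b', 'c'].
```

This is why the for loop over bulk_es_parallel above should be enough to force consumption, which is what makes the silence so confusing.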

I'm doing exactly what the example at that solution URL does, but it still isn't ingesting into Elasticsearch, and even after debugging I can't figure out why.

Thank you so much!
