4

在多处理脚本中使用 elasticsearch-py 的正确方法是什么?我应该在启动进程之前创建一个新的客户端对象并使用该对象,还是应该在每个进程中创建一个新对象。第二个给我一个来自弹性搜索的连接问题的错误

谢谢基兰

4

2 回答 2

3

当我将客户端对象声明为全局变量时,似乎第一种方法对我有用。

from multiprocessing import Pool
from elasticsearch import Elasticsearch
import time


def task(body):
    result = es.index(index='test', doc_type='test', body=body)
    return result


def main():
    pool = Pool(processes=MAX_CONNECTS)
    result = []
    for x in range(10):
        result.append(pool.apply_async(task, ({'id': x},)))
    time.sleep(1)
    for rs in result:
        print(rs.get())


if __name__ == "__main__":
    MAX_CONNECTS = 5
    es = Elasticsearch(hosts="localhost", maxsize=MAX_CONNECTS)
    main()

输出看起来像

{'_index': 'test', '_type': 'test', '_id': 'xEjqBWcB9xsUYKqz-P6U', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'w0jqBWcB9xsUYKqz-P6U', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'x0jqBWcB9xsUYKqz-P6X', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'xkjqBWcB9xsUYKqz-P6X', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'xUjqBWcB9xsUYKqz-P6W', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'yEjqBWcB9xsUYKqz-P66', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'ykjqBWcB9xsUYKqz-P7I', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'yUjqBWcB9xsUYKqz-P7I', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'y0jqBWcB9xsUYKqz-P7P', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'zEjqBWcB9xsUYKqz-P7V', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 5, '_primary_term': 1}
于 2018-11-12T03:21:41.113 回答
0

推荐的方法是创建一个唯一的客户端对象,您可以使用maxsize(默认为 10)增加同时线程的数量。

es = Elasticsearch( "host1", maxsize=25)

资源

于 2016-12-28T14:41:45.233 回答