7

在弹性搜索中滚动时,在每次滚动时提供最新的很重要scroll_id

初始搜索请求和每个后续滚动请求都会返回一个新的 scroll_id — 只应使用最近的 scroll_id。

以下示例(取自此处)使我感到困惑。一、滚动初始化:

rs = es.search(index=['tweets-2014-04-12','tweets-2014-04-13'], 
               scroll='10s', 
               search_type='scan', 
               size=100, 
               preference='_primary_first',
               body={
                 "fields" : ["created_at", "entities.urls.expanded_url", "user.id_str"],
                   "query" : {
                     "wildcard" : { "entities.urls.expanded_url" : "*.ru" }
                   }
               }
   )
sid = rs['_scroll_id']

然后循环:

tweets = [] while (1):
    try:
        rs = es.scroll(scroll_id=sid, scroll='10s')
        tweets += rs['hits']['hits']
    except:
        break

它有效,但我看不到在哪里sid更新......我相信它发生在内部,在 python 客户端中;但我不明白它是如何工作的......

4

5 回答 5

10

这是一个老问题,但由于某种原因,在搜索“elasticsearch python scroll”时首先出现。python 模块提供了一个辅助方法来为您完成所有工作。它是一个生成器函数,将在管理底层滚动 ID 时将每个文档返回给您。

https://elasticsearch-py.readthedocs.io/en/master/helpers.html#scan

这是一个使用示例:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

query = {
    "query": {"match_all": {}}
}

es = Elasticsearch(...)
for hit in scan(es, index="my-index", query=query):
    print(hit["_source"]["field"])
于 2020-08-14T17:27:23.470 回答
7

使用 python 请求

import requests
import json

elastic_url = 'http://localhost:9200/my_index/_search?scroll=1m'
scroll_api_url = 'http://localhost:9200/_search/scroll'
headers = {'Content-Type': 'application/json'}

payload = {
    "size": 100,
    "sort": ["_doc"]
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}

r1 = requests.request(
    "POST",
    elastic_url,
    data=json.dumps(payload),
    headers=headers
)

# first batch data
try:
    res_json = r1.json()
    data = res_json['hits']['hits']
    _scroll_id = res_json['_scroll_id']
except KeyError:
    data = []
    _scroll_id = None
    print 'Error: Elastic Search: %s' % str(r1.json())
while data:
    print data
    # scroll to get next batch data
    scroll_payload = json.dumps({
        'scroll': '1m',
        'scroll_id': _scroll_id
    })
    scroll_res = requests.request(
        "POST", scroll_api_url,
        data=scroll_payload,
        headers=headers
    )
    try:
        res_json = scroll_res.json()
        data = res_json['hits']['hits']
        _scroll_id = res_json['_scroll_id']
    except KeyError:
        data = []
        _scroll_id = None
        err_msg = 'Error: Elastic Search Scroll: %s'
        print err_msg % str(scroll_res.json())

参考:https ://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#search-request-scroll

于 2018-04-10T12:31:34.647 回答
4

事实上,代码中有一个错误 - 为了正确使用滚动功能,您应该在下一次调用 scroll() 时使用每次新调用返回的新 scroll_id,而不是重用第一个:

重要的

初始搜索请求和每个后续滚动请求都会返回一个新的 scroll_id — 只应使用最近的 scroll_id。

http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html

它之所以有效,是因为 Elasticsearch 并不总是在调用之间更改 scroll_id,并且对于较小的结果集,它可以返回与最初返回一段时间相同的 scroll_id。去年的讨论是在其他两个看到相同问题的用户之间进行的,相同的 scroll_id 被返回了一段时间:

http://elasticsearch-users.115913.n3.nabble.com/Distributing-query-results-using-scrolling-td4036726.html

因此,虽然您的代码适用于较小的结果集,但它是不正确的 - 您需要捕获在每次新调用 scroll() 时返回的 scroll_id 并将其用于下一次调用。

于 2014-07-28T15:42:42.210 回答
1

self._elkUrl = " http://Hostname:9200/logstash- */_search?scroll=1m"

self._scrollUrl="http://Hostname:9200/_search/scroll"


    """
    Function to get the data from ELK through scrolling mechanism
    """ 
    def GetDataFromELK(self):
        #implementing scroll and retriving data from elk to get more than 100000 records at one search
        #ref :https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-scroll.html
        try :
            dataFrame=pd.DataFrame()
            if self._elkUrl is None:
                raise ValueError("_elkUrl is missing")
            if self._username is None:
                raise ValueError("_userNmae for elk is missing")
            if self._password is None:
                raise ValueError("_password for elk is missing")
            response=requests.post(self._elkUrl,json=self.body,auth=(self._username,self._password))
            response=response.json()
            if response is None:
                raise ValueError("response is missing")
            sid  = response['_scroll_id']
            hits = response['hits']
            total= hits["total"]
            if total is  None:
                raise ValueError("total hits from ELK is none")
            total_val=int(total['value'])
            url = self._scrollUrl
            if url is None:
                raise ValueError("scroll url is missing")
            #start scrolling 
            while(total_val>0):
                #keep search context alive for 2m
                scroll = '2m'
                scroll_query={"scroll" : scroll, "scroll_id" : sid }
                response1=requests.post(url,json=scroll_query,auth=(self._username,self._password))
                response1=response1.json()
                # The result from the above request includes a scroll_id, which should be passed to the scroll API in order to retrieve the next batch of results
                sid = response1['_scroll_id']
                hits=response1['hits']
                data=response1['hits']['hits']
                if len(data)>0:
                    cleanDataFrame=self.DataClean(data)
                    dataFrame=dataFrame.append(cleanDataFrame)
                total_val=len(response1['hits']['hits'])
                num=len(dataFrame)
            print('Total records recieved from ELK=',num)
            return dataFrame
        except Exception as e:
            logging.error('Error while getting the data from elk', exc_info=e)
            sys.exit()

于 2020-05-26T20:32:09.203 回答
1
from elasticsearch import Elasticsearch

elasticsearch_user_name ='es_username'
elasticsearch_user_password ='es_password'
es_index = "es_index"

es = Elasticsearch(["127.0.0.1:9200"],
                   http_auth=(elasticsearch_user_name, elasticsearch_user_password))


query = {
  "query": {
     "bool": {
        "must": [
            {
            "range": {
                "es_datetime": {
                    "gte": "2021-06-21T09:00:00.356Z",
                    "lte": "2021-06-21T09:01:00.356Z",
                    "format": "strict_date_optional_time"
                    }
                }
        }
    ]
 }
},
  "fields": [
      "*"
],
  "_source": False,
  "size": 2000,
}
resp = es.search(index=es_index, body=query, scroll="1m")
old_scroll_id = resp['_scroll_id']
results = resp['hits']['hits']
while len(results):
    for i, r in enumerate(results):
    # do something whih data
    pass
    result = es.scroll(
        scroll_id=old_scroll_id,
        scroll='1m'  # length of time to keep search context
    )
    # check if there's a new scroll ID
    if old_scroll_id != result['_scroll_id']:
        print("NEW SCROLL ID:", result['_scroll_id'])
    # keep track of pass scroll _id
    old_scroll_id = result['_scroll_id']

    results = result['hits']['hits']
于 2021-06-23T06:23:37.770 回答