I am trying to mock elasticsearch data for hosted CI unit-testing purposes.

I have prepared some fixtures that I can successfully load with bulk(), but then, for some unknown reason, I cannot match anything, even though test_index seemingly contains the data (I can get() items by their IDs).

fixtures.json is a subset of ES documents that I fetched from a real production index. With the real-world index, everything works as expected and all tests pass.

An artificial example of the strange behaviour follows:

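import json
from unittest import TestCase  # assumed; the original does not show which TestCase is used

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


class MyTestCase(TestCase):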
    es = Elasticsearch()

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.es.indices.create('test_index', SOME_SCHEMA)

        with open('fixtures.json') as fixtures:
            bulk(cls.es, json.load(fixtures))

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()
        cls.es.indices.delete('test_index')

    def test_something(self):
        # check all documents are there:
        with open('fixtures.json') as fixtures:
            for f in json.load(fixtures):
                print(self.es.get(index='test_index', id=f['_id']))
                # yes they are!

        # BUT:
        match_all = {"query": {"match_all": {}}}
        print('hits:', self.es.search(index='test_index', body=match_all)['hits']['hits'])
        # prints `hits: []` as if the index were empty

        print('count:', self.es.count(index='test_index', body=match_all)['count'])
        # prints `count: 0`

2 Answers

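While @jsmesami is quite right in his answer, there is arguably a cleaner way to do this. The problem is that ES has not yet refreshed the index, so the freshly indexed documents are not yet visible to search. The API exposes functions for exactly this purpose. Try something like: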

cls.es.indices.flush(wait_if_ongoing=True)
cls.es.indices.refresh(index='*')

To be more specific, you can pass index='test_index' to both functions. I think this is cleaner and more targeted than using sleep(..).
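
As a minimal sketch of how the refresh call above might be wrapped for reuse in a test suite: the helper name `make_searchable` is hypothetical, and `es` is assumed to be an `elasticsearch.Elasticsearch` client like the one in the question.

```python
def make_searchable(es, index="test_index"):
    """Force a refresh so documents indexed so far become visible to search.

    `es` is assumed to be an elasticsearch.Elasticsearch client (or any
    object exposing the same `indices.refresh` method).
    """
    return es.indices.refresh(index=index)


# Possible use in the question's setUpClass, right after loading fixtures:
#     bulk(cls.es, json.load(fixtures))
#     make_searchable(cls.es, "test_index")
```

Refreshing only the test index, rather than `'*'`, keeps the call from touching other indices on a shared cluster.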

answered 2016-09-18T09:53:31.700

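While I can completely understand your pain (everything works except the tests), the answer is actually quite simple: the test runs too fast compared with your manual experiments.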

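  • Elasticsearch is a near real-time search engine, which means that, with the default refresh interval, there is a delay of up to 1 second between indexing a document and it becoming searchable.
  • There is also an unpredictable delay (depending on actual overhead) between creating an index and it being ready.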

So the fix is to use time.sleep() to give ES some room to work the magic it needs before it can give you results. I would do it like this:

@classmethod
def setUpClass(cls):
    super().setUpClass()
    cls.es.indices.create('test_index', SOME_SCHEMA)

    with open('fixtures.json') as fixtures:
        bulk(cls.es, json.load(fixtures))

    cls.wait_until_index_ready()

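@classmethod
def wait_until_index_ready(cls, timeout=10):
    # requires `import time` at module level
    for sec in range(timeout):
        # sleep first: one second also covers the default refresh interval
        time.sleep(1)
        # green or yellow means the primary shards are allocated
        if cls.es.cluster.health().get('status') in ('green', 'yellow'):
            break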
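
The fixed one-second sleeps could also be replaced by polling for the condition the tests actually depend on, namely that the fixtures show up in count(). The following is only a sketch: `wait_for_doc_count` and its parameters are hypothetical names, and `es` is assumed to behave like the `Elasticsearch` client in the question.

```python
import time


def wait_for_doc_count(es, index, expected, timeout=10.0, interval=0.1):
    """Poll until `index` reports at least `expected` searchable documents.

    Returns True once the count is reached, or False if `timeout`
    seconds pass first.
    """
    deadline = time.monotonic() + timeout
    while True:
        # count() only sees documents that are already visible to search
        if es.count(index=index)["count"] >= expected:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Possible use at the end of setUpClass, where `docs` is the fixture list:
#     assert wait_for_doc_count(cls.es, "test_index", len(docs))
```

Polling at a short interval returns as soon as the documents are searchable, so the suite does not pay a full second when ES is ready sooner.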
answered 2016-09-16T09:16:56.470