Skip to content

Commit

Permalink
Added point in time support and the Search.iterate() method (#1833)
Browse files Browse the repository at this point in the history
* Added point in time support and the Search.iterate() method

* Update elasticsearch_dsl/_async/search.py

Co-authored-by: Quentin Pradet <[email protected]>

* feedback

---------

Co-authored-by: Quentin Pradet <[email protected]>
  • Loading branch information
miguelgrinberg and pquentin authored May 30, 2024
1 parent 76a57fd commit 4a9d882
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 0 deletions.
43 changes: 43 additions & 0 deletions elasticsearch_dsl/_async/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.

import contextlib

from elasticsearch.exceptions import ApiError
from elasticsearch.helpers import async_scan

Expand Down Expand Up @@ -92,6 +94,8 @@ async def scan(self):
pass to the underlying ``scan`` helper from ``elasticsearch-py`` -
https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan
The ``iterate()`` method should be preferred, as it provides similar
functionality using an Elasticsearch point in time.
"""
es = get_connection(self._using)

Expand All @@ -113,6 +117,45 @@ async def delete(self):
)
)

@contextlib.asynccontextmanager
async def point_in_time(self, keep_alive="1m"):
"""
Open a point in time (pit) that can be used across several searches.
This method implements a context manager that returns a search object
configured to operate within the created pit.
:arg keep_alive: the time to live for the point in time, renewed with each search request
"""
es = get_connection(self._using)

pit = await es.open_point_in_time(
index=self._index or "*", keep_alive=keep_alive
)
search = self.index().extra(pit={"id": pit["id"], "keep_alive": keep_alive})
if not search._sort:
search = search.sort("_shard_doc")
yield search
await es.close_point_in_time(id=pit["id"])

async def iterate(self, keep_alive="1m"):
"""
Return a generator that iterates over all the documents matching the query.
This method uses a point in time to provide consistent results even when
the index is changing. It should be preferred over ``scan()``.
:arg keep_alive: the time to live for the point in time, renewed with each new search request
"""
async with self.point_in_time(keep_alive=keep_alive) as s:
while True:
r = await s.execute()
for hit in r:
yield hit
if len(r.hits) == 0:
break
s = r.search_after()


class AsyncMultiSearch(MultiSearchBase):
"""
Expand Down
41 changes: 41 additions & 0 deletions elasticsearch_dsl/_sync/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.

import contextlib

from elasticsearch.exceptions import ApiError
from elasticsearch.helpers import scan

Expand Down Expand Up @@ -88,6 +90,8 @@ def scan(self):
pass to the underlying ``scan`` helper from ``elasticsearch-py`` -
https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan
The ``iterate()`` method should be preferred, as it provides similar
functionality using an Elasticsearch point in time.
"""
es = get_connection(self._using)

Expand All @@ -105,6 +109,43 @@ def delete(self):
es.delete_by_query(index=self._index, body=self.to_dict(), **self._params)
)

@contextlib.contextmanager
def point_in_time(self, keep_alive="1m"):
"""
Open a point in time (pit) that can be used across several searches.
This method implements a context manager that returns a search object
configured to operate within the created pit.
:arg keep_alive: the time to live for the point in time, renewed with each search request
"""
es = get_connection(self._using)

pit = es.open_point_in_time(index=self._index or "*", keep_alive=keep_alive)
search = self.index().extra(pit={"id": pit["id"], "keep_alive": keep_alive})
if not search._sort:
search = search.sort("_shard_doc")
yield search
es.close_point_in_time(id=pit["id"])

def iterate(self, keep_alive="1m"):
"""
Return a generator that iterates over all the documents matching the query.
This method uses a point in time to provide consistent results even when
the index is changing. It should be preferred over ``scan()``.
:arg keep_alive: the time to live for the point in time, renewed with each new search request
"""
with self.point_in_time(keep_alive=keep_alive) as s:
while True:
r = s.execute()
for hit in r:
yield hit
if len(r.hits) == 0:
break
s = r.search_after()


class MultiSearch(MultiSearchBase):
"""
Expand Down
31 changes: 31 additions & 0 deletions tests/test_integration/_async/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,37 @@ async def test_search_after_no_results(async_data_client):
await r.search_after()


@pytest.mark.asyncio
async def test_point_in_time(async_data_client):
page_size = 7
commits = []
async with AsyncSearch(index="flat-git")[:page_size].point_in_time(
keep_alive="30s"
) as s:
pit_id = s._extra["pit"]["id"]
while True:
r = await s.execute()
commits += r.hits
if len(r.hits) < page_size:
break
s = r.search_after()
assert pit_id == s._extra["pit"]["id"]
assert "30s" == s._extra["pit"]["keep_alive"]

assert 52 == len(commits)
assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in commits}


@pytest.mark.asyncio
async def test_iterate(async_data_client):
s = AsyncSearch(index="flat-git")

commits = [commit async for commit in s.iterate()]

assert 52 == len(commits)
assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in commits}


@pytest.mark.asyncio
async def test_response_is_cached(async_data_client):
s = Repository.search()
Expand Down
29 changes: 29 additions & 0 deletions tests/test_integration/_sync/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,35 @@ def test_search_after_no_results(data_client):
r.search_after()


@pytest.mark.sync
def test_point_in_time(data_client):
page_size = 7
commits = []
with Search(index="flat-git")[:page_size].point_in_time(keep_alive="30s") as s:
pit_id = s._extra["pit"]["id"]
while True:
r = s.execute()
commits += r.hits
if len(r.hits) < page_size:
break
s = r.search_after()
assert pit_id == s._extra["pit"]["id"]
assert "30s" == s._extra["pit"]["keep_alive"]

assert 52 == len(commits)
assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in commits}


@pytest.mark.sync
def test_iterate(data_client):
s = Search(index="flat-git")

commits = [commit for commit in s.iterate()]

assert 52 == len(commits)
assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in commits}


@pytest.mark.sync
def test_response_is_cached(data_client):
s = Repository.search()
Expand Down
1 change: 1 addition & 0 deletions utils/run-unasync.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def main(check=False):
"async_sleep": "sleep",
"assert_awaited_once_with": "assert_called_once_with",
"pytest_asyncio": "pytest",
"asynccontextmanager": "contextmanager",
}
rules = [
unasync.Rule(
Expand Down

0 comments on commit 4a9d882

Please sign in to comment.