From 5a01221c80bda426350635b7e0d7e92dc1e003a8 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Wed, 10 Jul 2024 16:17:29 +0100 Subject: [PATCH 1/2] Add support for bulk document operations --- elasticsearch_dsl/_async/document.py | 83 ++++++++++++++++- elasticsearch_dsl/_sync/document.py | 81 +++++++++++++++- .../test_integration/_async/test_document.py | 92 ++++++++++++++++++- tests/test_integration/_sync/test_document.py | 92 ++++++++++++++++++- 4 files changed, 344 insertions(+), 4 deletions(-) diff --git a/elasticsearch_dsl/_async/document.py b/elasticsearch_dsl/_async/document.py index 8baa4ca4..ac416843 100644 --- a/elasticsearch_dsl/_async/document.py +++ b/elasticsearch_dsl/_async/document.py @@ -16,9 +16,20 @@ # under the License. import collections.abc -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + AsyncIterable, + Dict, + List, + Optional, + Tuple, + Union, + cast, +) from elasticsearch.exceptions import NotFoundError, RequestError +from elasticsearch.helpers import async_bulk from typing_extensions import Self, dataclass_transform from .._async.index import AsyncIndex @@ -438,3 +449,73 @@ async def save( setattr(self.meta, k, meta["_" + k]) return meta if return_doc_meta else meta["result"] + + @classmethod + async def bulk( + cls, + actions: AsyncIterable[Union[Self, Dict[str, Any]]], + using: Optional[AsyncUsingType] = None, + index: Optional[str] = None, + validate: bool = True, + skip_empty: bool = True, + **kwargs: Any, + ) -> Tuple[int, Union[int, List[Any]]]: + """ + Allows to perform multiple indexing operations in a single request. + + :arg actions: a generator that returns document instances to be indexed, + bulk operation dictionaries. + :arg using: connection alias to use, defaults to ``'default'`` + :arg index: elasticsearch index to use, if the ``Document`` is + associated with an index this can be omitted. + :arg validate: set to ``False`` to skip validating the documents + :arg skip_empty: if set to ``False`` will cause empty values (``None``, + ``[]``, ``{}``) to be left on the document. Those values will be + stripped out otherwise as they make no difference in elasticsearch. + + Any additional keyword arguments will be passed to + ``Elasticsearch.bulk`` unchanged. + + :return: bulk operation results + """ + es = cls._get_connection(using) + + i = cls._default_index(index) + assert i is not None + + class Generate: + def __init__( + self, + doc_iterator: AsyncIterable[Union[AsyncDocument, Dict[str, Any]]], + ): + self.doc_iterator = doc_iterator.__aiter__() + + def __aiter__(self) -> Self: + return self + + async def __anext__(self) -> Dict[str, Any]: + doc: Optional[Union[AsyncDocument, Dict[str, Any]]] = ( + await self.doc_iterator.__anext__() + ) + + if isinstance(doc, dict): + action = doc + doc = None + if "_source" in action and isinstance( + action["_source"], AsyncDocument + ): + doc = action["_source"] + if validate: # pragma: no cover + doc.full_clean() + action["_source"] = doc.to_dict( + include_meta=False, skip_empty=skip_empty + ) + elif doc is not None: + if validate: # pragma: no cover + doc.full_clean() + action = doc.to_dict(include_meta=True, skip_empty=skip_empty) + if "_index" not in action: + action["_index"] = i + return action + + return await async_bulk(es, Generate(actions), **kwargs) diff --git a/elasticsearch_dsl/_sync/document.py b/elasticsearch_dsl/_sync/document.py index d9a4a271..ac0a3a73 100644 --- a/elasticsearch_dsl/_sync/document.py +++ b/elasticsearch_dsl/_sync/document.py @@ -16,9 +16,20 @@ # under the License. import collections.abc -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, + cast, +) from elasticsearch.exceptions import NotFoundError, RequestError +from elasticsearch.helpers import bulk from typing_extensions import Self, dataclass_transform from .._sync.index import Index @@ -432,3 +443,71 @@ def save( setattr(self.meta, k, meta["_" + k]) return meta if return_doc_meta else meta["result"] + + @classmethod + def bulk( + cls, + actions: Iterable[Union[Self, Dict[str, Any]]], + using: Optional[UsingType] = None, + index: Optional[str] = None, + validate: bool = True, + skip_empty: bool = True, + **kwargs: Any, + ) -> Tuple[int, Union[int, List[Any]]]: + """ + Allows to perform multiple indexing operations in a single request. + + :arg actions: a generator that returns document instances to be indexed, + bulk operation dictionaries. + :arg using: connection alias to use, defaults to ``'default'`` + :arg index: elasticsearch index to use, if the ``Document`` is + associated with an index this can be omitted. + :arg validate: set to ``False`` to skip validating the documents + :arg skip_empty: if set to ``False`` will cause empty values (``None``, + ``[]``, ``{}``) to be left on the document. Those values will be + stripped out otherwise as they make no difference in elasticsearch. + + Any additional keyword arguments will be passed to + ``Elasticsearch.bulk`` unchanged. + + :return: bulk operation results + """ + es = cls._get_connection(using) + + i = cls._default_index(index) + assert i is not None + + class Generate: + def __init__( + self, + doc_iterator: Iterable[Union[Document, Dict[str, Any]]], + ): + self.doc_iterator = doc_iterator.__iter__() + + def __iter__(self) -> Self: + return self + + def __next__(self) -> Dict[str, Any]: + doc: Optional[Union[Document, Dict[str, Any]]] = ( + self.doc_iterator.__next__() + ) + + if isinstance(doc, dict): + action = doc + doc = None + if "_source" in action and isinstance(action["_source"], Document): + doc = action["_source"] + if validate: # pragma: no cover + doc.full_clean() + action["_source"] = doc.to_dict( + include_meta=False, skip_empty=skip_empty + ) + elif doc is not None: + if validate: # pragma: no cover + doc.full_clean() + action = doc.to_dict(include_meta=True, skip_empty=skip_empty) + if "_index" not in action: + action["_index"] = i + return action + + return bulk(es, Generate(actions), **kwargs) diff --git a/tests/test_integration/_async/test_document.py b/tests/test_integration/_async/test_document.py index 36173eb6..cbe9a405 100644 --- a/tests/test_integration/_async/test_document.py +++ b/tests/test_integration/_async/test_document.py @@ -23,10 +23,11 @@ from datetime import datetime from ipaddress import ip_address -from typing import Any +from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, List, Union import pytest from elasticsearch import AsyncElasticsearch, ConflictError, NotFoundError +from elasticsearch.helpers.errors import BulkIndexError from pytest import raises from pytz import timezone @@ -49,6 +50,7 @@ RankFeatures, Text, analyzer, + mapped_field, ) from elasticsearch_dsl.utils import AttrList @@ -705,3 +707,91 @@ async def test_highlight_in_meta(async_data_client: AsyncElasticsearch) -> None: assert "description" in commit.meta.highlight assert isinstance(commit.meta.highlight["description"], AttrList) assert len(commit.meta.highlight["description"]) > 0 + + +@pytest.mark.asyncio +async def test_bulk(async_data_client: AsyncElasticsearch) -> None: + class Address(InnerDoc): + street: str + active: bool + + class Doc(AsyncDocument): + if TYPE_CHECKING: + _id: int + name: str + age: int + languages: List[str] = mapped_field(Keyword()) + addresses: List[Address] + + class Index: + name = "bulk-index" + + await Doc._index.delete(ignore_unavailable=True) + await Doc.init() + + async def gen1() -> AsyncIterator[Union[Doc, Dict[str, Any]]]: + yield Doc( + name="Joe", + age=33, + languages=["en", "fr"], + addresses=[ + Address(street="123 Main St", active=True), + Address(street="321 Park Dr.", active=False), + ], + ) + yield Doc(name="Susan", age=20, languages=["en"]) + yield {"_op_type": "create", "_id": "45", "_source": Doc(name="Sarah", age=45)} + + await Doc.bulk(gen1(), refresh=True) + docs = list(await Doc.search().execute()) + assert len(docs) == 3 + assert docs[0].to_dict() == { + "name": "Joe", + "age": 33, + "languages": [ + "en", + "fr", + ], + "addresses": [ + { + "active": True, + "street": "123 Main St", + }, + { + "active": False, + "street": "321 Park Dr.", + }, + ], + } + assert docs[1].to_dict() == { + "name": "Susan", + "age": 20, + "languages": ["en"], + } + assert docs[2].to_dict() == { + "name": "Sarah", + "age": 45, + } + assert docs[2].meta.id == "45" + + async def gen2() -> AsyncIterator[Union[Doc, Dict[str, Any]]]: + yield {"_op_type": "create", "_id": "45", "_source": Doc(name="Sarah", age=45)} + + # a "create" action with an existing id should fail + with raises(BulkIndexError): + await Doc.bulk(gen2(), refresh=True) + + async def gen3() -> AsyncIterator[Union[Doc, Dict[str, Any]]]: + yield Doc(_id="45", name="Sarah", age=45, languages=["es"]) + yield {"_op_type": "delete", "_id": docs[1].meta.id} + + await Doc.bulk(gen3(), refresh=True) + with raises(NotFoundError): + await Doc.get(docs[1].meta.id) + doc = await Doc.get("45") + assert doc is not None + assert (doc).to_dict() == { + "name": "Sarah", + "age": 45, + "languages": ["es"], + } diff --git a/tests/test_integration/_sync/test_document.py b/tests/test_integration/_sync/test_document.py index a102a333..c36a9931 100644 --- a/tests/test_integration/_sync/test_document.py +++ b/tests/test_integration/_sync/test_document.py @@ -23,10 +23,11 @@ from datetime import datetime from ipaddress import ip_address -from typing import Any +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Union import pytest from elasticsearch import ConflictError, Elasticsearch, NotFoundError +from elasticsearch.helpers.errors import BulkIndexError from pytest import raises from pytz import timezone @@ -49,6 +50,7 @@ Search, Text, analyzer, + mapped_field, ) from elasticsearch_dsl.utils import AttrList @@ -699,3 +701,91 @@ def test_highlight_in_meta(data_client: Elasticsearch) -> None: assert "description" in commit.meta.highlight assert isinstance(commit.meta.highlight["description"], AttrList) assert len(commit.meta.highlight["description"]) > 0 + + +@pytest.mark.sync +def test_bulk(data_client: Elasticsearch) -> None: + class Address(InnerDoc): + street: str + active: bool + + class Doc(Document): + if TYPE_CHECKING: + _id: int + name: str + age: int + languages: List[str] = mapped_field(Keyword()) + addresses: List[Address] + + class Index: + name = "bulk-index" + + Doc._index.delete(ignore_unavailable=True) + Doc.init() + + def gen1() -> Iterator[Union[Doc, Dict[str, Any]]]: + yield Doc( + name="Joe", + age=33, + languages=["en", "fr"], + addresses=[ + Address(street="123 Main St", active=True), + Address(street="321 Park Dr.", active=False), + ], + ) + yield Doc(name="Susan", age=20, languages=["en"]) + yield {"_op_type": "create", "_id": "45", "_source": Doc(name="Sarah", age=45)} + + Doc.bulk(gen1(), refresh=True) + docs = list(Doc.search().execute()) + assert len(docs) == 3 + assert docs[0].to_dict() == { + "name": "Joe", + "age": 33, + "languages": [ + "en", + "fr", + ], + "addresses": [ + { + "active": True, + "street": "123 Main St", + }, + { + "active": False, + "street": "321 Park Dr.", + }, + ], + } + assert docs[1].to_dict() == { + "name": "Susan", + "age": 20, + "languages": ["en"], + } + assert docs[2].to_dict() == { + "name": "Sarah", + "age": 45, + } + assert docs[2].meta.id == "45" + + def gen2() -> Iterator[Union[Doc, Dict[str, Any]]]: + yield {"_op_type": "create", "_id": "45", "_source": Doc(name="Sarah", age=45)} + + # a "create" action with an existing id should fail + with raises(BulkIndexError): + Doc.bulk(gen2(), refresh=True) + + def gen3() -> Iterator[Union[Doc, Dict[str, Any]]]: + yield Doc(_id="45", name="Sarah", age=45, languages=["es"]) + yield {"_op_type": "delete", "_id": docs[1].meta.id} + + Doc.bulk(gen3(), refresh=True) + with raises(NotFoundError): + Doc.get(docs[1].meta.id) + doc = Doc.get("45") + assert doc is not None + assert (doc).to_dict() == { + "name": "Sarah", + "age": 45, + "languages": ["es"], + } From 84530060f6224af43624d61cf691667aa7f8557b Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Tue, 30 Jul 2024 13:55:04 +0100 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Quentin Pradet --- elasticsearch_dsl/_async/document.py | 4 ++-- elasticsearch_dsl/_sync/document.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/elasticsearch_dsl/_async/document.py b/elasticsearch_dsl/_async/document.py index ac416843..1ff68a79 100644 --- a/elasticsearch_dsl/_async/document.py +++ b/elasticsearch_dsl/_async/document.py @@ -466,12 +466,12 @@ async def bulk( :arg actions: a generator that returns document instances to be indexed, bulk operation dictionaries. :arg using: connection alias to use, defaults to ``'default'`` - :arg index: elasticsearch index to use, if the ``Document`` is + :arg index: Elasticsearch index to use, if the ``Document`` is associated with an index this can be omitted. :arg validate: set to ``False`` to skip validating the documents :arg skip_empty: if set to ``False`` will cause empty values (``None``, ``[]``, ``{}``) to be left on the document. Those values will be - stripped out otherwise as they make no difference in elasticsearch. + stripped out otherwise as they make no difference in Elasticsearch. Any additional keyword arguments will be passed to ``Elasticsearch.bulk`` unchanged. diff --git a/elasticsearch_dsl/_sync/document.py b/elasticsearch_dsl/_sync/document.py index ac0a3a73..abcda3a3 100644 --- a/elasticsearch_dsl/_sync/document.py +++ b/elasticsearch_dsl/_sync/document.py @@ -460,12 +460,12 @@ def bulk( :arg actions: a generator that returns document instances to be indexed, bulk operation dictionaries. :arg using: connection alias to use, defaults to ``'default'`` - :arg index: elasticsearch index to use, if the ``Document`` is + :arg index: Elasticsearch index to use, if the ``Document`` is associated with an index this can be omitted. :arg validate: set to ``False`` to skip validating the documents :arg skip_empty: if set to ``False`` will cause empty values (``None``, ``[]``, ``{}``) to be left on the document. Those values will be - stripped out otherwise as they make no difference in elasticsearch. + stripped out otherwise as they make no difference in Elasticsearch. Any additional keyword arguments will be passed to ``Elasticsearch.bulk`` unchanged.