Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for bulk document operations #1864

Merged
merged 2 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion elasticsearch_dsl/_async/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@
# under the License.

import collections.abc
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
from typing import (
TYPE_CHECKING,
Any,
AsyncIterable,
Dict,
List,
Optional,
Tuple,
Union,
cast,
)

from elasticsearch.exceptions import NotFoundError, RequestError
from elasticsearch.helpers import async_bulk
from typing_extensions import Self, dataclass_transform

from .._async.index import AsyncIndex
Expand Down Expand Up @@ -438,3 +449,73 @@ async def save(
setattr(self.meta, k, meta["_" + k])

return meta if return_doc_meta else meta["result"]

@classmethod
async def bulk(
cls,
actions: AsyncIterable[Union[Self, Dict[str, Any]]],
using: Optional[AsyncUsingType] = None,
index: Optional[str] = None,
validate: bool = True,
skip_empty: bool = True,
**kwargs: Any,
) -> Tuple[int, Union[int, List[Any]]]:
"""
Allows to perform multiple indexing operations in a single request.

:arg actions: a generator that returns document instances to be indexed,
bulk operation dictionaries.
:arg using: connection alias to use, defaults to ``'default'``
:arg index: elasticsearch index to use, if the ``Document`` is
miguelgrinberg marked this conversation as resolved.
Show resolved Hide resolved
associated with an index this can be omitted.
:arg validate: set to ``False`` to skip validating the documents
:arg skip_empty: if set to ``False`` will cause empty values (``None``,
``[]``, ``{}``) to be left on the document. Those values will be
stripped out otherwise as they make no difference in elasticsearch.
miguelgrinberg marked this conversation as resolved.
Show resolved Hide resolved

Any additional keyword arguments will be passed to
``Elasticsearch.bulk`` unchanged.

:return: bulk operation results
"""
es = cls._get_connection(using)

i = cls._default_index(index)
assert i is not None

class Generate:
def __init__(
self,
doc_iterator: AsyncIterable[Union[AsyncDocument, Dict[str, Any]]],
):
self.doc_iterator = doc_iterator.__aiter__()

def __aiter__(self) -> Self:
return self

async def __anext__(self) -> Dict[str, Any]:
doc: Optional[Union[AsyncDocument, Dict[str, Any]]] = (
await self.doc_iterator.__anext__()
)

if isinstance(doc, dict):
action = doc
doc = None
if "_source" in action and isinstance(
action["_source"], AsyncDocument
):
doc = action["_source"]
if validate: # pragma: no cover
doc.full_clean()
action["_source"] = doc.to_dict(
include_meta=False, skip_empty=skip_empty
)
elif doc is not None:
if validate: # pragma: no cover
doc.full_clean()
action = doc.to_dict(include_meta=True, skip_empty=skip_empty)
if "_index" not in action:
action["_index"] = i
return action

return await async_bulk(es, Generate(actions), **kwargs)
81 changes: 80 additions & 1 deletion elasticsearch_dsl/_sync/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@
# under the License.

import collections.abc
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
cast,
)

from elasticsearch.exceptions import NotFoundError, RequestError
from elasticsearch.helpers import bulk
from typing_extensions import Self, dataclass_transform

from .._sync.index import Index
Expand Down Expand Up @@ -432,3 +443,71 @@ def save(
setattr(self.meta, k, meta["_" + k])

return meta if return_doc_meta else meta["result"]

@classmethod
def bulk(
cls,
actions: Iterable[Union[Self, Dict[str, Any]]],
using: Optional[UsingType] = None,
index: Optional[str] = None,
validate: bool = True,
skip_empty: bool = True,
**kwargs: Any,
) -> Tuple[int, Union[int, List[Any]]]:
"""
Allows to perform multiple indexing operations in a single request.

:arg actions: a generator that returns document instances to be indexed,
bulk operation dictionaries.
:arg using: connection alias to use, defaults to ``'default'``
:arg index: elasticsearch index to use, if the ``Document`` is
associated with an index this can be omitted.
:arg validate: set to ``False`` to skip validating the documents
:arg skip_empty: if set to ``False`` will cause empty values (``None``,
``[]``, ``{}``) to be left on the document. Those values will be
stripped out otherwise as they make no difference in elasticsearch.

Any additional keyword arguments will be passed to
``Elasticsearch.bulk`` unchanged.

:return: bulk operation results
"""
es = cls._get_connection(using)

i = cls._default_index(index)
assert i is not None

class Generate:
def __init__(
self,
doc_iterator: Iterable[Union[Document, Dict[str, Any]]],
):
self.doc_iterator = doc_iterator.__iter__()

def __iter__(self) -> Self:
return self

def __next__(self) -> Dict[str, Any]:
doc: Optional[Union[Document, Dict[str, Any]]] = (
self.doc_iterator.__next__()
)

if isinstance(doc, dict):
action = doc
doc = None
if "_source" in action and isinstance(action["_source"], Document):
doc = action["_source"]
if validate: # pragma: no cover
doc.full_clean()
action["_source"] = doc.to_dict(
include_meta=False, skip_empty=skip_empty
)
elif doc is not None:
if validate: # pragma: no cover
doc.full_clean()
action = doc.to_dict(include_meta=True, skip_empty=skip_empty)
if "_index" not in action:
action["_index"] = i
return action

return bulk(es, Generate(actions), **kwargs)
92 changes: 91 additions & 1 deletion tests/test_integration/_async/test_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@

from datetime import datetime
from ipaddress import ip_address
from typing import Any
from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, List, Union

import pytest
from elasticsearch import AsyncElasticsearch, ConflictError, NotFoundError
from elasticsearch.helpers.errors import BulkIndexError
from pytest import raises
from pytz import timezone

Expand All @@ -49,6 +50,7 @@
RankFeatures,
Text,
analyzer,
mapped_field,
)
from elasticsearch_dsl.utils import AttrList

Expand Down Expand Up @@ -705,3 +707,91 @@ async def test_highlight_in_meta(async_data_client: AsyncElasticsearch) -> None:
assert "description" in commit.meta.highlight
assert isinstance(commit.meta.highlight["description"], AttrList)
assert len(commit.meta.highlight["description"]) > 0


@pytest.mark.asyncio
async def test_bulk(async_data_client: AsyncElasticsearch) -> None:
class Address(InnerDoc):
street: str
active: bool

class Doc(AsyncDocument):
if TYPE_CHECKING:
_id: int
name: str
age: int
languages: List[str] = mapped_field(Keyword())
addresses: List[Address]

class Index:
name = "bulk-index"

await Doc._index.delete(ignore_unavailable=True)
await Doc.init()

async def gen1() -> AsyncIterator[Union[Doc, Dict[str, Any]]]:
yield Doc(
name="Joe",
age=33,
languages=["en", "fr"],
addresses=[
Address(street="123 Main St", active=True),
Address(street="321 Park Dr.", active=False),
],
)
yield Doc(name="Susan", age=20, languages=["en"])
yield {"_op_type": "create", "_id": "45", "_source": Doc(name="Sarah", age=45)}

await Doc.bulk(gen1(), refresh=True)
docs = list(await Doc.search().execute())
assert len(docs) == 3
assert docs[0].to_dict() == {
"name": "Joe",
"age": 33,
"languages": [
"en",
"fr",
],
"addresses": [
{
"active": True,
"street": "123 Main St",
},
{
"active": False,
"street": "321 Park Dr.",
},
],
}
assert docs[1].to_dict() == {
"name": "Susan",
"age": 20,
"languages": ["en"],
}
assert docs[2].to_dict() == {
"name": "Sarah",
"age": 45,
}
assert docs[2].meta.id == "45"

async def gen2() -> AsyncIterator[Union[Doc, Dict[str, Any]]]:
yield {"_op_type": "create", "_id": "45", "_source": Doc(name="Sarah", age=45)}

# a "create" action with an existing id should fail
with raises(BulkIndexError):
await Doc.bulk(gen2(), refresh=True)

async def gen3() -> AsyncIterator[Union[Doc, Dict[str, Any]]]:
yield Doc(_id="45", name="Sarah", age=45, languages=["es"])
yield {"_op_type": "delete", "_id": docs[1].meta.id}

await Doc.bulk(gen3(), refresh=True)
with raises(NotFoundError):
await Doc.get(docs[1].meta.id)
doc = await Doc.get("45")
assert doc is not None
assert (doc).to_dict() == {
"name": "Sarah",
"age": 45,
"languages": ["es"],
}
Loading
Loading