Skip to content

Commit

Permalink
Add support for bulk document operations
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Jul 10, 2024
1 parent 8a110b0 commit 8c5b1f0
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 4 deletions.
84 changes: 83 additions & 1 deletion elasticsearch_dsl/_async/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@
# under the License.

import collections.abc
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
from typing import (
TYPE_CHECKING,
Any,
AsyncIterable,
Dict,
List,
Optional,
Tuple,
Union,
cast,
)

from elasticsearch.exceptions import NotFoundError, RequestError
from elasticsearch.helpers import async_bulk
from typing_extensions import Self, dataclass_transform

from .._async.index import AsyncIndex
Expand Down Expand Up @@ -438,3 +449,74 @@ async def save(
setattr(self.meta, k, meta["_" + k])

return meta if return_doc_meta else meta["result"]

@classmethod
async def bulk(
cls,
actions: AsyncIterable[Union[Self, Dict[str, Any]]],
using: Optional[AsyncUsingType] = None,
index: Optional[str] = None,
validate: bool = True,
skip_empty: bool = True,
**kwargs: Any,
) -> Tuple[int, Union[int, List[Any]]]:
"""
Allows to perform multiple indexing operations in a single request.
:arg actions: a generator that returns document instances to be indexed,
bulk operation dictionaries.
:arg using: connection alias to use, defaults to ``'default'``
:arg index: elasticsearch index to use, if the ``Document`` is
associated with an index this can be omitted.
:arg validate: set to ``False`` to skip validating the documents
:arg skip_empty: if set to ``False`` will cause empty values (``None``,
``[]``, ``{}``) to be left on the document. Those values will be
stripped out otherwise as they make no difference in elasticsearch.
Any additional keyword arguments will be passed to
``Elasticsearch.bulk`` unchanged.
:return: bulk operation results
"""
es = cls._get_connection(using)

i = cls._default_index(index)
assert i is not None

class Generate:
def __init__(
self,
doc_iterator: AsyncIterable[Union[AsyncDocument, Dict[str, Any]]],
):
self.doc_iterator = doc_iterator.__aiter__()

def __aiter__(self) -> Self:
return self

async def __anext__(self) -> Dict[str, Any]:
doc = await self.doc_iterator.__anext__()

if isinstance(doc, dict):
action = doc
doc: Optional[AsyncDocument] = None
if "_source" in action and isinstance(
action["_source"], AsyncDocument
):
doc = action["_source"]
if validate: # pragma: no cover
doc.full_clean()
action["_source"] = doc.to_dict(
include_meta=False, skip_empty=skip_empty
)
else:
if validate: # pragma: no cover
doc.full_clean()
action = doc.to_dict(include_meta=True, skip_empty=skip_empty)
if "_index" not in action:
action["_index"] = i
# if doc is not None and doc.meta.id is not None and "_id" not in action:
# action["_id"] = doc.meta.id
print(action)
return action

return await async_bulk(es, Generate(actions), **kwargs)
82 changes: 81 additions & 1 deletion elasticsearch_dsl/_sync/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@
# under the License.

import collections.abc
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
cast,
)

from elasticsearch.exceptions import NotFoundError, RequestError
from elasticsearch.helpers import bulk
from typing_extensions import Self, dataclass_transform

from .._sync.index import Index
Expand Down Expand Up @@ -432,3 +443,72 @@ def save(
setattr(self.meta, k, meta["_" + k])

return meta if return_doc_meta else meta["result"]

@classmethod
def bulk(
cls,
actions: Iterable[Union[Self, Dict[str, Any]]],
using: Optional[UsingType] = None,
index: Optional[str] = None,
validate: bool = True,
skip_empty: bool = True,
**kwargs: Any,
) -> Tuple[int, Union[int, List[Any]]]:
"""
Allows to perform multiple indexing operations in a single request.
:arg actions: a generator that returns document instances to be indexed,
bulk operation dictionaries.
:arg using: connection alias to use, defaults to ``'default'``
:arg index: elasticsearch index to use, if the ``Document`` is
associated with an index this can be omitted.
:arg validate: set to ``False`` to skip validating the documents
:arg skip_empty: if set to ``False`` will cause empty values (``None``,
``[]``, ``{}``) to be left on the document. Those values will be
stripped out otherwise as they make no difference in elasticsearch.
Any additional keyword arguments will be passed to
``Elasticsearch.bulk`` unchanged.
:return: bulk operation results
"""
es = cls._get_connection(using)

i = cls._default_index(index)
assert i is not None

class Generate:
def __init__(
self,
doc_iterator: Iterable[Union[Document, Dict[str, Any]]],
):
self.doc_iterator = doc_iterator.__iter__()

def __iter__(self) -> Self:
return self

def __next__(self) -> Dict[str, Any]:
doc = self.doc_iterator.__next__()

if isinstance(doc, dict):
action = doc
doc: Optional[Document] = None
if "_source" in action and isinstance(action["_source"], Document):
doc = action["_source"]
if validate: # pragma: no cover
doc.full_clean()
action["_source"] = doc.to_dict(
include_meta=False, skip_empty=skip_empty
)
else:
if validate: # pragma: no cover
doc.full_clean()
action = doc.to_dict(include_meta=True, skip_empty=skip_empty)
if "_index" not in action:
action["_index"] = i
# if doc is not None and doc.meta.id is not None and "_id" not in action:
# action["_id"] = doc.meta.id
print(action)
return action

return bulk(es, Generate(actions), **kwargs)
90 changes: 89 additions & 1 deletion tests/test_integration/_async/test_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@

from datetime import datetime
from ipaddress import ip_address
from typing import Any
from typing import TYPE_CHECKING, Any, List

import pytest
from elasticsearch import AsyncElasticsearch, ConflictError, NotFoundError
from elasticsearch.helpers.errors import BulkIndexError
from pytest import raises
from pytz import timezone

Expand All @@ -49,6 +50,7 @@
RankFeatures,
Text,
analyzer,
mapped_field,
)
from elasticsearch_dsl.utils import AttrList

Expand Down Expand Up @@ -705,3 +707,89 @@ async def test_highlight_in_meta(async_data_client: AsyncElasticsearch) -> None:
assert "description" in commit.meta.highlight
assert isinstance(commit.meta.highlight["description"], AttrList)
assert len(commit.meta.highlight["description"]) > 0


@pytest.mark.asyncio
async def test_bulk(async_data_client: AsyncElasticsearch) -> None:
class Address(InnerDoc):
street: str
active: bool

class Doc(AsyncDocument):
if TYPE_CHECKING:
_id: int
name: str
age: int
languages: List[str] = mapped_field(Keyword())
addresses: List[Address]

class Index:
name = "bulk-index"

await Doc._index.delete(ignore_unavailable=True)
await Doc.init()

async def gen1():
yield Doc(
name="Joe",
age=33,
languages=["en", "fr"],
addresses=[
Address(street="123 Main St", active=True),
Address(street="321 Park Dr.", active=False),
],
)
yield Doc(name="Susan", age=20, languages=["en"])
yield {"_op_type": "create", "_id": "45", "_source": Doc(name="Sarah", age=45)}

await Doc.bulk(gen1(), refresh=True)
docs = list(await Doc.search().execute())
assert len(docs) == 3
assert docs[0].to_dict() == {
"name": "Joe",
"age": 33,
"languages": [
"en",
"fr",
],
"addresses": [
{
"active": True,
"street": "123 Main St",
},
{
"active": False,
"street": "321 Park Dr.",
},
],
}
assert docs[1].to_dict() == {
"name": "Susan",
"age": 20,
"languages": ["en"],
}
assert docs[2].to_dict() == {
"name": "Sarah",
"age": 45,
}
assert docs[2].meta.id == "45"

async def gen2():
yield {"_op_type": "create", "_id": "45", "_source": Doc(name="Sarah", age=45)}

# a "create" action with an existing id should fail
with raises(BulkIndexError):
await Doc.bulk(gen2(), refresh=True)

async def gen3():
yield Doc(_id="45", name="Sarah", age=45, languages=["es"])
yield {"_op_type": "delete", "_id": docs[1].meta.id}

await Doc.bulk(gen3(), refresh=True)
with raises(NotFoundError):
await Doc.get(docs[1].meta.id)
assert (await Doc.get("45")).to_dict() == {
"name": "Sarah",
"age": 45,
"languages": ["es"],
}
Loading

0 comments on commit 8c5b1f0

Please sign in to comment.