-
Notifications
You must be signed in to change notification settings - Fork 141
/
Copy pathmotor.py
95 lines (78 loc) · 2.82 KB
/
motor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
__all__ = [
"paginate",
"paginate_aggregate",
]
from typing import TYPE_CHECKING, Any, Optional
from motor.core import AgnosticCollection
from typing_extensions import TypeAlias
from fastapi_pagination.api import apply_items_transformer, create_page
from fastapi_pagination.bases import AbstractParams
from fastapi_pagination.types import AdditionalData, AsyncItemsTransformer
from fastapi_pagination.utils import verify_params
if TYPE_CHECKING:
_AgnosticCollection: TypeAlias = AgnosticCollection[Any]
else:
_AgnosticCollection = AgnosticCollection
async def paginate(
collection: _AgnosticCollection,
query_filter: Optional[dict[Any, Any]] = None,
params: Optional[AbstractParams] = None,
sort: Optional[Any] = None,
*,
transformer: Optional[AsyncItemsTransformer] = None,
additional_data: Optional[AdditionalData] = None,
**kwargs: Any,
) -> Any:
params, raw_params = verify_params(params, "limit-offset")
query_filter = query_filter or {}
total = await collection.count_documents(query_filter) if raw_params.include_total else None
cursor = collection.find(query_filter, skip=raw_params.offset, limit=raw_params.limit, **kwargs)
if sort is not None:
cursor = cursor.sort(*sort) if isinstance(sort, tuple) else cursor.sort(sort)
items = await cursor.to_list(length=raw_params.limit)
t_items = await apply_items_transformer(items, transformer, async_=True)
return create_page(
t_items,
total=total,
params=params,
**(additional_data or {}),
)
async def paginate_aggregate(
collection: _AgnosticCollection,
aggregate_pipeline: Optional[list[dict[Any, Any]]] = None,
params: Optional[AbstractParams] = None,
*,
transformer: Optional[AsyncItemsTransformer] = None,
additional_data: Optional[AdditionalData] = None,
) -> Any:
params, raw_params = verify_params(params, "limit-offset")
aggregate_pipeline = aggregate_pipeline or []
paginate_data = []
if raw_params.limit is not None:
paginate_data.append({"$limit": raw_params.limit + (raw_params.offset or 0)})
if raw_params.offset is not None:
paginate_data.append({"$skip": raw_params.offset})
cursor = collection.aggregate(
[
*aggregate_pipeline,
{
"$facet": {
"metadata": [{"$count": "total"}],
"data": paginate_data,
},
},
],
)
data, *_ = await cursor.to_list(length=None)
items = data["data"]
try:
total = data["metadata"][0]["total"]
except IndexError:
total = 0
t_items = await apply_items_transformer(items, transformer, async_=True)
return create_page(
t_items,
total=total,
params=params,
**(additional_data or {}),
)