-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathconverter.py
56 lines (45 loc) · 1.72 KB
/
converter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import json
from typing import Any, Optional
from pydantic.json import pydantic_encoder
from temporalio.api.common.v1 import Payload
from temporalio.converter import (
CompositePayloadConverter,
DataConverter,
DefaultPayloadConverter,
JSONPlainPayloadConverter,
)
class PydanticJSONPayloadConverter(JSONPlainPayloadConverter):
    """JSON payload converter that serializes through the Pydantic encoder.

    Subclasses :py:class:`JSONPlainPayloadConverter` and overrides only
    :py:meth:`to_payload`, passing ``pydantic_encoder`` to ``json.dumps`` as
    the ``default`` hook.
    """

    def to_payload(self, value: Any) -> Optional[Payload]:
        """Serialize ``value`` to a JSON payload with the Pydantic encoder.

        Like the base class, this fails if the value cannot be converted.
        Nothing is caught here, so JSON conversion errors reach the caller.
        This converter is expected to be the last one in the chain, which is
        why failing on an unconvertible value is acceptable.

        Returns:
            A payload whose ``encoding`` metadata is this converter's
            encoding and whose data is the compact, key-sorted JSON bytes.
        """
        # NOTE(review): pydantic.json.pydantic_encoder looks to be deprecated
        # in Pydantic v2 -- confirm against the pinned Pydantic version.
        metadata = {"encoding": self.encoding.encode()}
        # Compact separators plus sorted keys; errors propagate to the caller.
        json_text = json.dumps(
            value,
            default=pydantic_encoder,
            sort_keys=True,
            separators=(",", ":"),
        )
        return Payload(metadata=metadata, data=json_text.encode())
class PydanticPayloadConverter(CompositePayloadConverter):
    """Composite payload converter that uses Pydantic for JSON conversion.

    Takes the default encoding payload converters and substitutes a
    :py:class:`PydanticJSONPayloadConverter` for every entry that is a
    :py:class:`JSONPlainPayloadConverter`; all other entries are kept as-is.
    """

    def __init__(self) -> None:
        """Build the converter chain with the JSON converter swapped out."""
        defaults = DefaultPayloadConverter.default_encoding_payload_converters
        converters = []
        for default_converter in defaults:
            if isinstance(default_converter, JSONPlainPayloadConverter):
                # Replace Temporal's plain JSON conversion with Pydantic's.
                converters.append(PydanticJSONPayloadConverter())
            else:
                converters.append(default_converter)
        super().__init__(*converters)
pydantic_data_converter = DataConverter(payload_converter_class=PydanticPayloadConverter)
"""Data converter whose payload converter class is :py:class:`PydanticPayloadConverter`,
i.e. one that performs JSON conversion with Pydantic."""