diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c554f9d4..9f964a34 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,7 @@ jobs: with: python-version: ${{ matrix.python }} - run: python -m pip install --upgrade wheel poetry poethepoet - - run: poetry install + - run: poetry install --with pydantic - run: poe lint - run: poe test -s -o log_cli_level=DEBUG - run: poe test -s -o log_cli_level=DEBUG --workflow-environment time-skipping diff --git a/README.md b/README.md index 268e727b..8bc36863 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity. * [encryption](encryption) - Apply end-to-end encryption for all input/output. * [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry. +* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [sentry](sentry) - Report errors to Sentry. 
## Test diff --git a/poetry.lock b/poetry.lock index a82bbeac..6b51ef9f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -380,6 +380,21 @@ category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +[[package]] +name = "pydantic" +version = "1.10.4" +description = "Data validation and settings management using python type hints" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +typing-extensions = ">=4.2.0" + +[package.extras] +dotenv = ["python-dotenv (>=0.10.4)"] +email = ["email-validator (>=1.0.3)"] + [[package]] name = "pyparsing" version = "3.0.9" @@ -609,7 +624,7 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools" [metadata] lock-version = "1.1" python-versions = "^3.7" -content-hash = "8a2f56bce06a187e2b34cee8c69042025c980c47a3869a713f51d15dc50e8da8" +content-hash = "f47fea5c8855978e7f42535db6f68571393ce360a0a9c30492ce44eb0f93cdf9" [metadata.files] aiohttp = [ @@ -1077,6 +1092,44 @@ pycparser = [ {file = "pycparser-2.21-py2.py3-none-any.whl", hash = "sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9"}, {file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"}, ] +pydantic = [ + {file = "pydantic-1.10.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b5635de53e6686fe7a44b5cf25fcc419a0d5e5c1a1efe73d49d48fe7586db854"}, + {file = "pydantic-1.10.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:6dc1cc241440ed7ca9ab59d9929075445da6b7c94ced281b3dd4cfe6c8cff817"}, + {file = "pydantic-1.10.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:51bdeb10d2db0f288e71d49c9cefa609bca271720ecd0c58009bd7504a0c464c"}, + {file = "pydantic-1.10.4-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:78cec42b95dbb500a1f7120bdf95c401f6abb616bbe8785ef09887306792e66e"}, + {file = 
"pydantic-1.10.4-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:8775d4ef5e7299a2f4699501077a0defdaac5b6c4321173bcb0f3c496fbadf85"}, + {file = "pydantic-1.10.4-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:572066051eeac73d23f95ba9a71349c42a3e05999d0ee1572b7860235b850cc6"}, + {file = "pydantic-1.10.4-cp310-cp310-win_amd64.whl", hash = "sha256:7feb6a2d401f4d6863050f58325b8d99c1e56f4512d98b11ac64ad1751dc647d"}, + {file = "pydantic-1.10.4-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:39f4a73e5342b25c2959529f07f026ef58147249f9b7431e1ba8414a36761f53"}, + {file = "pydantic-1.10.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:983e720704431a6573d626b00662eb78a07148c9115129f9b4351091ec95ecc3"}, + {file = "pydantic-1.10.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:75d52162fe6b2b55964fbb0af2ee58e99791a3138588c482572bb6087953113a"}, + {file = "pydantic-1.10.4-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fdf8d759ef326962b4678d89e275ffc55b7ce59d917d9f72233762061fd04a2d"}, + {file = "pydantic-1.10.4-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:05a81b006be15655b2a1bae5faa4280cf7c81d0e09fcb49b342ebf826abe5a72"}, + {file = "pydantic-1.10.4-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d88c4c0e5c5dfd05092a4b271282ef0588e5f4aaf345778056fc5259ba098857"}, + {file = "pydantic-1.10.4-cp311-cp311-win_amd64.whl", hash = "sha256:6a05a9db1ef5be0fe63e988f9617ca2551013f55000289c671f71ec16f4985e3"}, + {file = "pydantic-1.10.4-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:887ca463c3bc47103c123bc06919c86720e80e1214aab79e9b779cda0ff92a00"}, + {file = "pydantic-1.10.4-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fdf88ab63c3ee282c76d652fc86518aacb737ff35796023fae56a65ced1a5978"}, + {file = "pydantic-1.10.4-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = 
"sha256:a48f1953c4a1d9bd0b5167ac50da9a79f6072c63c4cef4cf2a3736994903583e"}, + {file = "pydantic-1.10.4-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:a9f2de23bec87ff306aef658384b02aa7c32389766af3c5dee9ce33e80222dfa"}, + {file = "pydantic-1.10.4-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:cd8702c5142afda03dc2b1ee6bc358b62b3735b2cce53fc77b31ca9f728e4bc8"}, + {file = "pydantic-1.10.4-cp37-cp37m-win_amd64.whl", hash = "sha256:6e7124d6855b2780611d9f5e1e145e86667eaa3bd9459192c8dc1a097f5e9903"}, + {file = "pydantic-1.10.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:0b53e1d41e97063d51a02821b80538053ee4608b9a181c1005441f1673c55423"}, + {file = "pydantic-1.10.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:55b1625899acd33229c4352ce0ae54038529b412bd51c4915349b49ca575258f"}, + {file = "pydantic-1.10.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:301d626a59edbe5dfb48fcae245896379a450d04baeed50ef40d8199f2733b06"}, + {file = "pydantic-1.10.4-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b6f9d649892a6f54a39ed56b8dfd5e08b5f3be5f893da430bed76975f3735d15"}, + {file = "pydantic-1.10.4-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:d7b5a3821225f5c43496c324b0d6875fde910a1c2933d726a743ce328fbb2a8c"}, + {file = "pydantic-1.10.4-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:f2f7eb6273dd12472d7f218e1fef6f7c7c2f00ac2e1ecde4db8824c457300416"}, + {file = "pydantic-1.10.4-cp38-cp38-win_amd64.whl", hash = "sha256:4b05697738e7d2040696b0a66d9f0a10bec0efa1883ca75ee9e55baf511909d6"}, + {file = "pydantic-1.10.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:a9a6747cac06c2beb466064dda999a13176b23535e4c496c9d48e6406f92d42d"}, + {file = "pydantic-1.10.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:eb992a1ef739cc7b543576337bebfc62c0e6567434e522e97291b251a41dad7f"}, + {file = "pydantic-1.10.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:990406d226dea0e8f25f643b370224771878142155b879784ce89f633541a024"}, + {file = "pydantic-1.10.4-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2e82a6d37a95e0b1b42b82ab340ada3963aea1317fd7f888bb6b9dfbf4fff57c"}, + {file = "pydantic-1.10.4-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:9193d4f4ee8feca58bc56c8306bcb820f5c7905fd919e0750acdeeeef0615b28"}, + {file = "pydantic-1.10.4-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2b3ce5f16deb45c472dde1a0ee05619298c864a20cded09c4edd820e1454129f"}, + {file = "pydantic-1.10.4-cp39-cp39-win_amd64.whl", hash = "sha256:9cbdc268a62d9a98c56e2452d6c41c0263d64a2009aac69246486f01b4f594c4"}, + {file = "pydantic-1.10.4-py3-none-any.whl", hash = "sha256:4948f264678c703f3877d1c8877c4e3b2e12e549c57795107f08cf70c6ec7774"}, + {file = "pydantic-1.10.4.tar.gz", hash = "sha256:b9a3859f24eb4e097502a3be1fb4b2abb79b6103dd9e2e0edb70613a4459a648"}, +] pyparsing = [ {file = "pyparsing-3.0.9-py3-none-any.whl", hash = "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc"}, {file = "pyparsing-3.0.9.tar.gz", hash = "sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb"}, diff --git a/pydantic_converter/README.md b/pydantic_converter/README.md new file mode 100644 index 00000000..5914fa3d --- /dev/null +++ b/pydantic_converter/README.md @@ -0,0 +1,31 @@ +# Pydantic Converter Sample + +This sample shows how to create a custom Pydantic converter to properly serialize Pydantic models. + +For this sample, the optional `pydantic` dependency group must be included. To include, run: + + poetry install --with pydantic + +To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the +worker: + + poetry run python worker.py + +This will start the worker. 
class PydanticJSONPayloadConverter(JSONPlainPayloadConverter):
    """JSON payload converter that serializes through the Pydantic encoder.

    Subclasses :py:class:`JSONPlainPayloadConverter` and overrides only
    :py:meth:`to_payload`; everything else is inherited unchanged.
    """

    def to_payload(self, value: Any) -> Optional[Payload]:
        """Serialize ``value`` to a JSON payload using the Pydantic encoder.

        This converter is expected to be the last one in the chain, so it does
        not signal "cannot convert": any JSON serialization error is left to
        propagate to the caller, as in the base class.
        """
        encoding_name = self.encoding.encode()
        # Compact, key-sorted JSON; values the stock JSON encoder rejects are
        # handed to pydantic_encoder. Errors are intentionally not caught here.
        serialized = json.dumps(
            value,
            default=pydantic_encoder,
            sort_keys=True,
            separators=(",", ":"),
        )
        return Payload(
            metadata={"encoding": encoding_name},
            data=serialized.encode(),
        )


class PydanticPayloadConverter(CompositePayloadConverter):
    """Composite payload converter whose JSON step uses Pydantic.

    Built from the default encoding payload converters, with every
    :py:class:`JSONPlainPayloadConverter` instance replaced by a
    :py:class:`PydanticJSONPayloadConverter`.
    """

    def __init__(self) -> None:
        converters: list = []
        for default_converter in (
            DefaultPayloadConverter.default_encoding_payload_converters
        ):
            if isinstance(default_converter, JSONPlainPayloadConverter):
                # Swap the plain JSON converter for the Pydantic-aware one
                converters.append(PydanticJSONPayloadConverter())
            else:
                converters.append(default_converter)
        super().__init__(*converters)


pydantic_data_converter = DataConverter(
    payload_converter_class=PydanticPayloadConverter
)
"""Data converter using Pydantic JSON conversion."""
class MyPydanticModel(BaseModel):
    """Pydantic model used as the workflow and activity input/output."""

    # An IPv4 address and a timestamp; the starter and the test populate both.
    some_ip: IPv4Address
    some_date: datetime


@activity.defn
async def my_activity(models: List[MyPydanticModel]) -> List[MyPydanticModel]:
    """Log the received models and return them unchanged."""
    # Pass the models as a logging argument so the message is only formatted
    # if the record is actually emitted (instead of eagerly with ``%``).
    activity.logger.info("Got models in activity: %s", models)
    return models


@workflow.defn
class MyWorkflow:
    """Workflow that sends its Pydantic models through ``my_activity``."""

    @workflow.run
    async def run(self, models: List[MyPydanticModel]) -> List[MyPydanticModel]:
        """Log the models, run ``my_activity`` on them and return its result."""
        # Deferred formatting, same as in the activity
        workflow.logger.info("Got models in workflow: %s", models)
        return await workflow.execute_activity(
            my_activity, models, start_to_close_timeout=timedelta(minutes=1)
        )


# Due to known issues with Pydantic's use of issubclass and our inability to
# override the check in sandbox, Pydantic will think datetime is actually date
# in the sandbox.
# At the expense of protecting against datetime.now() use in
# workflows, we're going to remove datetime module restrictions. See sdk-python
# README's discussion of known sandbox issues for more details.
def new_sandbox_runner() -> SandboxedWorkflowRunner:
    """Create a sandboxed workflow runner without ``datetime`` restrictions.

    Copies the default invalid-module-member children, drops the ``datetime``
    entry, and returns a runner using the default restrictions with that
    modified set.

    Returns:
        A ``SandboxedWorkflowRunner`` whose restrictions equal
        ``SandboxRestrictions.default`` except that ``datetime`` members are
        no longer flagged as invalid.
    """
    # TODO(cretz): Use with_child_unrestricted when https://github.com/temporalio/sdk-python/issues/254
    # is fixed and released
    invalid_module_member_children = dict(
        SandboxRestrictions.invalid_module_members_default.children
    )
    # pop() with a default rather than del: if the defaults do not restrict
    # "datetime" there is nothing to remove, and that should not be an error.
    invalid_module_member_children.pop("datetime", None)
    return SandboxedWorkflowRunner(
        restrictions=dataclasses.replace(
            SandboxRestrictions.default,
            invalid_module_members=dataclasses.replace(
                SandboxRestrictions.invalid_module_members_default,
                children=invalid_module_member_children,
            ),
        )
    )


# Set from the __main__ guard on ctrl+c to release main()
# NOTE(review): on Python < 3.10 asyncio.Event binds to the event loop that is
# current at construction time; confirm this works together with the
# new_event_loop() call in the __main__ guard on those versions.
interrupt_event = asyncio.Event()


async def main():
    """Run a worker for ``MyWorkflow``/``my_activity`` until interrupted.

    Connects to ``localhost:7233`` with the Pydantic data converter, starts a
    worker on the sample task queue using the relaxed sandbox runner, and then
    waits for ``interrupt_event`` to be set.
    """
    logging.basicConfig(level=logging.INFO)
    # Connect client using the Pydantic converter
    client = await Client.connect(
        "localhost:7233", data_converter=pydantic_data_converter
    )

    # Run a worker for the workflow
    async with Worker(
        client,
        task_queue="pydantic_converter-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
        workflow_runner=new_sandbox_runner(),
    ):
        # Wait until interrupted
        print("Worker started, ctrl+c to exit")
        await interrupt_event.wait()
        print("Shutting down")


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        # Release main() and let async generators finish
        # NOTE(review): main() itself is not awaited again after the event is
        # set; confirm the worker gets enough loop time to shut down cleanly.
        interrupt_event.set()
        loop.run_until_complete(loop.shutdown_asyncgens())
async def test_workflow_with_pydantic_model(client: Client):
    """Check that Pydantic models survive a workflow/activity round trip."""
    # Rebuild the fixture client with the Pydantic data converter swapped in
    config = client.config()
    config["data_converter"] = pydantic_data_converter
    client = Client(**config)

    # Unique task queue so this test's worker only sees its own workflow
    task_queue_name = str(uuid.uuid4())

    first_model = MyPydanticModel(
        some_ip=IPv4Address("127.0.0.1"), some_date=datetime(2000, 1, 2, 3, 4, 5)
    )
    second_model = MyPydanticModel(
        some_ip=IPv4Address("127.0.0.2"), some_date=datetime(2001, 2, 3, 4, 5, 6)
    )
    orig_models = [first_model, second_model]

    worker = Worker(
        client,
        task_queue=task_queue_name,
        workflows=[MyWorkflow],
        activities=[my_activity],
        workflow_runner=new_sandbox_runner(),
    )
    async with worker:
        workflow_id = str(uuid.uuid4())
        result = await client.execute_workflow(
            MyWorkflow.run,
            orig_models,
            id=workflow_id,
            task_queue=task_queue_name,
        )
        # The workflow returns what the activity returned, which is its input
        assert orig_models == result