Update pydantic samples to use SDK contrib module (#163)
dandavison authored Feb 19, 2025
1 parent 81b5098 commit 3bd017d
Showing 16 changed files with 1,787 additions and 1,501 deletions.
14 changes: 11 additions & 3 deletions .github/workflows/ci.yml
@@ -32,11 +32,19 @@ jobs:
      # Using fixed Poetry version until
      # https://github.com/python-poetry/poetry/pull/7694 is fixed
      - run: python -m pip install --upgrade wheel "poetry==1.4.0" poethepoet
      - run: poetry install --with pydantic --with dsl --with encryption --with trio_async
      - run: poetry install --with pydantic_converter --with dsl --with encryption --with trio_async
      - run: poe lint
      - run: mkdir junit-xml
      - run: poe test -s -o log_cli_level=DEBUG --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml
      - run: poe test -s -o log_cli_level=DEBUG --workflow-environment time-skipping --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--time-skipping.xml
      - run: poe test -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml
      - run: poe test -s --workflow-environment time-skipping --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--time-skipping.xml
      # This must remain the last step since it downgrades pydantic
      - name: Uninstall pydantic
        shell: bash
        run: |
          echo y | poetry run pip uninstall pydantic
          echo y | poetry run pip uninstall pydantic-core
          poetry run pip install pydantic==1.10
          poe test -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--pydantic-v1.xml tests/pydantic_converter_v1/workflow_test.py
      # On latest, run gevent test
      - name: Gevent test
2,966 changes: 1,536 additions & 1,430 deletions poetry.lock

Large diffs are not rendered by default.

18 changes: 3 additions & 15 deletions pydantic_converter/README.md
@@ -1,10 +1,10 @@
# Pydantic Converter Sample

This sample shows how to create a custom Pydantic converter to properly serialize Pydantic models.
This sample shows how to use the Pydantic data converter.

For this sample, the optional `pydantic` dependency group must be included. To include, run:
For this sample, the optional `pydantic_converter` dependency group must be included. To include, run:

    poetry install --with pydantic
    poetry install --with pydantic_converter

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:
@@ -17,15 +17,3 @@ This will start the worker. Then, in another terminal, run the following to exec

In the worker terminal, the workflow and its activity will log that it received the Pydantic models. In the starter
terminal, the Pydantic models in the workflow result will be logged.

### Notes

This is the preferred way to use Pydantic models with Temporal Python SDK. The converter code is small and meant to
embed into other projects.

This sample also demonstrates use of `datetime` inside of Pydantic models. Due to a known issue with the Temporal
sandbox, this class is seen by Pydantic as `date` instead of `datetime` upon deserialization. This is due to a
[known Python issue](https://github.com/python/cpython/issues/89010) where, when we proxy the `datetime` class in the
sandbox to prevent non-deterministic calls like `now()`, `issubclass` fails for the proxy type causing Pydantic to think
it's a `date` instead. In `worker.py`, we have shown a workaround of disabling restrictions on `datetime` which solves
this issue but no longer protects against workflow developers making non-deterministic calls in that module.
4 changes: 2 additions & 2 deletions pydantic_converter/starter.py
@@ -4,8 +4,8 @@
from ipaddress import IPv4Address

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter

from pydantic_converter.converter import pydantic_data_converter
from pydantic_converter.worker import MyPydanticModel, MyWorkflow


@@ -29,7 +29,7 @@ async def main():
                some_date=datetime(2001, 2, 3, 4, 5, 6),
            ),
        ],
        id=f"pydantic_converter-workflow-id",
        id="pydantic_converter-workflow-id",
        task_queue="pydantic_converter-task-queue",
    )
    logging.info("Got models from client: %s" % result)
36 changes: 3 additions & 33 deletions pydantic_converter/worker.py
@@ -1,5 +1,4 @@
import asyncio
import dataclasses
import logging
from datetime import datetime, timedelta
from ipaddress import IPv4Address
@@ -8,17 +7,12 @@
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
    SandboxedWorkflowRunner,
    SandboxRestrictions,
)

# We always want to pass through external modules to the sandbox that we know
# are safe for workflow use
# Always pass through external modules to the sandbox that you know are safe for
# workflow use
with workflow.unsafe.imports_passed_through():
    from pydantic import BaseModel

    from pydantic_converter.converter import pydantic_data_converter
    from temporalio.contrib.pydantic import pydantic_data_converter


class MyPydanticModel(BaseModel):
@@ -42,29 +36,6 @@ async def run(self, models: List[MyPydanticModel]) -> List[MyPydanticModel]:
        )


# Due to known issues with Pydantic's use of issubclass and our inability to
# override the check in sandbox, Pydantic will think datetime is actually date
# in the sandbox. At the expense of protecting against datetime.now() use in
# workflows, we're going to remove datetime module restrictions. See sdk-python
# README's discussion of known sandbox issues for more details.
def new_sandbox_runner() -> SandboxedWorkflowRunner:
    # TODO(cretz): Use with_child_unrestricted when https://github.com/temporalio/sdk-python/issues/254
    # is fixed and released
    invalid_module_member_children = dict(
        SandboxRestrictions.invalid_module_members_default.children
    )
    del invalid_module_member_children["datetime"]
    return SandboxedWorkflowRunner(
        restrictions=dataclasses.replace(
            SandboxRestrictions.default,
            invalid_module_members=dataclasses.replace(
                SandboxRestrictions.invalid_module_members_default,
                children=invalid_module_member_children,
            ),
        )
    )


interrupt_event = asyncio.Event()


@@ -81,7 +52,6 @@ async def main():
        task_queue="pydantic_converter-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
        workflow_runner=new_sandbox_runner(),
    ):
        # Wait until interrupted
        print("Worker started, ctrl+c to exit")
31 changes: 31 additions & 0 deletions pydantic_converter_v1/README.md
@@ -0,0 +1,31 @@
# Pydantic v1 Converter Sample

**This sample shows how to use Pydantic v1 with Temporal. This is not recommended: use Pydantic v2 if possible, and use the
main [pydantic_converter](../pydantic_converter/README.md) sample.**

To install, run:

    poetry install --with pydantic_converter
    poetry run pip uninstall pydantic pydantic-core
    poetry run pip install pydantic==1.10

To run, first see the root [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:

    poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute the workflow:

    poetry run python starter.py

In the worker terminal, the workflow and its activity will log that it received the Pydantic models. In the starter
terminal, the Pydantic models in the workflow result will be logged.

### Notes

This sample also demonstrates use of `datetime` inside of Pydantic v1 models. Due to a known issue with the Temporal
sandbox, this class is seen by Pydantic v1 as `date` instead of `datetime` upon deserialization. This is due to a
[known Python issue](https://github.com/python/cpython/issues/89010) where, when we proxy the `datetime` class in the
sandbox to prevent non-deterministic calls like `now()`, `issubclass` fails for the proxy type causing Pydantic v1 to think
it's a `date` instead. In `worker.py`, we have shown a workaround of disabling restrictions on `datetime` which solves
this issue but no longer protects against workflow developers making non-deterministic calls in that module.
Empty file.
File renamed without changes.
39 changes: 39 additions & 0 deletions pydantic_converter_v1/starter.py
@@ -0,0 +1,39 @@
import asyncio
import logging
from datetime import datetime
from ipaddress import IPv4Address

from temporalio.client import Client

from pydantic_converter_v1.converter import pydantic_data_converter
from pydantic_converter_v1.worker import MyPydanticModel, MyWorkflow


async def main():
    logging.basicConfig(level=logging.INFO)
    # Connect client using the Pydantic converter
    client = await Client.connect(
        "localhost:7233", data_converter=pydantic_data_converter
    )

    # Run workflow
    result = await client.execute_workflow(
        MyWorkflow.run,
        [
            MyPydanticModel(
                some_ip=IPv4Address("127.0.0.1"),
                some_date=datetime(2000, 1, 2, 3, 4, 5),
            ),
            MyPydanticModel(
                some_ip=IPv4Address("127.0.0.2"),
                some_date=datetime(2001, 2, 3, 4, 5, 6),
            ),
        ],
        id="pydantic_converter-workflow-id",
        task_queue="pydantic_converter-task-queue",
    )
    logging.info("Got models from client: %s" % result)


if __name__ == "__main__":
    asyncio.run(main())
98 changes: 98 additions & 0 deletions pydantic_converter_v1/worker.py
@@ -0,0 +1,98 @@
import asyncio
import dataclasses
import logging
from datetime import datetime, timedelta
from ipaddress import IPv4Address
from typing import List

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
    SandboxedWorkflowRunner,
    SandboxRestrictions,
)

# We always want to pass through external modules to the sandbox that we know
# are safe for workflow use
with workflow.unsafe.imports_passed_through():
    from pydantic import BaseModel

    from pydantic_converter_v1.converter import pydantic_data_converter


class MyPydanticModel(BaseModel):
    some_ip: IPv4Address
    some_date: datetime


@activity.defn
async def my_activity(models: List[MyPydanticModel]) -> List[MyPydanticModel]:
    activity.logger.info("Got models in activity: %s" % models)
    return models


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, models: List[MyPydanticModel]) -> List[MyPydanticModel]:
        workflow.logger.info("Got models in workflow: %s" % models)
        return await workflow.execute_activity(
            my_activity, models, start_to_close_timeout=timedelta(minutes=1)
        )


# Due to known issues with Pydantic's use of issubclass and our inability to
# override the check in sandbox, Pydantic will think datetime is actually date
# in the sandbox. At the expense of protecting against datetime.now() use in
# workflows, we're going to remove datetime module restrictions. See sdk-python
# README's discussion of known sandbox issues for more details.
def new_sandbox_runner() -> SandboxedWorkflowRunner:
    # TODO(cretz): Use with_child_unrestricted when https://github.com/temporalio/sdk-python/issues/254
    # is fixed and released
    invalid_module_member_children = dict(
        SandboxRestrictions.invalid_module_members_default.children
    )
    del invalid_module_member_children["datetime"]
    return SandboxedWorkflowRunner(
        restrictions=dataclasses.replace(
            SandboxRestrictions.default,
            invalid_module_members=dataclasses.replace(
                SandboxRestrictions.invalid_module_members_default,
                children=invalid_module_member_children,
            ),
        )
    )


interrupt_event = asyncio.Event()


async def main():
    logging.basicConfig(level=logging.INFO)
    # Connect client using the Pydantic converter
    client = await Client.connect(
        "localhost:7233", data_converter=pydantic_data_converter
    )

    # Run a worker for the workflow
    async with Worker(
        client,
        task_queue="pydantic_converter-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
        workflow_runner=new_sandbox_runner(),
    ):
        # Wait until interrupted
        print("Worker started, ctrl+c to exit")
        await interrupt_event.wait()
        print("Shutting down")


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        interrupt_event.set()
        loop.run_until_complete(loop.shutdown_asyncgens())
10 changes: 6 additions & 4 deletions pyproject.toml
@@ -17,15 +17,17 @@ packages = [

[tool.poetry.dependencies]
python = "^3.9"
temporalio = "^1.9.0"
temporalio = "^1.10.0"

[tool.poetry.dev-dependencies]
black = "^22.3.0"
isort = "^5.10.1"
mypy = "^0.981"
mypy = "^1.4.1"
pytest = "^7.1.2"
pytest-asyncio = "^0.18.3"
frozenlist = "^1.4.0"
types-pyyaml = "^6.0.12.20241230"


# All sample-specific dependencies are in optional groups below, named after the
# sample they apply to
@@ -63,9 +65,9 @@ optional = true
temporalio = { version = "*", extras = ["opentelemetry"] }
opentelemetry-exporter-otlp-proto-grpc = "1.18.0"

[tool.poetry.group.pydantic]
[tool.poetry.group.pydantic_converter]
optional = true
dependencies = { pydantic = "^1.10.4" }
dependencies = { pydantic = "^2.10.6" }

[tool.poetry.group.sentry]
optional = true
12 changes: 8 additions & 4 deletions sentry/interceptor.py
@@ -37,8 +37,10 @@ async def execute_activity(self, input: ExecuteActivityInput) -> Any:
            try:
                return await super().execute_activity(input)
            except Exception as e:
                if len(input.args) == 1 and is_dataclass(input.args[0]):
                    set_context("temporal.activity.input", asdict(input.args[0]))
                if len(input.args) == 1:
                    [arg] = input.args
                    if is_dataclass(arg) and not isinstance(arg, type):
                        set_context("temporal.activity.input", asdict(arg))
                set_context("temporal.activity.info", activity.info().__dict__)
                capture_exception()
                raise e
@@ -58,8 +60,10 @@ async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
            try:
                return await super().execute_workflow(input)
            except Exception as e:
                if len(input.args) == 1 and is_dataclass(input.args[0]):
                    set_context("temporal.workflow.input", asdict(input.args[0]))
                if len(input.args) == 1:
                    [arg] = input.args
                    if is_dataclass(arg) and not isinstance(arg, type):
                        set_context("temporal.workflow.input", asdict(arg))
                set_context("temporal.workflow.info", workflow.info().__dict__)

                if not workflow.unsafe.is_replaying():
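Note on the `sentry/interceptor.py` change: the new guard `is_dataclass(arg) and not isinstance(arg, type)` matters because `dataclasses.is_dataclass` returns true for a dataclass type as well as for its instances, while `dataclasses.asdict` accepts instances only. The standalone sketch below illustrates that behavior with the standard library alone. `GreetingInput` and `context_from_arg` are hypothetical names made up for this illustration and are not part of the commit.

```python
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Optional


@dataclass
class GreetingInput:  # hypothetical input type, for illustration only
    name: str
    attempts: int


def context_from_arg(arg: Any) -> Optional[dict]:
    """Return a dict for dataclass instances and None for anything else.

    Hypothetical helper that mirrors the guard used in the interceptor.
    """
    # is_dataclass() is true for both a dataclass type and its instances,
    # while asdict() accepts instances only, so types are excluded explicitly.
    if is_dataclass(arg) and not isinstance(arg, type):
        return asdict(arg)
    return None


instance_context = context_from_arg(GreetingInput(name="World", attempts=1))
class_context = context_from_arg(GreetingInput)
plain_context = context_from_arg("not a dataclass")
print(instance_context, class_context, plain_context)
```

Without the `isinstance(arg, type)` check, passing the class itself would reach `asdict` and raise a `TypeError`. Whether a dataclass type is ever passed as a workflow or activity argument in practice is not stated in the commit, so treat this as a defensive pattern rather than a description of a known failure.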
4 changes: 2 additions & 2 deletions tests/message_passing/safe_message_handlers/workflow_test.py
@@ -79,8 +79,8 @@ async def test_safe_message_handlers(client: Client, env: WorkflowEnvironment):

        await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)

        result = await cluster_manager_handle.result()
        assert result.num_currently_assigned_nodes == 0
        cluster_manager_result = await cluster_manager_handle.result()
        assert cluster_manager_result.num_currently_assigned_nodes == 0


async def test_update_idempotency(client: Client, env: WorkflowEnvironment):