Skip to content

Commit

Permalink
More Python samples (#65)
Browse files Browse the repository at this point in the history
Fixes #53
Fixes #58
Fixes #59
  • Loading branch information
cretz authored Mar 29, 2023
1 parent 867a870 commit 4a602ff
Show file tree
Hide file tree
Showing 16 changed files with 476 additions and 0 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Some examples require extra dependencies. See each sample's directory for specif
* [hello_activity](hello/hello_activity.py) - Execute an activity from a workflow.
* [hello_activity_choice](hello/hello_activity_choice.py) - Execute certain activities inside a workflow based on
dynamic input.
* [hello_activity_method](hello/hello_activity_method.py) - Demonstrate an activity that is an instance method on a
class and can access class state.
* [hello_activity_multiprocess](hello/hello_activity_multiprocess.py) - Execute a synchronous activity on a process
pool.
* [hello_activity_retry](hello/hello_activity_retry.py) - Demonstrate activity retry by failing until a certain number
Expand Down Expand Up @@ -55,6 +57,8 @@ Some examples require extra dependencies. See each sample's directory for specif
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
* [sentry](sentry) - Report errors to Sentry.

Expand Down
2 changes: 2 additions & 0 deletions hello/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Replace `hello_activity.py` in the command with any other example filename to r
* [hello_activity](hello_activity.py) - Execute an activity from a workflow.
* [hello_activity_choice](hello_activity_choice.py) - Execute certain activities inside a workflow based on dynamic
input.
* [hello_activity_method](hello/hello_activity_method.py) - Demonstrate an activity that is an instance method on a
class and can access class state.
* [hello_activity_multiprocess](hello_activity_multiprocess.py) - Execute a synchronous activity on a process pool.
* [hello_activity_retry](hello_activity_retry.py) - Demonstrate activity retry by failing until a certain number of
attempts.
Expand Down
62 changes: 62 additions & 0 deletions hello/hello_activity_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker


class MyDatabaseClient:
async def run_database_update(self) -> None:
print("Database update executed")


class MyActivities:
def __init__(self, db_client: MyDatabaseClient) -> None:
self.db_client = db_client

@activity.defn
async def do_database_thing(self) -> None:
await self.db_client.run_database_update()


@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
await workflow.execute_activity_method(
MyActivities.do_database_thing,
start_to_close_timeout=timedelta(seconds=10),
)


async def main():
# Start client
client = await Client.connect("localhost:7233")

# Create our database client that can then be used in the activity
db_client = MyDatabaseClient()
# Instantiate our class containing state that can be referenced from
# activity methods
my_activities = MyActivities(db_client)

# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-method-task-queue",
workflows=[MyWorkflow],
activities=[my_activities.do_database_thing],
):

# While the worker is running, use the client to run the workflow and
# print out its result. Note, in many production setups, the client
# would be in a completely separate process from the worker.
await client.execute_workflow(
MyWorkflow.run,
id="hello-activity-method-workflow-id",
task_queue="hello-activity-method-task-queue",
)


if __name__ == "__main__":
asyncio.run(main())
97 changes: 97 additions & 0 deletions patching/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Patching Sample

This sample shows how to safely alter a workflow using `patched` and `deprecate_patch` in stages.

To run, first see [README.md](../README.md) for prerequisites. Then follow the patching stages below.

### Stage 1 - Initial code

This stage is for existing running workflows. To simulate our initial workflow, run the worker in a separate terminal:

poetry run python worker.py --workflow initial

Now we can start this workflow:

poetry run python starter.py --start-workflow initial-workflow-id

This will output "Started workflow with ID initial-workflow-id and ...". Now query this workflow:

poetry run python starter.py --query-workflow initial-workflow-id

This will output "Query result for ID initial-workflow-id: pre-patch".

### Stage 2 - Patch the workflow

This stage is for needing to run old and new workflows at the same time. To simulate our patched workflow, stop the
worker from before and start it again with the patched workflow:

poetry run python worker.py --workflow patched

Now let's start another workflow with this patched code:

poetry run python starter.py --start-workflow patched-workflow-id

This will output "Started workflow with ID patched-workflow-id and ...". Now query the old workflow that's still
running:

poetry run python starter.py --query-workflow initial-workflow-id

This will output "Query result for ID initial-workflow-id: pre-patch" since it is pre-patch. But if we execute a query
against the new code:

poetry run python starter.py --query-workflow patched-workflow-id

We get "Query result for ID patched-workflow-id: post-patch". This is how old workflow code can take old paths and new
workflow code can take new paths.

### Stage 3 - Deprecation

Once we know that all workflows that started with the initial code from "Stage 1" are no longer running, we don't need
the patch so we can deprecate it. To use the patch deprecated workflow, stop the workflow from before and start it again
with:

poetry run python worker.py --workflow patch-deprecated

All workflows in "Stage 2" and any new workflows will work. Now let's start another workflow with this patch deprecated
code:

poetry run python starter.py --start-workflow patch-deprecated-workflow-id

This will output "Started workflow with ID patch-deprecated-workflow-id and ...". Now query the patched workflow that's
still running:

poetry run python starter.py --query-workflow patched-workflow-id

This will output "Query result for ID patched-workflow-id: post-patch". And if we execute a query against the latest
workflow:

poetry run python starter.py --query-workflow patch-deprecated-workflow-id

As expected, this will output "Query result for ID patch-deprecated-workflow-id: post-patch".

### Stage 4 - Patch complete

Once we know we don't even have any workflows running on "Stage 2" or before (i.e. the workflow with the patch with
both code paths), we can just remove the patch deprecation altogether. To use the patch complete workflow, stop the
workflow from before and start it again with:

poetry run python worker.py --workflow patch-complete

All workflows in "Stage 3" and any new workflows will work. Now let's start another workflow with this patch complete
code:

poetry run python starter.py --start-workflow patch-complete-workflow-id

This will output "Started workflow with ID patch-complete-workflow-id and ...". Now query the patch deprecated workflow
that's still running:

poetry run python starter.py --query-workflow patch-deprecated-workflow-id

This will output "Query result for ID patch-deprecated-workflow-id: post-patch". And if we execute a query against the
latest workflow:

poetry run python starter.py --query-workflow patch-complete-workflow-id

As expected, this will output "Query result for ID patch-complete-workflow-id: post-patch".

Following these stages, we have successfully altered our workflow code.
Empty file added patching/__init__.py
Empty file.
11 changes: 11 additions & 0 deletions patching/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from temporalio import activity


@activity.defn
async def pre_patch_activity() -> str:
return "pre-patch"


@activity.defn
async def post_patch_activity() -> str:
return "post-patch"
34 changes: 34 additions & 0 deletions patching/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import argparse
import asyncio

from temporalio.client import Client

# Since it's just used for typing purposes, it doesn't matter which one we
# import
from patching.workflow_1_initial import MyWorkflow


async def main():
parser = argparse.ArgumentParser(description="Run worker")
parser.add_argument("--start-workflow", help="Start workflow with this ID")
parser.add_argument("--query-workflow", help="Query workflow with this ID")
args = parser.parse_args()
if not args.start_workflow and not args.query_workflow:
raise RuntimeError("Either --start-workflow or --query-workflow is required")

# Connect client
client = await Client.connect("localhost:7233")

if args.start_workflow:
handle = await client.start_workflow(
MyWorkflow.run, id=args.start_workflow, task_queue="patching-task-queue"
)
print(f"Started workflow with ID {handle.id} and run ID {handle.result_run_id}")
if args.query_workflow:
handle = client.get_workflow_handle_for(MyWorkflow.run, args.query_workflow)
result = await handle.query(MyWorkflow.result)
print(f"Query result for ID {handle.id}: {result}")


if __name__ == "__main__":
asyncio.run(main())
54 changes: 54 additions & 0 deletions patching/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import argparse
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from patching.activities import post_patch_activity, pre_patch_activity

interrupt_event = asyncio.Event()


async def main():
# Import which workflow based on CLI arg
parser = argparse.ArgumentParser(description="Run worker")
parser.add_argument(
"--workflow",
help="Which workflow. Can be 'initial', 'patched', 'patch-deprecated', or 'patch-complete'",
required=True,
)
args = parser.parse_args()
if args.workflow == "initial":
from patching.workflow_1_initial import MyWorkflow
elif args.workflow == "patched":
from patching.workflow_2_patched import MyWorkflow # type: ignore
elif args.workflow == "patch-deprecated":
from patching.workflow_3_patch_deprecated import MyWorkflow # type: ignore
elif args.workflow == "patch-complete":
from patching.workflow_4_patch_complete import MyWorkflow # type: ignore
else:
raise RuntimeError("Unrecognized workflow")

# Connect client
client = await Client.connect("localhost:7233")

# Run a worker for the workflow
async with Worker(
client,
task_queue="patching-task-queue",
workflows=[MyWorkflow],
activities=[pre_patch_activity, post_patch_activity],
):
# Wait until interrupted
print("Worker started")
await interrupt_event.wait()
print("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
20 changes: 20 additions & 0 deletions patching/workflow_1_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from patching.activities import pre_patch_activity


@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
self._result = await workflow.execute_activity(
pre_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)

@workflow.query
def result(self) -> str:
return self._result
26 changes: 26 additions & 0 deletions patching/workflow_2_patched.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from patching.activities import post_patch_activity, pre_patch_activity


@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
if workflow.patched("my-patch"):
self._result = await workflow.execute_activity(
post_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)
else:
self._result = await workflow.execute_activity(
pre_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)

@workflow.query
def result(self) -> str:
return self._result
21 changes: 21 additions & 0 deletions patching/workflow_3_patch_deprecated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from patching.activities import post_patch_activity


@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
workflow.deprecate_patch("my-patch")
self._result = await workflow.execute_activity(
post_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)

@workflow.query
def result(self) -> str:
return self._result
20 changes: 20 additions & 0 deletions patching/workflow_4_patch_complete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from patching.activities import post_patch_activity


@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
self._result = await workflow.execute_activity(
post_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)

@workflow.query
def result(self) -> str:
return self._result
Loading

0 comments on commit 4a602ff

Please sign in to comment.