diff --git a/README.md b/README.md index 5c9b8cbd..c32b301f 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ Some examples require extra dependencies. See each sample's directory for specif while running. * [hello_signal](hello/hello_signal.py) - Send signals to a workflow. -* [activity_sticky_queue](activity_sticky_queue) - Uses unique task queues to ensure activities run on specific workers. +* [activity_sticky_queue](activity_sticky_queues) - Uses unique task queues to ensure activities run on specific workers. * [activity_worker](activity_worker) - Use Python activities from a workflow in another language. * [custom_converter](custom_converter) - Use a custom payload converter to handle custom types. * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity. @@ -63,6 +63,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. +* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. ## Test diff --git a/poetry.lock b/poetry.lock index d3ee4cac..1aa4dc7a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.4.0 and should not be changed by hand. [[package]] name = "aiohttp" @@ -886,23 +886,6 @@ deprecated = ">=1.2.6" importlib-metadata = ">=6.0.0,<6.1.0" setuptools = ">=16.0" -[[package]] -name = "opentelemetry-exporter-jaeger-thrift" -version = "1.18.0" -description = "Jaeger Thrift Exporter for OpenTelemetry" -category = "dev" -optional = false -python-versions = ">=3.7" -files = [ - {file = "opentelemetry_exporter_jaeger_thrift-1.18.0-py3-none-any.whl", hash = "sha256:fd90f858c5bb0ab214f8862f90a5196dc83821ae37d9ac731d0162d945ef5ca3"}, - {file = "opentelemetry_exporter_jaeger_thrift-1.18.0.tar.gz", hash = "sha256:39accc3a5ffe601afa5ca4184dc5d4da3b1a266038ea10fadfb97bf25551f9d5"}, -] - -[package.dependencies] -opentelemetry-api = ">=1.3,<2.0" -opentelemetry-sdk = ">=1.11,<2.0" -thrift = ">=0.10.0" - [[package]] name = "opentelemetry-exporter-otlp-proto-common" version = "1.18.0" @@ -1271,18 +1254,18 @@ files = [ [[package]] name = "temporalio" -version = "1.2.0" +version = "1.3.0" description = "Temporal.io Python SDK" category = "main" optional = false python-versions = ">=3.7,<4.0" files = [ - {file = "temporalio-1.2.0-cp37-abi3-macosx_10_9_x86_64.whl", hash = "sha256:66c7e3e072eed1c40ef42c693eb47fb9024bb7ac0f31c34d3eada796d43e9728"}, - {file = "temporalio-1.2.0-cp37-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:1359d65ff302d2d446d830aade8939727e32418909fb8b0cf2490891ad2ef0be"}, - {file = "temporalio-1.2.0-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:d6a1dd159b968ec85589ab5d20c5899bf4a19c5ac736f30364b1b3fd85115f82"}, - {file = "temporalio-1.2.0-cp37-abi3-win_amd64.whl", hash = "sha256:60f4332531922387b72a306d8db350526185aa941d670fd1169daa641dd61ff4"}, - {file = "temporalio-1.2.0-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:0872bad3d694322140f8513e1bf3da1cbce057acec0ffd10aeefdc1ac05a3605"}, - {file = "temporalio-1.2.0.tar.gz", hash = "sha256:bb8d68d4a7c3208bd98df082c42fad3e909a218ca425ad5546208e88472a1bcf"}, + {file = "temporalio-1.3.0-cp37-abi3-macosx_10_9_x86_64.whl", hash = "sha256:174c4bd116df56ad9e07f7ca12fc4153af48df0bf95ae7de9753169af7db7ac7"}, + {file = "temporalio-1.3.0-cp37-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:5d169e0a82f014d59b628a9a51a24baed78c770ebb6abe85bdd17ca50a9abffe"}, + {file = "temporalio-1.3.0-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:3ba8ab00b1a7a9444220f19e49557659901a00901a11eb9787886f2a19539699"}, + {file = "temporalio-1.3.0-cp37-abi3-win_amd64.whl", hash = "sha256:6f98de2414167e341cf16d7d9aab4ef3fe6ea5381867cd2dfa85564a5109d17b"}, + {file = "temporalio-1.3.0-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:8dd07c5e6bd3b5c8ef211b6761d9a00f647481b703b43931e8ff161e4a59e23d"}, + {file = "temporalio-1.3.0.tar.gz", hash = "sha256:0eab4916ec9383d59364ab764e6b7e1aa7df8189c5ade969de4bac4e121c95fd"}, ] [package.dependencies] @@ -1297,25 +1280,6 @@ typing-extensions = ">=4.2.0,<5.0.0" grpc = ["grpcio (>=1.48.0,<2.0.0)"] opentelemetry = ["opentelemetry-api (>=1.11.1,<2.0.0)", "opentelemetry-sdk (>=1.11.1,<2.0.0)"] -[[package]] -name = "thrift" -version = "0.16.0" -description = "Python bindings for the Apache Thrift RPC system" -category = "dev" -optional = false -python-versions = "*" -files = [ - {file = "thrift-0.16.0.tar.gz", hash = "sha256:2b5b6488fcded21f9d312aa23c9ff6a0195d0f6ae26ddbd5ad9e3e25dfc14408"}, -] - -[package.dependencies] -six = ">=1.7.2" - -[package.extras] -all = ["tornado (>=4.0)", "twisted"] -tornado = ["tornado (>=4.0)"] -twisted = ["twisted"] - [[package]] name = "tomli" version = "2.0.1" @@ -1597,4 +1561,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "^3.7" -content-hash = "46df4d045bef1fdf5fa313c04089afc719426bf90b837dd26b1ecab016775e8c" +content-hash = "7344c1d148fc54b9812875e8d7e5d93adc9848c8085040e0f6d7f806e7034548" diff --git a/pyproject.toml b/pyproject.toml index be4d9990..63840b25 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ packages = [ [tool.poetry.dependencies] python = "^3.7" -temporalio = "^1.1.0" +temporalio = "^1.3.0" [tool.poetry.dev-dependencies] black = "^22.3.0" diff --git a/worker_versioning/README.md b/worker_versioning/README.md new file mode 100644 index 00000000..04f958bf --- /dev/null +++ b/worker_versioning/README.md @@ -0,0 +1,12 @@ +# Worker Versioning Sample + +This sample shows you how you can use the [Worker Versioning](https://docs.temporal.io/workers#worker-versioning) +feature to deploy incompatible changes to workflow code more easily. + +To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory: + + poetry run python example.py + +This will add some Build IDs to a Task Queue, and will also run Workers with those versions to show how you can +mark add versions, mark them as compatible (or not) with one another, and run Workers at specific versions. You'll +see that only the workers only process Workflow Tasks assigned versions they are compatible with. diff --git a/worker_versioning/__init__.py b/worker_versioning/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/worker_versioning/activities.py b/worker_versioning/activities.py new file mode 100644 index 00000000..4115e0fd --- /dev/null +++ b/worker_versioning/activities.py @@ -0,0 +1,11 @@ +from temporalio import activity + + +@activity.defn +async def greet(inp: str) -> str: + return f"Hi from {inp}" + + +@activity.defn +async def super_greet(inp: str, some_number: int) -> str: + return f"Hi from {inp} with {some_number}" diff --git a/worker_versioning/example.py b/worker_versioning/example.py new file mode 100644 index 00000000..97354303 --- /dev/null +++ b/worker_versioning/example.py @@ -0,0 +1,116 @@ +import asyncio +import uuid + +from temporalio.client import BuildIdOpAddNewCompatible, BuildIdOpAddNewDefault, Client +from temporalio.worker import Worker + +from worker_versioning.activities import greet, super_greet +from worker_versioning.workflow_v1 import MyWorkflow as MyWorkflowV1 +from worker_versioning.workflow_v1_1 import MyWorkflow as MyWorkflowV1_1 +from worker_versioning.workflow_v2 import MyWorkflow as MyWorkflowV2 + + +async def main(): + client = await Client.connect("localhost:7233") + task_queue = f"worker-versioning-{uuid.uuid4()}" + + # Start a 1.0 worker + async with Worker( + client, + task_queue=task_queue, + workflows=[MyWorkflowV1], + activities=[greet, super_greet], + build_id="1.0", + use_worker_versioning=True, + ): + # Add 1.0 as the default version for the queue + await client.update_worker_build_id_compatibility( + task_queue, BuildIdOpAddNewDefault("1.0") + ) + + # Start a workflow which will run on the 1.0 worker + handle = await client.start_workflow( + MyWorkflowV1.run, + task_queue=task_queue, + id=f"worker-versioning-v1-{uuid.uuid4()}", + ) + # Signal the workflow to proceed + await handle.signal(MyWorkflowV1.proceeder, "go") + + # Give a chance for the worker to process the signal + # TODO Better? + await asyncio.sleep(1) + + # Add 1.1 as the default version for the queue, compatible with 1.0 + await client.update_worker_build_id_compatibility( + task_queue, BuildIdOpAddNewCompatible("1.1", "1.0") + ) + + # Stop the old worker, and start a 1.1 worker. We do this to speed along the example, since the + # 1.0 worker may continue to process tasks briefly after we make 1.1 the new default. + async with Worker( + client, + task_queue=task_queue, + workflows=[MyWorkflowV1_1], + activities=[greet, super_greet], + build_id="1.1", + use_worker_versioning=True, + ): + # Continue driving the workflow. Take note that the new version of the workflow run by the 1.1 + # worker is the one that takes over! You might see a workflow task timeout, if the 1.0 worker is + # processing a task as the version update happens. That's normal. + await handle.signal(MyWorkflowV1.proceeder, "go") + + # Add a new *incompatible* version to the task queue, which will become the new overall default for the queue. + await client.update_worker_build_id_compatibility( + task_queue, BuildIdOpAddNewDefault("2.0") + ) + + # Start a 2.0 worker + async with Worker( + client, + task_queue=task_queue, + workflows=[MyWorkflowV2], + activities=[greet, super_greet], + build_id="2.0", + use_worker_versioning=True, + ): + # Start a new workflow. Note that it will run on the new 2.0 version, without the client invocation changing + # at all! Note here we can use `MyWorkflowV1.run` because the signature of the workflow has not changed. + handle2 = await client.start_workflow( + MyWorkflowV1.run, + task_queue=task_queue, + id=f"worker-versioning-v2-{uuid.uuid4()}", + ) + + # Drive both workflows once more before concluding them. The first workflow will continue running on the 1.1 + # worker. + await handle.signal(MyWorkflowV1.proceeder, "go") + await handle2.signal(MyWorkflowV1.proceeder, "go") + await handle.signal(MyWorkflowV1.proceeder, "finish") + await handle2.signal(MyWorkflowV1.proceeder, "finish") + + # Wait for both workflows to complete + await handle.result() + await handle2.result() + + # Lastly we'll demonstrate how you can use the gRPC api to determine if certain build IDs are ready to be + # retired. There's more information in the documentation, but here's a quick example that shows us how to + # tell when the 1.0 worker can be retired: + + # There is a 5 minute buffer before we will consider IDs no longer reachable by new workflows, to + # account for replication in multi-cluster setups. Uncomment the following line to wait long enough to see + # the 1.0 worker become unreachable. + # await asyncio.sleep(60 * 5) + reachability = await client.get_worker_task_reachability( + build_ids=["2.0", "1.0", "1.1"] + ) + + if not reachability.build_id_reachability["1.0"].task_queue_reachability[ + task_queue + ]: + print("1.0 is ready to be retired!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/worker_versioning/workflow_v1.py b/worker_versioning/workflow_v1.py new file mode 100644 index 00000000..1cef9095 --- /dev/null +++ b/worker_versioning/workflow_v1.py @@ -0,0 +1,27 @@ +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from worker_versioning.activities import greet + + +@workflow.defn +class MyWorkflow: + """The 1.0 version of the workflow we'll be making changes to""" + + should_finish: bool = False + + @workflow.run + async def run(self) -> str: + workflow.logger.info("Running workflow V1") + await workflow.wait_condition(lambda: self.should_finish) + return "Concluded workflow on V1" + + @workflow.signal + async def proceeder(self, inp: str): + await workflow.execute_activity( + greet, "V1", start_to_close_timeout=timedelta(seconds=5) + ) + if inp == "finish": + self.should_finish = True diff --git a/worker_versioning/workflow_v1_1.py b/worker_versioning/workflow_v1_1.py new file mode 100644 index 00000000..e2f22943 --- /dev/null +++ b/worker_versioning/workflow_v1_1.py @@ -0,0 +1,45 @@ +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from worker_versioning.activities import greet, super_greet + + +@workflow.defn +class MyWorkflow: + """ + The 1.1 version of the workflow, which is compatible with the first version. + + The compatible changes we've made are: + - Altering the log lines + - Using the `patched` API to properly introduce branching behavior while maintaining + compatibility + """ + + should_finish: bool = False + + @workflow.run + async def run(self) -> str: + workflow.logger.info("Running workflow V1.1") + await workflow.wait_condition(lambda: self.should_finish) + return "Concluded workflow on V1.1" + + @workflow.signal + async def proceeder(self, inp: str): + if workflow.patched("different-activity"): + await workflow.execute_activity( + super_greet, + args=["V1.1", 100], + start_to_close_timeout=timedelta(seconds=5), + ) + else: + # Note it is a valid compatible change to alter the input to an activity. However, because + # we're using the patched API, this branch would only be taken if the workflow was started on + # a v1 worker. + await workflow.execute_activity( + greet, "V1.1", start_to_close_timeout=timedelta(seconds=5) + ) + + if inp == "finish": + self.should_finish = True diff --git a/worker_versioning/workflow_v2.py b/worker_versioning/workflow_v2.py new file mode 100644 index 00000000..dcb0a20e --- /dev/null +++ b/worker_versioning/workflow_v2.py @@ -0,0 +1,36 @@ +import asyncio +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from worker_versioning.activities import greet + + +@workflow.defn +class MyWorkflow: + """ + The 2.0 version of the workflow, which is fully incompatible with the other workflows, since it + alters the sequence of commands without using `patched`. + """ + + should_finish: bool = False + + @workflow.run + async def run(self) -> str: + workflow.logger.info("Running workflow V2") + await workflow.wait_condition(lambda: self.should_finish) + return "Concluded workflow on V2" + + @workflow.signal + async def proceeder(self, inp: str): + await asyncio.sleep(1) + await workflow.execute_activity( + greet, "V2", start_to_close_timeout=timedelta(seconds=5) + ) + await workflow.execute_activity( + greet, "V2", start_to_close_timeout=timedelta(seconds=5) + ) + + if inp == "finish": + self.should_finish = True