Skip to content

Commit

Permalink
Worker Versioning sample (#78)
Browse files Browse the repository at this point in the history
* Upgrade sdk version to 1.3.0
  • Loading branch information
Sushisource authored Jul 24, 2023
1 parent 53c6063 commit 2cb0fdd
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 47 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Some examples require extra dependencies. See each sample's directory for specif
while running.
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
<!-- Keep this list in alphabetical order -->
* [activity_sticky_queue](activity_sticky_queue) - Uses unique task queues to ensure activities run on specific workers.
* [activity_sticky_queue](activity_sticky_queues) - Uses unique task queues to ensure activities run on specific workers.
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
Expand All @@ -63,6 +63,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
* [sentry](sentry) - Report errors to Sentry.
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.

## Test

Expand Down
54 changes: 9 additions & 45 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ packages = [

[tool.poetry.dependencies]
python = "^3.7"
temporalio = "^1.1.0"
temporalio = "^1.3.0"

[tool.poetry.dev-dependencies]
black = "^22.3.0"
Expand Down
12 changes: 12 additions & 0 deletions worker_versioning/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Worker Versioning Sample

This sample shows you how you can use the [Worker Versioning](https://docs.temporal.io/workers#worker-versioning)
feature to deploy incompatible changes to workflow code more easily.

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory:

poetry run python example.py

This will add some Build IDs to a Task Queue, and will also run Workers with those versions to show how you can
mark add versions, mark them as compatible (or not) with one another, and run Workers at specific versions. You'll
see that only the workers only process Workflow Tasks assigned versions they are compatible with.
Empty file added worker_versioning/__init__.py
Empty file.
11 changes: 11 additions & 0 deletions worker_versioning/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from temporalio import activity


@activity.defn
async def greet(inp: str) -> str:
return f"Hi from {inp}"


@activity.defn
async def super_greet(inp: str, some_number: int) -> str:
return f"Hi from {inp} with {some_number}"
116 changes: 116 additions & 0 deletions worker_versioning/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import asyncio
import uuid

from temporalio.client import BuildIdOpAddNewCompatible, BuildIdOpAddNewDefault, Client
from temporalio.worker import Worker

from worker_versioning.activities import greet, super_greet
from worker_versioning.workflow_v1 import MyWorkflow as MyWorkflowV1
from worker_versioning.workflow_v1_1 import MyWorkflow as MyWorkflowV1_1
from worker_versioning.workflow_v2 import MyWorkflow as MyWorkflowV2


async def main():
client = await Client.connect("localhost:7233")
task_queue = f"worker-versioning-{uuid.uuid4()}"

# Start a 1.0 worker
async with Worker(
client,
task_queue=task_queue,
workflows=[MyWorkflowV1],
activities=[greet, super_greet],
build_id="1.0",
use_worker_versioning=True,
):
# Add 1.0 as the default version for the queue
await client.update_worker_build_id_compatibility(
task_queue, BuildIdOpAddNewDefault("1.0")
)

# Start a workflow which will run on the 1.0 worker
handle = await client.start_workflow(
MyWorkflowV1.run,
task_queue=task_queue,
id=f"worker-versioning-v1-{uuid.uuid4()}",
)
# Signal the workflow to proceed
await handle.signal(MyWorkflowV1.proceeder, "go")

# Give a chance for the worker to process the signal
# TODO Better?
await asyncio.sleep(1)

# Add 1.1 as the default version for the queue, compatible with 1.0
await client.update_worker_build_id_compatibility(
task_queue, BuildIdOpAddNewCompatible("1.1", "1.0")
)

# Stop the old worker, and start a 1.1 worker. We do this to speed along the example, since the
# 1.0 worker may continue to process tasks briefly after we make 1.1 the new default.
async with Worker(
client,
task_queue=task_queue,
workflows=[MyWorkflowV1_1],
activities=[greet, super_greet],
build_id="1.1",
use_worker_versioning=True,
):
# Continue driving the workflow. Take note that the new version of the workflow run by the 1.1
# worker is the one that takes over! You might see a workflow task timeout, if the 1.0 worker is
# processing a task as the version update happens. That's normal.
await handle.signal(MyWorkflowV1.proceeder, "go")

# Add a new *incompatible* version to the task queue, which will become the new overall default for the queue.
await client.update_worker_build_id_compatibility(
task_queue, BuildIdOpAddNewDefault("2.0")
)

# Start a 2.0 worker
async with Worker(
client,
task_queue=task_queue,
workflows=[MyWorkflowV2],
activities=[greet, super_greet],
build_id="2.0",
use_worker_versioning=True,
):
# Start a new workflow. Note that it will run on the new 2.0 version, without the client invocation changing
# at all! Note here we can use `MyWorkflowV1.run` because the signature of the workflow has not changed.
handle2 = await client.start_workflow(
MyWorkflowV1.run,
task_queue=task_queue,
id=f"worker-versioning-v2-{uuid.uuid4()}",
)

# Drive both workflows once more before concluding them. The first workflow will continue running on the 1.1
# worker.
await handle.signal(MyWorkflowV1.proceeder, "go")
await handle2.signal(MyWorkflowV1.proceeder, "go")
await handle.signal(MyWorkflowV1.proceeder, "finish")
await handle2.signal(MyWorkflowV1.proceeder, "finish")

# Wait for both workflows to complete
await handle.result()
await handle2.result()

# Lastly we'll demonstrate how you can use the gRPC api to determine if certain build IDs are ready to be
# retired. There's more information in the documentation, but here's a quick example that shows us how to
# tell when the 1.0 worker can be retired:

# There is a 5 minute buffer before we will consider IDs no longer reachable by new workflows, to
# account for replication in multi-cluster setups. Uncomment the following line to wait long enough to see
# the 1.0 worker become unreachable.
# await asyncio.sleep(60 * 5)
reachability = await client.get_worker_task_reachability(
build_ids=["2.0", "1.0", "1.1"]
)

if not reachability.build_id_reachability["1.0"].task_queue_reachability[
task_queue
]:
print("1.0 is ready to be retired!")


if __name__ == "__main__":
asyncio.run(main())
27 changes: 27 additions & 0 deletions worker_versioning/workflow_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from worker_versioning.activities import greet


@workflow.defn
class MyWorkflow:
"""The 1.0 version of the workflow we'll be making changes to"""

should_finish: bool = False

@workflow.run
async def run(self) -> str:
workflow.logger.info("Running workflow V1")
await workflow.wait_condition(lambda: self.should_finish)
return "Concluded workflow on V1"

@workflow.signal
async def proceeder(self, inp: str):
await workflow.execute_activity(
greet, "V1", start_to_close_timeout=timedelta(seconds=5)
)
if inp == "finish":
self.should_finish = True
45 changes: 45 additions & 0 deletions worker_versioning/workflow_v1_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from worker_versioning.activities import greet, super_greet


@workflow.defn
class MyWorkflow:
"""
The 1.1 version of the workflow, which is compatible with the first version.
The compatible changes we've made are:
- Altering the log lines
- Using the `patched` API to properly introduce branching behavior while maintaining
compatibility
"""

should_finish: bool = False

@workflow.run
async def run(self) -> str:
workflow.logger.info("Running workflow V1.1")
await workflow.wait_condition(lambda: self.should_finish)
return "Concluded workflow on V1.1"

@workflow.signal
async def proceeder(self, inp: str):
if workflow.patched("different-activity"):
await workflow.execute_activity(
super_greet,
args=["V1.1", 100],
start_to_close_timeout=timedelta(seconds=5),
)
else:
# Note it is a valid compatible change to alter the input to an activity. However, because
# we're using the patched API, this branch would only be taken if the workflow was started on
# a v1 worker.
await workflow.execute_activity(
greet, "V1.1", start_to_close_timeout=timedelta(seconds=5)
)

if inp == "finish":
self.should_finish = True
36 changes: 36 additions & 0 deletions worker_versioning/workflow_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import asyncio
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from worker_versioning.activities import greet


@workflow.defn
class MyWorkflow:
"""
The 2.0 version of the workflow, which is fully incompatible with the other workflows, since it
alters the sequence of commands without using `patched`.
"""

should_finish: bool = False

@workflow.run
async def run(self) -> str:
workflow.logger.info("Running workflow V2")
await workflow.wait_condition(lambda: self.should_finish)
return "Concluded workflow on V2"

@workflow.signal
async def proceeder(self, inp: str):
await asyncio.sleep(1)
await workflow.execute_activity(
greet, "V2", start_to_close_timeout=timedelta(seconds=5)
)
await workflow.execute_activity(
greet, "V2", start_to_close_timeout=timedelta(seconds=5)
)

if inp == "finish":
self.should_finish = True

0 comments on commit 2cb0fdd

Please sign in to comment.