Rename Sticky Activities sample (#75)
Updated to worker_specific_task_queues
lorensr authored Nov 12, 2023
1 parent 092ac9c commit ba5a87f
Showing 11 changed files with 72 additions and 63 deletions.

README.md (2 changes: 1 addition & 1 deletion)
```diff
@@ -51,7 +51,6 @@
   while running.
 * [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
 <!-- Keep this list in alphabetical order -->
-* [activity_sticky_queue](activity_sticky_queues) - Uses unique task queues to ensure activities run on specific workers.
 * [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
 * [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
 * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
@@ -65,6 +64,7 @@
 * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
 * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
 * [sentry](sentry) - Report errors to Sentry.
+* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
 * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.

 ## Test
```
activity_sticky_queues/README.md (51 changes: 0 additions & 51 deletions)

This file was deleted.

```diff
@@ -1,7 +1,7 @@
 from pathlib import Path
 from unittest import mock

-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks

 RETURNED_PATH = "valid/path"
 tasks._get_delay_secs = mock.MagicMock(return_value=0.0001)
```
```diff
@@ -9,7 +9,7 @@
 from temporalio.testing import WorkflowEnvironment
 from temporalio.worker import Worker

-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks

 CHECKSUM = "a checksum"
 RETURNED_PATH = "valid/path"
```
worker_specific_task_queues/README.md (56 changes: 56 additions & 0 deletions)

New file:

# Worker-Specific Task Queues

Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker. In the Go SDK, this is explicitly supported via the Session option, but in other SDKs a different approach is required.

Typical use cases include tasks where interaction with a filesystem is required, such as data processing or interacting with legacy access structures. This example writes text files to folders corresponding to each worker, located in the `demo_fs` folder. In production, these folders would typically be independent machines in a worker cluster.

The strategy is as follows (a code sketch of the Workflow side follows this list):

- Each Worker process runs two `Worker`s:
- One `Worker` listens on the `worker_specific_task_queue-distribution-queue` Task Queue.
- Another `Worker` listens on a uniquely generated Task Queue.
- The Workflow and the first Activity are run on `worker_specific_task_queue-distribution-queue`.
- The first Activity returns one of the uniquely generated Task Queues (that only one Worker is listening on—i.e. the **Worker-specific Task Queue**).
- The rest of the Activities do the file processing and are run on the Worker-specific Task Queue.
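
To make the routing concrete, here is a condensed sketch of the Workflow side of this pattern. The full version lives in this sample's `tasks.py`; `get_available_task_queue` matches the routing Activity this sample registers, but the three file-processing Activity names below are illustrative stand-ins:

```python
# Sketch only: condensed from this sample. The file-processing activity
# names are illustrative, not necessarily the ones defined in tasks.py.
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class FileProcessing:
    @workflow.run
    async def run(self) -> str:
        # Step 1: runs on the shared distribution queue; whichever Worker
        # picks it up replies with its own unique queue name.
        unique_worker_task_queue = await workflow.execute_activity(
            "get_available_task_queue",
            start_to_close_timeout=timedelta(seconds=10),
        )
        # Step 2: pin the remaining Activities to that Worker by overriding
        # `task_queue` on every call.
        download_path = await workflow.execute_activity(
            "download_file_to_worker_filesystem",
            "https://example.com/some-file",
            task_queue=unique_worker_task_queue,
            start_to_close_timeout=timedelta(seconds=10),
        )
        checksum = await workflow.execute_activity(
            "work_on_file_in_worker_filesystem",
            download_path,
            task_queue=unique_worker_task_queue,
            start_to_close_timeout=timedelta(seconds=10),
        )
        await workflow.execute_activity(
            "clean_up_file_from_worker_filesystem",
            download_path,
            task_queue=unique_worker_task_queue,
            start_to_close_timeout=timedelta(seconds=10),
        )
        return checksum
```

Because exactly one Worker polls `unique_worker_task_queue`, every Activity scheduled with that `task_queue` override runs on the same machine and sees the same local files.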

Check the Temporal Web UI to confirm that tasks stayed with their respective workers.

It doesn't matter where the `get_available_task_queue` activity is run, so it can be executed on the shared Task Queue. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045).
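
On the Worker side, the wiring looks roughly like the following sketch, condensed from this sample's `worker.py` (shown in the diff further down). A `uuid4` stands in here for the seeded-random UUID the sample actually uses, and the list of file-processing Activities is an assumption for illustration:

```python
# Sketch only: each Worker process runs two Workers, one polling the shared
# distribution queue and one polling its own uniquely named queue.
import asyncio
import uuid

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker

from worker_specific_task_queues import tasks


async def main() -> None:
    client = await Client.connect("localhost:7233")

    # One queue name per process; no other process polls it. (The real
    # worker.py derives this from a seeded random UUID; uuid4 is used
    # here for brevity.)
    unique_worker_task_queue = f"worker_specific_task_queue-host-{uuid.uuid4()}"

    @activity.defn(name="get_available_task_queue")
    async def select_task_queue() -> str:
        return unique_worker_task_queue

    # Worker 1: shared queue. Runs the Workflow and the routing Activity.
    shared_worker = Worker(
        client,
        task_queue="worker_specific_task_queue-distribution-queue",
        workflows=[tasks.FileProcessing],
        activities=[select_task_queue],
    )
    # Worker 2: unique queue. Runs the file-processing Activities (these
    # attribute names are assumptions, not necessarily tasks.py's exports).
    unique_worker = Worker(
        client,
        task_queue=unique_worker_task_queue,
        activities=[
            tasks.download_file_to_worker_filesystem,
            tasks.work_on_file_in_worker_filesystem,
            tasks.clean_up_file_from_worker_filesystem,
        ],
    )
    await asyncio.gather(shared_worker.run(), unique_worker.run())


if __name__ == "__main__":
    asyncio.run(main())
```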

Activities have been artificially slowed with `time.sleep(3)` to simulate longer-running work.

### Running This Sample

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker:

    poetry run python worker.py

Then, in another terminal, run the following to execute the workflow:

    poetry run python starter.py

#### Example output:

```bash
(temporalio-samples-py3.10) user@machine:~/samples-python/worker_specific_task_queues$ poetry run python starter.py
Output checksums:
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
```

<details>
<summary>Checking the history to see where activities are run</summary>
All Activities for a given Workflow run against the same Task Queue, which corresponds to a single unique Worker:

![image](./static/all-activitites-on-same-task-queue.png)

</details>
File renamed without changes.
File renamed without changes.
```diff
@@ -3,7 +3,7 @@

 from temporalio.client import Client

-from activity_sticky_queues.tasks import FileProcessing
+from worker_specific_task_queues.tasks import FileProcessing


 async def main():
@@ -15,8 +15,8 @@ async def main():
     for idx in range(10):
         result = client.execute_workflow(
             FileProcessing.run,
-            id=f"activity_sticky_queue-workflow-id-{idx}",
-            task_queue="activity_sticky_queue-distribution-queue",
+            id=f"worker_specific_task_queue-workflow-id-{idx}",
+            task_queue="worker_specific_task_queue-distribution-queue",
         )
         await asyncio.sleep(0.1)
         futures.append(result)
```
```diff
@@ -102,9 +102,11 @@ class FileProcessing:
     async def run(self) -> str:
         """Workflow implementing the basic file processing example.

-        First, a worker is selected randomly. This is the "sticky worker" on which
-        the workflow runs. This consists of a file download and some processing task,
-        with a file cleanup if an error occurs.
+        First, a task queue is selected randomly. A single worker is listening on
+        this queue, so when we execute all the file processing activities on this
+        queue, they will all be run on the same worker, and all be able to access
+        the same file on disk. The activities download the file, do some processing
+        task on the file, and clean up the file.
         """
         workflow.logger.info("Searching for available worker")
         unique_worker_task_queue = await workflow.execute_activity(
```
```diff
@@ -8,7 +8,7 @@
 from temporalio.client import Client
 from temporalio.worker import Worker

-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks

 interrupt_event = asyncio.Event()

@@ -21,7 +21,9 @@ async def main():
     random.seed(667)

     # Create random task queues and build task queue selection function
-    task_queue: str = f"activity_sticky_queue-host-{UUID(int=random.getrandbits(128))}"
+    task_queue: str = (
+        f"worker_specific_task_queue-host-{UUID(int=random.getrandbits(128))}"
+    )

     @activity.defn(name="get_available_task_queue")
     async def select_task_queue() -> str:
@@ -35,7 +37,7 @@ async def select_task_queue() -> str:
     run_futures = []
     handle = Worker(
         client,
-        task_queue="activity_sticky_queue-distribution-queue",
+        task_queue="worker_specific_task_queue-distribution-queue",
         workflows=[tasks.FileProcessing],
         activities=[select_task_queue],
     )
```
