fix: update queue generation to be one sticky queue per worker host (#71)
Oracen authored May 25, 2023
1 parent d4e6c36 commit 82f5267
Showing 3 changed files with 22 additions and 26 deletions.
9 changes: 4 additions & 5 deletions activity_sticky_queues/README.md
@@ -1,15 +1,16 @@
# Sticky Activity Queues

This sample is a Python implementation of the [TypeScript "Sticky Workers" example](https://github.com/temporalio/samples-typescript/tree/main/activities-sticky-queues), full credit for the design to the authors of that sample. A [sticky execution](https://docs.temporal.io/tasks#sticky-execution) is a job distribution design pattern where all workflow computational tasks are executed on a single worker. In the Go and Java SDKs this is explicitly supported via the Session option, but in other SDKs a different approach is required.

Typical use cases for sticky executions include tasks where interaction with a filesystem is required, such as data processing or interacting with legacy access structures. This example will write text files to folders corresponding to each worker, located in the `demo_fs` folder. In production, these folders would typically be independent machines in a worker cluster.

The strategy is (a minimal code sketch follows the list):

- Create a `get_available_task_queue` activity that generates a unique task queue name, `unique_worker_task_queue`.
- For activities intended to be "sticky", only register them in one Worker, and have that be the only Worker listening on that `unique_worker_task_queue`. These activities are exercised by a series of `FileProcessing` workflows.
- Execute workflows from the Client as normal. Check the Temporal Web UI to confirm tasks stayed with their respective worker.
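
A minimal sketch of that routing pattern is below, under stated assumptions: this is not the sample's actual `FileProcessing` workflow (which lives in `tasks.py`); `RoutingSketch` and `do_sticky_work` are hypothetical names and the timeouts are arbitrary. Only `get_available_task_queue` and the use of the `task_queue` argument come from the sample.

```python
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class RoutingSketch:
    @workflow.run
    async def run(self) -> None:
        # Step 1: ask the shared (non-sticky) queue which worker-specific queue to use.
        unique_worker_task_queue = await workflow.execute_activity(
            "get_available_task_queue",
            start_to_close_timeout=timedelta(seconds=10),
        )
        # Step 2: pin follow-up work to that worker's queue so it runs on the same
        # host (and therefore sees the same filesystem).
        await workflow.execute_activity(
            "do_sticky_work",  # hypothetical stand-in for the sample's file activities
            task_queue=unique_worker_task_queue,
            start_to_close_timeout=timedelta(minutes=1),
        )
```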

It doesn't matter where the `get_available_task_queue` activity is run, so it can be "non sticky" as per Temporal default behavior. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045). Our example differs from the Node sample by running across 5 unique task queues.
It doesn't matter where the `get_available_task_queue` activity is run, so it can be "non sticky" as per Temporal default behavior. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045).
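
For example, one hypothetical way to derive a host-identifying queue name instead of a purely random one (nothing in this sample does this; `host_unique_task_queue` is illustrative only):

```python
import socket
import uuid


def host_unique_task_queue(prefix: str = "activity_sticky_queue-host") -> str:
    """Build a task queue name that names the host but stays unique per worker process."""
    # gethostname() identifies the machine; the uuid4 suffix avoids collisions if
    # two worker processes ever run on the same host.
    return f"{prefix}-{socket.gethostname()}-{uuid.uuid4()}"
```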

Activities have been artificially slowed with `time.sleep(3)` to simulate slow activities.

@@ -27,7 +28,7 @@ This will start the worker. Then, in another terminal, run the following to exec
#### Example output:

```bash
(temporalio-samples-py3.10) user@machine:~/samples-python/activities_sticky_queues$ poetry run python starter.py
Output checksums:
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
@@ -48,5 +49,3 @@ All activities for the one workflow are running against the same task queue, whi
![image](./static/all-activitites-on-same-task-queue.png)

</details>


1 change: 1 addition & 0 deletions activity_sticky_queues/tasks.py
@@ -63,6 +63,7 @@ async def get_available_task_queue() -> str:
async def download_file_to_worker_filesystem(details: DownloadObj) -> str:
    """Simulates downloading a file to a local filesystem"""
    # FS ops
    print(details.unique_worker_id, details.workflow_uuid)
    path = create_filepath(details.unique_worker_id, details.workflow_uuid)
    activity.logger.info(f"Downloading {details.url} and saving to {path}")

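`create_filepath` itself is defined elsewhere in `tasks.py` and is not part of this diff. Purely as an illustration of the idea the README describes (one folder per worker under `demo_fs`), a helper of that shape might look roughly like the sketch below; the name `create_filepath_sketch` and the exact directory layout are assumptions.

```python
from pathlib import Path

# Assumed location of the demo filesystem root, per the README's description.
DEMO_FS_ROOT = Path(__file__).parent / "demo_fs"


def create_filepath_sketch(unique_worker_id: str, workflow_uuid: str) -> Path:
    """Return a per-worker, per-workflow file path under the demo_fs folder."""
    directory = DEMO_FS_ROOT / unique_worker_id
    directory.mkdir(parents=True, exist_ok=True)
    return directory / workflow_uuid
```
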
38 changes: 17 additions & 21 deletions activity_sticky_queues/worker.py
@@ -21,15 +21,12 @@ async def main():
    random.seed(667)

    # Create random task queues and build task queue selection function
    task_queues: List[str] = [
        f"activity_sticky_queue-host-{UUID(int=random.getrandbits(128))}"
        for _ in range(5)
    ]
    task_queue: str = f"activity_sticky_queue-host-{UUID(int=random.getrandbits(128))}"

    @activity.defn(name="get_available_task_queue")
    async def select_task_queue_random() -> str:
    async def select_task_queue() -> str:
        """Randomly assign the job to a queue"""
        return random.choice(task_queues)
        return task_queue

    # Start client
    client = await Client.connect("localhost:7233")
@@ -40,25 +37,24 @@ async def select_task_queue_random() -> str:
        client,
        task_queue="activity_sticky_queue-distribution-queue",
        workflows=[tasks.FileProcessing],
        activities=[select_task_queue_random],
        activities=[select_task_queue],
    )
    run_futures.append(handle.run())
    print("Base worker started")

    # Run the workers for the individual task queues
    for queue_id in task_queues:
        handle = Worker(
            client,
            task_queue=queue_id,
            activities=[
                tasks.download_file_to_worker_filesystem,
                tasks.work_on_file_in_worker_filesystem,
                tasks.clean_up_file_from_worker_filesystem,
            ],
        )
        run_futures.append(handle.run())
        # Wait until interrupted
        print(f"Worker {queue_id} started")
    # Run unique task queue for this particular host
    handle = Worker(
        client,
        task_queue=task_queue,
        activities=[
            tasks.download_file_to_worker_filesystem,
            tasks.work_on_file_in_worker_filesystem,
            tasks.clean_up_file_from_worker_filesystem,
        ],
    )
    run_futures.append(handle.run())
    # Wait until interrupted
    print(f"Worker {task_queue} started")

    print("All workers started, ctrl+c to exit")
    await asyncio.gather(*run_futures)
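
For completeness, the client side of this pattern looks roughly like the sketch below. The sample ships its own `starter.py`, whose exact workflow id, arguments, and number of concurrent runs may differ; only the `FileProcessing` workflow and the distribution queue name are taken from this diff.

```python
import asyncio
import uuid

from temporalio.client import Client

from activity_sticky_queues import tasks  # assumes running from the repository root


async def main() -> None:
    client = await Client.connect("localhost:7233")
    # The workflow starts on the shared distribution queue; the workflow itself then
    # routes its activities to whichever worker-specific sticky queue it is handed.
    result = await client.execute_workflow(
        tasks.FileProcessing.run,
        id=f"sticky-queue-sketch-{uuid.uuid4()}",
        task_queue="activity_sticky_queue-distribution-queue",
    )
    print("Workflow result:", result)


if __name__ == "__main__":
    asyncio.run(main())
```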
