Skip to content

Commit

Permalink
Ensure a strong reference to asyncio Task for auto-heartbeater (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfgeorge-vl authored Jan 12, 2023
1 parent af94df0 commit f21615a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
27 changes: 13 additions & 14 deletions custom_decorator/activity_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,27 @@ def auto_heartbeater(fn: F) -> F:
# available via our wrapper, so we use the functools wraps decorator
@wraps(fn)
async def wrapper(*args, **kwargs):
done = asyncio.Event()
# Heartbeat twice as often as the timeout
heartbeat_timeout = activity.info().heartbeat_timeout
heartbeat_task = None
if heartbeat_timeout:
asyncio.create_task(
heartbeat_every(heartbeat_timeout.total_seconds() / 2, done)
# Heartbeat twice as often as the timeout
heartbeat_task = asyncio.create_task(
heartbeat_every(heartbeat_timeout.total_seconds() / 2)
)
try:
return await fn(*args, **kwargs)
finally:
done.set()
if heartbeat_task:
heartbeat_task.cancel()
# Wait for heartbeat cancellation to complete
await asyncio.wait([heartbeat_task])

return cast(F, wrapper)


async def heartbeat_every(
delay: float, done_event: asyncio.Event, *details: Any
) -> None:
async def heartbeat_every(delay: float, *details: Any) -> None:
# Heartbeat every so often while not cancelled
while not done_event.is_set():
try:
await asyncio.wait_for(done_event.wait(), delay)
except asyncio.TimeoutError:
print(f"Heartbeating at {datetime.now()}")
activity.heartbeat(*details)
while True:
await asyncio.sleep(delay)
print(f"Heartbeating at {datetime.now()}")
activity.heartbeat(*details)
9 changes: 8 additions & 1 deletion hello/hello_async_activity_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ async def compose_greeting(self, input: ComposeGreetingInput) -> str:
# Schedule a task to complete this asynchronously. This could be done in
# a completely different process or system.
print("Completing activity asynchronously")
asyncio.create_task(self.complete_greeting(activity.info().task_token, input))
# Tasks stored by asyncio are weak references and therefore can get GC'd
# which can cause warnings like "Task was destroyed but it is pending!".
# So we store the tasks ourselves.
# See https://docs.python.org/3/library/asyncio-task.html#creating-tasks,
# https://bugs.python.org/issue21163 and others.
_ = asyncio.create_task(
self.complete_greeting(activity.info().task_token, input)
)

# Raise the complete-async error which will complete this function but
# does not consider the activity complete from the workflow perspective
Expand Down

0 comments on commit f21615a

Please sign in to comment.