Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow proper stack trace on eviction deadlock #530

Merged
merged 3 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,10 @@ async def _handle_activation(
# TODO(cretz): Should we build a complex mechanism to continually
# try the eviction until it succeeds?
if cache_remove_job:
logger.exception("Failed running eviction job, not evicting")
logger.exception(
"Failed running eviction job, not evicting. "
+ "Since eviction could not be processed, this worker cannot complete and the slot will remain forever used."
)
Comment on lines +264 to +267
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it'd make more sense to just panic and crash at this point

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue is not really about changing this behavior, but we can have a general discussion on that if we want though I don't think it should hold up this issue.

To do this, we might need some Core support. Today Core won't shutdown a worker if there is an outstanding eviction, and we don't want to return an eviction if it can't properly tear down because we don't want to accept more work for this workflow. What will Core do if we fail the eviction instead of hang it?

Also, a deadlocked workflow fatal'ing the worker is a bit rough on users. Unfortunately deadlocked many times means failed-eviction-in-cache-forever from a Python POV. Technically we could move can't-evict workflows off to some other dict, but that's just shifting the memory burden, it doesn't help the end user. This whole situation should be rare, hence why I was hesitant to build in some continually-retry-eviction type of logic in case the user's non-deterministic stuff started working (rare/unlikely).

Now that I think about it, I think other SDKs like Go and Java have this issue too. If they can't tear down their coroutines/threads and it hangs, there's nothing they can do, but I don't think they fatal the worker.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you fail the eviction I believe it just gets re-issued. We could make a special case for this but I'd rather not.

And yeah, fair point about the other SDKs and how this is likely when you do deadlock. I'm fine with just eating up the slot.

self._could_not_evict_count += 1
return

Expand Down
4 changes: 2 additions & 2 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ def activate(
# have a different workflow/event-loop going.
if self._deleting and self._tasks:
raise RuntimeError(
f"Cache removal processed, but {len(self._tasks)} tasks remain. "
f"Eviction processed, but {len(self._tasks)} tasks remain. "
+ f"Stack traces below:\n\n{self._stack_trace()}"
)

Expand Down Expand Up @@ -1776,7 +1776,7 @@ async def _signal_external_workflow(

def _stack_trace(self) -> str:
stacks = []
for task in self._tasks:
for task in list(self._tasks):
# TODO(cretz): These stacks are not very clean currently
frames = []
for frame in task.get_stack():
Expand Down
2 changes: 1 addition & 1 deletion temporalio/worker/workflow_sandbox/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,4 @@ def _run_code(self, code: str, **extra_globals: Any) -> None:
finally:
temporalio.workflow.unsafe._set_in_sandbox(False)
for k, v in extra_globals.items():
del self.globals_and_locals[k]
self.globals_and_locals.pop(k, None)
2 changes: 1 addition & 1 deletion tests/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def assert_eq_eventually(
await asyncio.sleep(interval.total_seconds())
assert (
expected == last_value
), "timed out waiting for equal, asserted against last value"
), f"timed out waiting for equal, asserted against last value of {last_value}"


async def worker_versioning_enabled(client: Client) -> bool:
Expand Down
62 changes: 61 additions & 1 deletion tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2467,6 +2467,8 @@ async def test_workflow_deadlock(client: Client):
async with new_worker(
client, DeadlockedWorkflow, disable_safe_workflow_eviction=True
) as worker:
if worker._workflow_worker:
worker._workflow_worker._deadlock_timeout_seconds = 1
deadlock_thread_event.clear()
handle = await client.start_workflow(
DeadlockedWorkflow.run,
Expand All @@ -2488,7 +2490,7 @@ async def last_history_task_failure() -> str:

try:
await assert_eq_eventually(
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 2 second(s)",
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)",
last_history_task_failure,
timeout=timedelta(seconds=5),
interval=timedelta(seconds=1),
Expand All @@ -2497,6 +2499,64 @@ async def last_history_task_failure() -> str:
deadlock_thread_event.set()


@workflow.defn
class EvictionDeadlockWorkflow:
def __init__(self) -> None:
self.val = 1

async def wait_until_positive(self):
while True:
await workflow.wait_condition(lambda: self.val > 0)
self.val = -self.val

async def wait_until_negative(self):
while True:
await workflow.wait_condition(lambda: self.val < 0)
self.val = -self.val

@workflow.run
async def run(self):
await asyncio.gather(self.wait_until_negative(), self.wait_until_positive())


async def test_workflow_eviction_deadlock(client: Client):
# We are running the worker, but we can't ever shut it down on eviction
# error so we send shutdown in the background and leave this worker dangling
worker = new_worker(client, EvictionDeadlockWorkflow)
if worker._workflow_worker:
worker._workflow_worker._deadlock_timeout_seconds = 1
worker_task = asyncio.create_task(worker.run())

# Run workflow that deadlocks
handle = await client.start_workflow(
EvictionDeadlockWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

async def last_history_task_failure() -> str:
resp = await client.workflow_service.get_workflow_execution_history(
GetWorkflowExecutionHistoryRequest(
namespace=client.namespace,
execution=WorkflowExecution(workflow_id=handle.id),
),
)
for event in reversed(resp.history.events):
if event.event_type == EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED:
return event.workflow_task_failed_event_attributes.failure.message
return "<no failure>"

await assert_eq_eventually(
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)",
last_history_task_failure,
timeout=timedelta(seconds=5),
interval=timedelta(seconds=1),
)

# Send cancel but don't wait
worker_task.cancel()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm finding that this test passes even with the bug fix reverted.

(I know that some people disagree, but personally I think that it's a good idea to structure PRs like this as an initial commit with a failing test, followed by a commit that fixes the failure.)

sdk-python(eviction-deadlock-stack અ ) .venv/bin/pytest tests/worker/test_workflow.py::test_workflow_eviction_deadlock                                                                               09:46:21 [3/5530]
================================================================================================ test session starts =================================================================================================
platform darwin -- Python 3.10.13, pytest-7.4.3, pluggy-1.0.0
rootdir: /Users/dan/src/temporalio/sdk-python
configfile: pyproject.toml
plugins: timeout-2.2.0, asyncio-0.21.1
timeout: 600.0s
timeout method: signal
timeout func_only: True
asyncio: mode=auto
collected 1 item

tests/worker/test_workflow.py::test_workflow_eviction_deadlock
--------------------------------------------------------------------------------------------------- live log call ----------------------------------------------------------------------------------------------------
09:46:20 [   ERROR] Failed handling activation on workflow with run ID 70ba6973-f045-46e2-b5c0-78a944e41904 (_workflow.py:271)
Traceback (most recent call last):
  File "/Users/dan/.pyenv/versions/3.10.13/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow.py", line 249, in _handle_activation
    completion = await asyncio.wait_for(
  File "/Users/dan/.pyenv/versions/3.10.13/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow.py", line 253, in _handle_activation
    raise RuntimeError(
RuntimeError: [TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)
09:46:20 [   ERROR] Failed running eviction job, not evicting. Since eviction could not be processed, this worker cannot complete and the slot will remain forever used. (_workflow.py:264)
Traceback (most recent call last):
  File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow.py", line 249, in _handle_activation
    completion = await asyncio.wait_for(
  File "/Users/dan/.pyenv/versions/3.10.13/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/Users/dan/.pyenv/versions/3.10.13/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/workflow_sandbox/_runner.py", line 157, in activate
    self._run_code(
  File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/workflow_sandbox/_runner.py", line 172, in _run_code
    exec(code, self.globals_and_locals, self.globals_and_locals)
  File "<string>", line 2, in <module>
  File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/workflow_sandbox/_in_sandbox.py", line 81, in activate
    return self.instance.activate(act)
  File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py", line 361, in activate
    + f"Stack traces below:\n\n{self._stack_trace()}"
  File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py", line 1779, in _stack_trace
    for task in self._tasks:
RuntimeError: Set changed size during iteration
09:46:21 [    INFO] Worker cancelled, shutting down (_worker.py:479)
09:46:21 [    INFO] Beginning worker shutdown, will wait 0:00:00 before cancelling activities (_worker.py:485)
09:46:21 [ WARNING] Shutting down workflow worker, but 1 workflow(s) could not be evicted previously, so the shutdown will hang (_workflow.py:172)
PASSED

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there's no easy way without extracting logs or adding other testing-only helpers to assert which reason exists for eviction failure. The test in this case was more for observed behavior as I struggled to justify adding injection points for this test. But if we think it's important, I can add hooks in the runtime code for this one test.


class PatchWorkflowBase:
def __init__(self) -> None:
self._result = "<unset>"
Expand Down
Loading