Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include stack trace on deadlock detection exception (re) #626

Merged
merged 1 commit into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 96 additions & 4 deletions temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,22 @@
import concurrent.futures
import logging
import os
import sys
from datetime import timezone
from typing import Callable, Dict, List, MutableMapping, Optional, Sequence, Set, Type
from threading import get_ident
from types import TracebackType
from typing import (
Callable,
Dict,
List,
Literal,
MutableMapping,
Optional,
Sequence,
Set,
Tuple,
Type,
)

import temporalio.activity
import temporalio.api.common.v1
Expand Down Expand Up @@ -250,9 +264,10 @@ async def _handle_activation(
activate_task, self._deadlock_timeout_seconds
)
except asyncio.TimeoutError:
raise RuntimeError(
f"[TMPRL1101] Potential deadlock detected, workflow didn't yield within {self._deadlock_timeout_seconds} second(s)"
)
raise _DeadlockError.from_deadlocked_workflow(
workflow, self._deadlock_timeout_seconds
) from None

except Exception as err:
# We cannot fail a cache eviction, we must just log and not complete
# the activation (failed or otherwise). This should only happen in
Expand All @@ -268,6 +283,9 @@ async def _handle_activation(
self._could_not_evict_count += 1
return

if isinstance(err, _DeadlockError):
err.swap_traceback()

logger.exception(
"Failed handling activation on workflow with run ID %s", act.run_id
)
Expand Down Expand Up @@ -421,3 +439,77 @@ def nondeterminism_as_workflow_fail_for_types(self) -> Set[str]:
for typ in v.failure_exception_types
)
)


class _DeadlockError(Exception):
    """Exception raised when deadlock detection trips for a workflow.

    Besides its message, the error can carry a second, pre-built traceback
    (normally built from the stack of the thread the workflow is running on).
    That traceback replaces the error's own one when ``swap_traceback`` is
    called.
    """

    # Upper bound on the summed ``sys.getsizeof`` of the traceback nodes that
    # ``_gen_tb_helper`` keeps; nodes are dropped from the head until the total
    # is at or under this value.
    _MAX_TRACEBACK_SIZE = 200000

    def __init__(self, message: str, replacement_tb: Optional[TracebackType] = None):
        """Create a new deadlock error.

        Args:
            message: Message to be presented through the exception.
            replacement_tb: Optional traceback to be swapped in later by
                ``swap_traceback``.
        """
        super().__init__(message)
        self._new_tb = replacement_tb

    def swap_traceback(self) -> None:
        """Swap the current traceback for the replacement given at construction.

        Used to work around Python adding the current frame to the stack
        trace. The stored replacement is cleared once applied, so calling this
        again changes nothing; it is also a no-op when no replacement was
        supplied.
        """
        if self._new_tb:
            self.__traceback__ = self._new_tb
            self._new_tb = None

    @classmethod
    def from_deadlocked_workflow(
        cls, workflow: "WorkflowInstance", timeout: Optional[int]
    ) -> "_DeadlockError":
        """Build a deadlock error for a workflow that did not yield in time.

        Args:
            workflow: Instance whose ``get_thread_id()`` names the thread whose
                stack should be captured.
            timeout: Deadlock timeout in seconds; interpolated into the
                message as-is.

        Returns:
            An error carrying a replacement traceback when the thread's frames
            could be captured. Otherwise an error with only a message: the
            plain message when no thread ID is reported, or the message with a
            suffix explaining that no frames were available or that capturing
            them failed.
        """
        msg = f"[TMPRL1101] Potential deadlock detected: workflow didn't yield within {timeout} second(s)."
        tid = workflow.get_thread_id()
        # Falsy check (not ``is None``) is kept on purpose so that any falsy ID
        # is treated as "no thread known".
        if not tid:
            return cls(msg)

        # Capturing frames is best effort: a failure here must not mask the
        # deadlock itself, so it is folded into the message instead.
        try:
            tb = cls._gen_tb_helper(tid)
        except Exception as err:
            return cls(f"{msg} (failed getting frames: {err})")

        if tb:
            return cls(msg, tb)
        return cls(f"{msg} (no frames available)")

    @staticmethod
    def _gen_tb_helper(
        tid: int,
    ) -> Optional[TracebackType]:
        """Take a thread ID and construct a traceback from its current stack.

        The returned head node refers to the thread's innermost (currently
        executing) frame and ``tb_next`` walks outward toward the thread's
        outermost frame.

        NOTE(review): this is the reverse of the chain Python builds for a
        raised exception (head outermost), and trimming from the head discards
        the innermost frames first -- confirm that this ordering is intended.

        Args:
            tid: Identifier of the thread to inspect.

        Returns:
            The traceback that was constructed, or None if the thread could
            not be found (or every node was trimmed away by the size limit).
        """
        frame = sys._current_frames().get(tid)
        if not frame:
            return None

        # Not using traceback.extract_stack() because it obfuscates the frame
        # objects (specifically f_lasti).
        thread_frames = [frame]
        while frame.f_back:
            frame = frame.f_back
            thread_frames.append(frame)

        # Outermost frame first, so each new node wraps the previous (outer)
        # one and the last node created belongs to the innermost frame.
        thread_frames.reverse()

        size = 0
        tb: Optional[TracebackType] = None
        for frm in thread_frames:
            tb = TracebackType(tb, frm, frm.f_lasti, frm.f_lineno)
            size += sys.getsizeof(tb)

        # Drop nodes from the head until the chain fits under the size limit.
        while size > _DeadlockError._MAX_TRACEBACK_SIZE and tb:
            size -= sys.getsizeof(tb)
            tb = tb.tb_next

        return tb
18 changes: 18 additions & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging
import random
import sys
import threading
import traceback
import warnings
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -158,6 +159,16 @@ def activate(
"""
raise NotImplementedError

def get_thread_id(self) -> Optional[int]:
    """Report the identifier of the thread this workflow is running on.

    Deliberately not an abstractmethod, since implementing it is optional.
    Its main use is obtaining the frames of a deadlocked thread.

    Returns:
        The thread ID while the workflow is running, otherwise None. This
        default implementation always returns None.
    """
    return None


class UnsandboxedWorkflowRunner(WorkflowRunner):
"""Workflow runner that does not do any sandboxing."""
Expand Down Expand Up @@ -300,6 +311,12 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
# We only create the metric meter lazily
self._metric_meter: Optional[_ReplaySafeMetricMeter] = None

# For tracking the thread this workflow is running on (primarily for deadlock situations)
self._current_thread_id: Optional[int] = None

def get_thread_id(self) -> Optional[int]:
    """Return the thread identifier recorded for this workflow.

    The value is stored in ``_current_thread_id``, which starts as None and
    is set from ``threading.get_ident()`` when ``activate`` runs.

    Returns:
        The recorded thread ID, or None if none has been recorded yet.
    """
    return self._current_thread_id

#### Activation functions ####
# These are in alphabetical order and besides "activate", all other calls
# are "_apply_" + the job field name.
Expand All @@ -320,6 +337,7 @@ def activate(
self._time_ns = act.timestamp.ToNanoseconds()
self._is_replaying = act.is_replaying

self._current_thread_id = threading.get_ident()
activation_err: Optional[Exception] = None
try:
# Split into job sets with patches, then signals + updates, then
Expand Down
10 changes: 9 additions & 1 deletion temporalio/worker/workflow_sandbox/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

from __future__ import annotations

import threading
from datetime import datetime, timedelta, timezone
from typing import Any, Sequence, Type
from typing import Any, Optional, Sequence, Type

import temporalio.bridge.proto.workflow_activation
import temporalio.bridge.proto.workflow_completion
Expand Down Expand Up @@ -112,6 +113,8 @@ def __init__(
self.runner_class = runner_class
self.importer = Importer(restrictions, RestrictionContext())

self._current_thread_id: Optional[int] = None

# Create the instance
self.globals_and_locals = {
"__file__": "workflow_sandbox.py",
Expand Down Expand Up @@ -169,8 +172,13 @@ def _run_code(self, code: str, **extra_globals: Any) -> None:
self.globals_and_locals[k] = v
try:
temporalio.workflow.unsafe._set_in_sandbox(True)
self._current_thread_id = threading.get_ident()
exec(code, self.globals_and_locals, self.globals_and_locals)
finally:
temporalio.workflow.unsafe._set_in_sandbox(False)
self._current_thread_id = None
for k, v in extra_globals.items():
self.globals_and_locals.pop(k, None)

def get_thread_id(self) -> Optional[int]:
    """Return the identifier of the thread currently running sandboxed code.

    ``_run_code`` records ``threading.get_ident()`` just before executing
    code and resets the value to None once execution finishes.

    Returns:
        The recorded thread ID, or None when no code is being run.
    """
    return self._current_thread_id
36 changes: 34 additions & 2 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2108,6 +2108,38 @@ async def status() -> str:


async def test_workflow_enhanced_stack_trace(client: Client):
"""Expected format of __enhanced_stack_trace:

EnhancedStackTrace : {

sdk (StackTraceSDKInfo) : {
name: string,
version: string
},

sources (map<string, StackTraceFileSlice>) : {
filename: (StackTraceFileSlice) {
line_offset: int,
content: string
},
...
},

stacks (StackTrace[]) : [
(StackTraceFileLocation) {
file_path: string,
line: int,
column: int,
function_name: string,
internal_code: bool
},
...
]
}

More details available in API repository: temporal/api/sdk/v1/enhanced_stack_trace.proto
"""

async with new_worker(
client, StackTraceWorkflow, LongSleepWorkflow, activities=[wait_cancel]
) as worker:
Expand Down Expand Up @@ -2570,7 +2602,7 @@ async def last_history_task_failure() -> str:

try:
await assert_eq_eventually(
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)",
"[TMPRL1101] Potential deadlock detected: workflow didn't yield within 1 second(s).",
last_history_task_failure,
timeout=timedelta(seconds=5),
interval=timedelta(seconds=1),
Expand Down Expand Up @@ -2627,7 +2659,7 @@ async def last_history_task_failure() -> str:
return "<no failure>"

await assert_eq_eventually(
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)",
"[TMPRL1101] Potential deadlock detected: workflow didn't yield within 1 second(s).",
last_history_task_failure,
timeout=timedelta(seconds=5),
interval=timedelta(seconds=1),
Expand Down
Loading