Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Swallow Python exceptions better on workflow GC from eviction #341

Merged
merged 4 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,16 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
# Set ourselves on our own loop
temporalio.workflow._Runtime.set_on_loop(self, self)

# After GC, Python raises GeneratorExit calls from all awaiting tasks.
# Then in a finally of such an await, another exception can swallow
# these causing even more issues. We will set ourselves as deleted so we
# can check in some places to swallow these errors on tear down.
self._deleting = False

def __del__(self) -> None:
# We have confirmed there are no super() versions of __del__
self._deleting = True

#### Activation functions ####
# These are in alphabetical order and besides "activate", all other calls
# are "_apply_" + the job field name.
Expand Down Expand Up @@ -629,14 +639,26 @@ def _apply_start_workflow(
# Async call to run on the scheduler thread. This will be wrapped in
# another function which applies exception handling.
async def run_workflow(input: ExecuteWorkflowInput) -> None:
result = await self._inbound.execute_workflow(input)
result_payloads = self._payload_converter.to_payloads([result])
if len(result_payloads) != 1:
raise ValueError(
f"Expected 1 result payload, got {len(result_payloads)}"
)
command = self._add_command()
command.complete_workflow_execution.result.CopyFrom(result_payloads[0])
try:
result = await self._inbound.execute_workflow(input)
result_payloads = self._payload_converter.to_payloads([result])
if len(result_payloads) != 1:
raise ValueError(
f"Expected 1 result payload, got {len(result_payloads)}"
)
command = self._add_command()
command.complete_workflow_execution.result.CopyFrom(result_payloads[0])
except BaseException as err:
# During tear down, generator exit and event loop exceptions can occur
if not self._deleting:
raise
if not isinstance(
err,
(GeneratorExit, temporalio.workflow._NotInWorkflowEventLoopError),
):
logger.debug(
"Ignoring exception while deleting workflow", exc_info=True
Comment on lines +659 to +660
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nit but IMO this is more like trace level

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was a tough call, but figured since I remove the two primary cases where this happens, the rest I think are fairly rare. And in fact, this can show a sign of a code smell, because it might mean you are swallowing an exception in a finally if it's not one of these two exception types.

)

# Schedule it
input = ExecuteWorkflowInput(
Expand Down Expand Up @@ -1260,6 +1282,16 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
else:
# All other exceptions fail the task
self._current_activation_error = err
except BaseException as err:
# During tear down, generator exit and no-runtime exceptions can appear
if not self._deleting:
raise
if not isinstance(
err, (GeneratorExit, temporalio.workflow._NotInWorkflowEventLoopError)
):
logger.debug(
"Ignoring exception while deleting workflow", exc_info=True
)

def _set_workflow_failure(self, err: temporalio.exceptions.FailureError) -> None:
# All other failure errors fail the workflow
Expand Down
8 changes: 7 additions & 1 deletion temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ class _Runtime(ABC):
def current() -> _Runtime:
loop = _Runtime.maybe_current()
if not loop:
raise RuntimeError("Not in workflow event loop")
raise _NotInWorkflowEventLoopError("Not in workflow event loop")
return loop

@staticmethod
Expand Down Expand Up @@ -3843,6 +3843,12 @@ def __init__(self, message: str) -> None:
self.message = message


class _NotInWorkflowEventLoopError(temporalio.exceptions.TemporalError):
def __init__(self, *args: object) -> None:
super().__init__("Not in workflow event loop")
self.message = "Not in workflow event loop"


class VersioningIntent(Enum):
"""Indicates whether the user intends certain commands to be run on a compatible worker Build
Id version or not.
Expand Down
69 changes: 69 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging.handlers
import pickle
import queue
import sys
import threading
import uuid
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -2843,3 +2844,71 @@ async def test_manual_result_type(client: Client):
assert res3 == {"some_string": "from-query"}
res4 = await handle.query("some_query", result_type=ManualResultType)
assert res4 == ManualResultType(some_string="from-query")


@workflow.defn
class SwallowGeneratorExitWorkflow:
def __init__(self) -> None:
self._signal_count = 0

@workflow.run
async def run(self) -> None:
try:
# Wait for signal count to reach 2
await workflow.wait_condition(lambda: self._signal_count > 1)
finally:
# This finally, on eviction, is actually called because the above
# await raises GeneratorExit. Then this will raise a
# _NotInWorkflowEventLoopError swallowing that.
await workflow.wait_condition(lambda: self._signal_count > 2)

@workflow.signal
async def signal(self) -> None:
self._signal_count += 1

@workflow.query
async def signal_count(self) -> int:
return self._signal_count


async def test_swallow_generator_exit(client: Client):
if sys.version_info < (3, 8):
pytest.skip("sys.unraisablehook not in 3.7")
# This test simulates GeneratorExit and GC issues by forcing eviction on
# each step
async with new_worker(
client, SwallowGeneratorExitWorkflow, max_cached_workflows=0
) as worker:
# Put a hook to catch unraisable exceptions
old_hook = sys.unraisablehook
hook_calls: List[Any] = []
sys.unraisablehook = hook_calls.append
try:
handle = await client.start_workflow(
SwallowGeneratorExitWorkflow.run,
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

async def signal_count() -> int:
return await handle.query(SwallowGeneratorExitWorkflow.signal_count)

# Confirm signal count as 0
await assert_eq_eventually(0, signal_count)

# Send signal and confirm it's at 1
await handle.signal(SwallowGeneratorExitWorkflow.signal)
await assert_eq_eventually(1, signal_count)

await handle.signal(SwallowGeneratorExitWorkflow.signal)
await assert_eq_eventually(2, signal_count)

await handle.signal(SwallowGeneratorExitWorkflow.signal)
await assert_eq_eventually(3, signal_count)

await handle.result()
finally:
sys.unraisablehook = old_hook

# Confirm no unraisable exceptions
assert not hook_calls