Skip to content

Commit

Permalink
Format warning messages
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Jun 19, 2024
1 parent 70d818a commit bea62d5
Showing 1 changed file with 42 additions and 12 deletions.
54 changes: 42 additions & 12 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import collections
import contextvars
import inspect
import json
import logging
import random
import sys
Expand Down Expand Up @@ -459,18 +460,7 @@ def activate(
i += 1

if seen_completion:
if any(
defn.unfinished_handlers_policy
== temporalio.workflow.UnfinishedHandlersPolicy.WARN_AND_ABANDON
for _, defn in self._in_progress_updates.values()
):
warnings.warn(UNFINISHED_UPDATE_HANDLERS_WARNING, RuntimeWarning)
if any(
defn.unfinished_handlers_policy
== temporalio.workflow.UnfinishedHandlersPolicy.WARN_AND_ABANDON
for _, defn in self._in_progress_signals.values()
):
warnings.warn(UNFINISHED_SIGNAL_HANDLERS_WARNING, RuntimeWarning)
self._warn_if_unfinished_handlers()
return self._current_completion

def _apply(
Expand Down Expand Up @@ -1661,6 +1651,46 @@ def _is_workflow_failure_exception(self, err: BaseException) -> bool:
)
)

def _warn_if_unfinished_handlers(self) -> None:
updates_warning = self._make_unfinished_update_handlers_warning()
if updates_warning:
warnings.warn(updates_warning, RuntimeWarning)

signals_warning = self._make_unfinished_signal_handlers_warning()
if signals_warning:
warnings.warn(signals_warning, RuntimeWarning)

def _make_unfinished_update_handlers_warning(self) -> Optional[str]:
warnable_jobs = [
job
for job, defn in self._in_progress_updates.values()
if defn.unfinished_handlers_policy
== temporalio.workflow.UnfinishedHandlersPolicy.WARN_AND_ABANDON
]
if not warnable_jobs:
return None
handler_information = (
"The following updates were unfinished (and warnings were not disabled for their handler): "
+ json.dumps([{"name": j.name, "id": j.id} for j in warnable_jobs])
)
return UNFINISHED_UPDATE_HANDLERS_WARNING + " " + handler_information

def _make_unfinished_signal_handlers_warning(self) -> Optional[str]:
warnable_jobs = [
job
for job, defn in self._in_progress_signals.values()
if defn.unfinished_handlers_policy
== temporalio.workflow.UnfinishedHandlersPolicy.WARN_AND_ABANDON
]
if not warnable_jobs:
return None
names = collections.Counter(j.signal_name for j in warnable_jobs)
handler_information = (
"The following signals were unfinished (and warnings were not disabled for their handler): "
+ json.dumps([{"name": name, "count": count} for name, count in names.most_common()])
)
return UNFINISHED_SIGNAL_HANDLERS_WARNING + " " + handler_information

def _next_seq(self, type: str) -> int:
seq = self._curr_seqs.get(type, 0) + 1
self._curr_seqs[type] = seq
Expand Down

0 comments on commit bea62d5

Please sign in to comment.