Skip to content

Commit

Permalink
Change exception_blame back to TaskState and make stimulus_id's 'str …
Browse files Browse the repository at this point in the history
…| None'
  • Loading branch information
sjperkins committed Mar 17, 2022
1 parent 5307c0e commit 30dba93
Showing 1 changed file with 21 additions and 20 deletions.
41 changes: 21 additions & 20 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,7 @@ class TaskState:
_exception_text: str
_traceback: object
_traceback_text: str
_exception_blame: "TaskState | None"
_exception_blame: "TaskState" # TaskState | None
_erred_on: set
_suspicious: Py_ssize_t
_host_restrictions: set # set[str] | None
Expand Down Expand Up @@ -2269,7 +2269,9 @@ def new_task(
# State Transitions #
#####################

def _transition(self, key, finish: str, *args, stimulus_id: str = None, **kwargs):
def _transition(
self, key, finish: str, *args, stimulus_id: str | None = None, **kwargs
):
"""Transition a key from its current state to the finish state
Examples
Expand Down Expand Up @@ -2457,7 +2459,7 @@ def _transitions(self, recommendations: dict, client_msgs: dict, worker_msgs: di
for key in keys:
scheduler.validate_key(key)

def transition_released_waiting(self, key, stimulus_id: str = None):
def transition_released_waiting(self, key, stimulus_id: str | None = None):
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2511,7 +2513,7 @@ def transition_released_waiting(self, key, stimulus_id: str = None):
pdb.set_trace()
raise

def transition_no_worker_waiting(self, key, stimulus_id: str = None):
def transition_no_worker_waiting(self, key, stimulus_id: str | None = None):
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2565,7 +2567,7 @@ def transition_no_worker_memory(
type=None,
typename: str = None,
worker=None,
stimulus_id: str = None,
stimulus_id: str | None = None,
):
try:
ws: WorkerState = self._workers_dv[worker]
Expand Down Expand Up @@ -2727,7 +2729,7 @@ def set_duration_estimate(self, ts: TaskState, ws: WorkerState) -> double:

return total_duration

def transition_waiting_processing(self, key, stimulus_id: str = None):
def transition_waiting_processing(self, key, stimulus_id: str | None = None):
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2779,7 +2781,7 @@ def transition_waiting_memory(
type=None,
typename: str = None,
worker=None,
stimulus_id: str = None,
stimulus_id: str | None = None,
**kwargs,
):
try:
Expand Down Expand Up @@ -2827,7 +2829,7 @@ def transition_processing_memory(
typename: str = None,
worker=None,
startstops=None,
stimulus_id: str = None,
stimulus_id: str | None = None,
**kwargs,
):
ws: WorkerState
Expand Down Expand Up @@ -2917,7 +2919,7 @@ def transition_processing_memory(
raise

def transition_memory_released(
self, key, safe: bint = False, stimulus_id: str = None
self, key, safe: bint = False, stimulus_id: str | None = None
):
ws: WorkerState
try:
Expand Down Expand Up @@ -2990,7 +2992,7 @@ def transition_memory_released(
pdb.set_trace()
raise

def transition_released_erred(self, key, stimulus_id: str = None):
def transition_released_erred(self, key, stimulus_id: str | None = None):
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand All @@ -3006,9 +3008,6 @@ def transition_released_erred(self, key, stimulus_id: str = None):
assert not ts._waiting_on
assert not ts._waiters

if not ts._exception_blame:
raise RuntimeError(f"_exception_blame not supplied for TaskState {ts}")

failing_ts = ts._exception_blame

for dts in ts._dependents:
Expand Down Expand Up @@ -3038,7 +3037,7 @@ def transition_released_erred(self, key, stimulus_id: str = None):
pdb.set_trace()
raise

def transition_erred_released(self, key, stimulus_id: str = None):
def transition_erred_released(self, key, stimulus_id: str | None = None):
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -3088,7 +3087,7 @@ def transition_erred_released(self, key, stimulus_id: str = None):
pdb.set_trace()
raise

def transition_waiting_released(self, key, stimulus_id: str = None):
def transition_waiting_released(self, key, stimulus_id: str | None = None):
try:
ts: TaskState = self._tasks[key]
recommendations: dict = {}
Expand Down Expand Up @@ -3125,7 +3124,7 @@ def transition_waiting_released(self, key, stimulus_id: str = None):
pdb.set_trace()
raise

def transition_processing_released(self, key, stimulus_id: str = None):
def transition_processing_released(self, key, stimulus_id: str | None = None):
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -3268,7 +3267,7 @@ def transition_processing_erred(
pdb.set_trace()
raise

def transition_no_worker_released(self, key, stimulus_id: str = None):
def transition_no_worker_released(self, key, stimulus_id: str | None = None):
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -3311,7 +3310,7 @@ def remove_key(self, key):
ts._exception_blame = ts._exception = ts._traceback = None
self._task_metadata.pop(key, None)

def transition_memory_forgotten(self, key, stimulus_id: str = None):
def transition_memory_forgotten(self, key, stimulus_id: str | None = None):
ws: WorkerState
try:
ts: TaskState = self._tasks[key]
Expand Down Expand Up @@ -3353,7 +3352,7 @@ def transition_memory_forgotten(self, key, stimulus_id: str = None):
pdb.set_trace()
raise

def transition_released_forgotten(self, key, stimulus_id: str = None):
def transition_released_forgotten(self, key, stimulus_id: str | None = None):
try:
ts: TaskState = self._tasks[key]
recommendations: dict = {}
Expand Down Expand Up @@ -7522,7 +7521,9 @@ async def unregister_nanny_plugin(self, comm, name):
)
return responses

def transition(self, key, finish: str, *args, stimulus_id: str = None, **kwargs):
def transition(
self, key, finish: str, *args, stimulus_id: str | None = None, **kwargs
):
"""Transition a key from its current state to the finish state
Examples
Expand Down

0 comments on commit 30dba93

Please sign in to comment.