Skip to content

Commit

Permalink
more robust test_reschedule_concurrent_requests_deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Oct 8, 2021
1 parent 7398e31 commit 6f9fe81
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
12 changes: 7 additions & 5 deletions distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ def steal_time_ratio(self, ts):

return cost_multiplier, level

def move_task_request(self, ts, victim, thief):
def move_task_request(self, ts, victim, thief) -> str:
try:
if ts in self.in_flight:
return
return "in-flight"
stimulus_id = f"steal-{time()}"
if self.scheduler.validate:
assert victim is ts.processing_on
Expand Down Expand Up @@ -205,8 +205,10 @@ def move_task_request(self, ts, victim, thief):

self.in_flight_occupancy[victim] -= victim_duration
self.in_flight_occupancy[thief] += thief_duration
return stimulus_id
except CommClosedError:
logger.info("Worker comm %r closed while stealing: %r", victim, ts)
return "comm-closed"
except Exception as e:
logger.exception(e)
if LOG_PDB:
Expand All @@ -224,7 +226,7 @@ async def move_task_confirm(self, *, key, state, stimulus_id, worker=None):
try:
d = self.in_flight.pop(ts)
if d["stimulus_id"] != stimulus_id:
self.log(("stale-response", worker, stimulus_id))
self.log(("stale-response", key, state, worker, stimulus_id))
self.in_flight[ts] = d
return
except KeyError:
Expand Down Expand Up @@ -436,9 +438,9 @@ def restart(self, scheduler):
self.key_stealable.clear()

def story(self, *keys):
keys = set(keys)
keys = {key.key if not isinstance(key, str) else key for key in keys}
out = []
for _, L in self.scheduler.get_event("stealing"):
for _, L in self.scheduler.get_events(topic="stealing"):
if not isinstance(L, list):
L = [L]
for t in L:
Expand Down
35 changes: 34 additions & 1 deletion distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,28 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers):
assert all(v == 0 for v in steal.in_flight_occupancy.values())


@gen_cluster(
    client=True,
    config={
        "distributed.scheduler.work-stealing-interval": 10,
    },
)
async def test_get_story(c, s, *workers):
    """The stealing extension's ``story`` accepts either a task key or a
    ``TaskState`` object and returns the same non-empty list of event tuples
    for both forms of the argument.
    """
    extension = s.extensions["stealing"]
    # Pin all tasks to the first worker but allow stealing so that the
    # scheduler actually records stealing events for some of them.
    futures = c.map(
        slowinc, range(100), workers=[workers[0].address], allow_other_workers=True
    )
    total = c.submit(sum, futures)
    await total
    # Any task that ended up on the second worker must have been stolen.
    stolen_key = next(iter(workers[1].tasks))
    task_state = s.tasks[stolen_key]
    story_from_key = extension.story(stolen_key)
    story_from_ts = extension.story(task_state)
    assert story_from_key
    assert story_from_key == story_from_ts
    assert all(isinstance(event, tuple) for event in story_from_key)


@gen_cluster(
client=True,
nthreads=[("", 1)] * 3,
Expand Down Expand Up @@ -984,10 +1006,21 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers):
s.set_restrictions(worker={victim_key: [wsB.address]})
s.reschedule(victim_key)
assert wsB == victim_ts.processing_on
# move_task_request is not responsible for respecting worker restrictions
steal.move_task_request(victim_ts, wsB, wsC)
await c.gather(futs1)

assert victim_ts.who_has == {wsC}
    # If this turns out to be overly flaky, the following assertions may be
    # relaxed or removed. The point of this test is to not deadlock, but
    # verifying the expected final state is still worthwhile.

# Either the last request goes through or both have been rejected since the
# computation was already done by the time the request comes in. This is
# unfortunately not stable even if we increase the compute time
if victim_ts.who_has != {wsC}:
msgs = steal.story(victim_ts)
assert len(msgs) == 2
assert all(msg[1][0] == "already-aborted" for msg in msgs)


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
Expand Down

0 comments on commit 6f9fe81

Please sign in to comment.