From 6f9fe81d8bd31d70d1c1b654b221328b1b455f12 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Tue, 5 Oct 2021 18:48:27 +0200
Subject: [PATCH] more robust test_reschedule_concurrent_requests_deadlock

---
 distributed/stealing.py         | 12 +++++++-----
 distributed/tests/test_steal.py | 35 ++++++++++++++++++++++++++++++++++-
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/distributed/stealing.py b/distributed/stealing.py
index c3cf93c31d3..3348d19231d 100644
--- a/distributed/stealing.py
+++ b/distributed/stealing.py
@@ -167,10 +167,10 @@ def steal_time_ratio(self, ts):
 
         return cost_multiplier, level
 
-    def move_task_request(self, ts, victim, thief):
+    def move_task_request(self, ts, victim, thief) -> str:
         try:
             if ts in self.in_flight:
-                return
+                return "in-flight"
             stimulus_id = f"steal-{time()}"
             if self.scheduler.validate:
                 assert victim is ts.processing_on
@@ -205,8 +205,10 @@ def move_task_request(self, ts, victim, thief):
 
             self.in_flight_occupancy[victim] -= victim_duration
             self.in_flight_occupancy[thief] += thief_duration
+            return stimulus_id
         except CommClosedError:
             logger.info("Worker comm %r closed while stealing: %r", victim, ts)
+            return "comm-closed"
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -224,7 +226,7 @@ async def move_task_confirm(self, *, key, state, stimulus_id, worker=None):
         try:
             d = self.in_flight.pop(ts)
             if d["stimulus_id"] != stimulus_id:
-                self.log(("stale-response", worker, stimulus_id))
+                self.log(("stale-response", key, state, worker, stimulus_id))
                 self.in_flight[ts] = d
                 return
         except KeyError:
@@ -436,9 +438,9 @@ def restart(self, scheduler):
         self.key_stealable.clear()
 
     def story(self, *keys):
-        keys = set(keys)
+        keys = {key.key if not isinstance(key, str) else key for key in keys}
         out = []
-        for _, L in self.scheduler.get_event("stealing"):
+        for _, L in self.scheduler.get_events(topic="stealing"):
             if not isinstance(L, list):
                 L = [L]
             for t in L:
diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py
index 323251f3274..2949a78fcfc 100644
--- a/distributed/tests/test_steal.py
+++ b/distributed/tests/test_steal.py
@@ -950,6 +950,28 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers):
     assert all(v == 0 for v in steal.in_flight_occupancy.values())
 
 
+@gen_cluster(
+    client=True,
+    config={
+        "distributed.scheduler.work-stealing-interval": 10,
+    },
+)
+async def test_get_story(c, s, *workers):
+    steal = s.extensions["stealing"]
+    futs = c.map(
+        slowinc, range(100), workers=[workers[0].address], allow_other_workers=True
+    )
+    collect = c.submit(sum, futs)
+    await collect
+    key = next(iter(workers[1].tasks))
+    ts = s.tasks[key]
+    msgs = steal.story(key)
+    msgs_ts = steal.story(ts)
+    assert msgs
+    assert msgs == msgs_ts
+    assert all(isinstance(m, tuple) for m in msgs)
+
+
 @gen_cluster(
     client=True,
     nthreads=[("", 1)] * 3,
@@ -984,10 +1006,21 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers):
     s.set_restrictions(worker={victim_key: [wsB.address]})
     s.reschedule(victim_key)
     assert wsB == victim_ts.processing_on
+    # move_task_request is not responsible for respecting worker restrictions
    steal.move_task_request(victim_ts, wsB, wsC)
 
     await c.gather(futs1)
-    assert victim_ts.who_has == {wsC}
+    # If this turns out to be overly flaky, the following may be relaxed or
+    # removed. The point of this test is that it does not deadlock;
+    # verifying the expected state is still a nice thing to have.
+
+    # Either the last request goes through, or both have been rejected because
+    # the computation had already finished by the time the request arrived.
+    # Unfortunately this is not stable even if we increase the compute time.
+    if victim_ts.who_has != {wsC}:
+        msgs = steal.story(victim_ts)
+        assert len(msgs) == 2
+        assert all(msg[1][0] == "already-aborted" for msg in msgs)
 
 
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
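
For context, a minimal usage sketch of the two API changes above; it is not
part of the patch itself and assumes steal, victim_ts, wsB and wsC are bound
as in test_reschedule_concurrent_requests_deadlock:

    # Illustrative sketch, assuming the bindings from the test above.
    # move_task_request now returns a string on every handled path: the
    # stimulus id when a steal request was sent, or a sentinel value
    # ("in-flight", "comm-closed") when it was not.
    stimulus_id = steal.move_task_request(victim_ts, wsB, wsC)
    assert isinstance(stimulus_id, str)

    # story() now normalizes TaskState objects to their keys, so a key and
    # its TaskState yield the same events from the "stealing" topic:
    assert steal.story(victim_ts) == steal.story(victim_ts.key)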