
Ensure steal requests from same-IP but distinct workers are rejected #6585

Merged · 2 commits · Jun 22, 2022

Conversation

fjetter (Member) commented Jun 16, 2022

Closes #6356
Closes #6198
Closes #6263

#6356 describes a situation where a worker may die and a new worker with the same IP would connect to the scheduler. This could mess up our stealing logic since the WorkerState objects we're referencing there would reference the wrong worker, i.e. state between the scheduler and stealing extension would drift.
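
For illustration, here is a schematic sketch of the drift being described (toy objects and names, not the actual scheduler code): the scheduler replaces the WorkerState registered for an address when a new worker connects from the same IP/port, while the stealing extension keeps holding the old object.

class WorkerState:  # toy stand-in for the scheduler's WorkerState
    def __init__(self, address: str) -> None:
        self.address = address

scheduler_workers = {}  # address -> WorkerState, as kept by the scheduler
old = WorkerState("tcp://10.0.0.1:1234")
scheduler_workers[old.address] = old

cached_thief = old  # the stealing extension caches this object for an in-flight steal

# The worker dies and a new one connects from the same address.
new = WorkerState("tcp://10.0.0.1:1234")
scheduler_workers[new.address] = new

# The extension's reference now points at a worker the scheduler no longer tracks.
assert cached_thief is not scheduler_workers[cached_thief.address]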

With the test in this draft PR I could prove that this is indeed the case. However, so far nothing bad happens. Upon task completion, the scheduler would issue an "Unexpected worker completed task" message and send a cancel-compute event to the worker. The worker would ignore this event since the task is in state memory.

Before I fix the state drift I want to poke at this further since this is supposedly responsible for a couple of deadlocks. I would like to confirm that this is the only trigger for these deadlocks and there is nothing else going on.

This could definitely explain how a dead worker is shown on the dashboard (#6198), even if the deadlock was unrelated.

FWIW I was always suspicious about why this "Unexpected worker completed task" message was necessary and struggled to reproduce it. This finally sheds some light on it, and I actually hope that we can get rid of this message, and therefore the cancel-compute event, entirely.

github-actions bot (Contributor) commented Jun 16, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0 · 15 suites ±0 · 7h 29m 35s ⏱️ (+12m 45s)
2 880 tests +1: 2 762 ✔️ passed (−27), 84 💤 skipped (±0), 30 failed (+25), 4 🔥 errored (+3)
21 338 runs +7: 20 304 ✔️ passed (−55), 964 💤 skipped (−2), 64 failed (+59), 6 🔥 errored (+5)

For more details on these failures and errors, see this check.

Results for commit b20ada6. Comparison against base commit e6cc40a.

♻️ This comment has been updated with latest results.

fjetter (Member, Author) commented Jun 16, 2022

I missed the comment about the worker closing again in #6263 (comment).
Closing the worker after steal confirmation will deadlock 🎉 That also matches the observation of this happening frequently in adaptive situations.

fjetter requested review from gjoseph92 and crusaderky on June 16, 2022 12:06
Comment on lines 151 to 152
def remove_worker(self, scheduler: Scheduler, worker: str):
    del self.stealable[worker]
fjetter (Member, Author):

FWIW I thought about modifying / tracking removed workers and matching them up with in_flight. That led me towards a "remember cancellation" mechanism which was way too complicated.

gjoseph92 (Collaborator):

What about something like (with appropriate KeyError handling)

levels = self.stealable.pop(worker)
for level_i, level_tasks in enumerate(levels):
    for ts in level_tasks:
        self.stealable_all[level_i].remove(ts)
        self.key_stealable.pop(ts, None)
        self.in_flight.pop(ts)  # I don't know??

# Maybe??:
self.in_flight_occupancy.pop(worker)  # this may cause KeyErrors in other places, need to handle
if not self.in_flight:
    self.in_flight_occupancy.clear()
    self._in_flight_event.set()

I'm just copying from

def remove_key_from_stealable(self, ts):
    result = self.key_stealable.pop(ts, None)
    if result is None:
        return
    worker, level = result
    try:
        self.stealable[worker][level].remove(ts)
    except KeyError:
        pass
    try:
        self.stealable_all[level].remove(ts)
    except KeyError:
        pass

and mixing in

elif start == "processing":
    ts = self.scheduler.tasks[key]
    self.remove_key_from_stealable(ts)
    d = self.in_flight.pop(ts, None)
    if d:
        thief = d["thief"]
        victim = d["victim"]
        self.in_flight_occupancy[thief] -= d["thief_duration"]
        self.in_flight_occupancy[victim] += d["victim_duration"]
        if not self.in_flight:
            self.in_flight_occupancy.clear()
            self._in_flight_event.set()

(but I'm not sure if the second part is necessary, because it would happen anyway when remove_worker transitions any tasks processing on the worker to released.)

I don't know if this is necessary or not.

fjetter (Member, Author) commented Jun 16, 2022:

I tried something very similar and ended up deleting it again because of complexity.

gjoseph92 (Collaborator):

Okay. I'm a little hesitant about not clearing out the state properly, but I guess it's ok.

fjetter (Member, Author):

See also #6600. I would like to clarify the discussion on that ticket before we implement any more complex logic for work stealing.

crusaderky (Collaborator) commented:

There are failing tests

fjetter (Member, Author) commented Jun 16, 2022

I wonder why the FutureWarning didn't fail for me locally. Is there anything I need to enable for this to cause a failure?

distributed/stealing.py (review thread resolved)
distributed/core.py (outdated review thread, resolved)
distributed/scheduler.py (outdated review thread, resolved)
distributed/scheduler.py (outdated review thread, resolved)
gjoseph92 (Collaborator) left a comment:

It's probably less important, but could the metrics get thrown off here by stale workers?

if d:
    thief = d["thief"]
    victim = d["victim"]
    self.in_flight_occupancy[thief] -= d["thief_duration"]
    self.in_flight_occupancy[victim] += d["victim_duration"]
    if not self.in_flight:
        self.in_flight_occupancy.clear()
        self._in_flight_event.set()


distributed/stealing.py (review thread resolved)
distributed/tests/test_steal.py (review thread resolved)
distributed/tests/test_steal.py (outdated review thread, resolved)
while len_before == len(s.events["stealing"]):
    await asyncio.sleep(0.1)

assert victim_ts.processing_on != wsB
Collaborator:

What should it be processing on? Because wsA confirmed the steal request, so wsA has released the task. Stealing called Scheduler.reschedule which transitioned it to released:

def reschedule(self, key=None, worker=None):
    """Reschedule a task
    Things may have shifted and this task may now be better suited to run
    elsewhere
    """
    try:
        ts = self.tasks[key]
    except KeyError:
        logger.warning(
            "Attempting to reschedule task {}, which was not "
            "found on the scheduler. Aborting reschedule.".format(key)
        )
        return
    if ts.state != "processing":
        return
    if worker and ts.processing_on.address != worker:
        return
    self.transitions({key: "released"}, f"reschedule-{time()}")

So I think I'd expect something like assert victim_ts.state == "released". So if reschedule goes through a full transitions cycle, you'd actually expect it to go processing->released->waiting->processing and be assigned to a worker (probably wsB2 since it's idle).

I guess I'd expect something like assert victim_ts.processing_on in (wsA, wsB2) and assert victim_ts.state == "processing". We want to be sure we don't forget about this task, since that would cause a deadlock too.

fjetter (Member, Author) commented Jun 16, 2022:

I'm asserting that it's not the stale WorkerState. Everything else is beyond the scope of this unit test. We'll reschedule the task and the scheduler can make a decision. I don't care as long as I get my result, i.e. the futures complete.
I prefer not asserting too much here. From a high level, I don't even care at this point that we're going through full rescheduling.

distributed/tests/test_steal.py (outdated review thread, resolved)
fjetter (Member, Author) commented Jun 16, 2022

> It's probably less important, but could the metrics get thrown off here by stale workers?

As long as in_flight_occupancy and in_flight track the same stale worker, this update is correct.

fjetter (Member, Author) commented Jun 16, 2022

I removed __eq__ and __hash__ of WorkerState as discussed above. This makes this == other and this is other equivalent, which is, I believe, what we're looking for here.
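
For context, a minimal sketch of the semantics this relies on (plain Python, not the real class): without __eq__ and __hash__, instances compare by identity and hash by id(), so == and is agree.

class Plain:
    pass

a, b = Plain(), Plain()
assert a != b                # no __eq__, so equality falls back to identity
assert a == a
assert len({a, b}) == 2      # no __hash__, so hashing falls back to id()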

fjetter (Member, Author) commented Jun 16, 2022

If any other cosmetic changes pop up, feel free to push to this branch so we can merge this ASAP.

gjoseph92 (Collaborator) left a comment:

I don't have permission to push to your branch. Could you add

diff --git a/distributed/stealing.py b/distributed/stealing.py
index 2c78704c..30b99175 100644
--- a/distributed/stealing.py
+++ b/distributed/stealing.py
@@ -332,12 +332,12 @@ class WorkStealing(SchedulerPlugin):
                 state in _WORKER_STATE_UNDEFINED
                 # If our steal information is somehow stale we need to reschedule
                 or state in _WORKER_STATE_CONFIRM
-                and thief != self.scheduler.workers.get(thief.address)
+                and thief is not self.scheduler.workers.get(thief.address)
             ):
                 self.log(
                     (
                         "reschedule",
-                        thief.address not in self.scheduler.workers,
+                        thief is not self.scheduler.workers.get(thief.address),
                         *_log_msg,
                     )
                 )


distributed/stealing.py (review thread resolved)
gjoseph92 (Collaborator) commented:

It seems you have some tests that are actually failing, distributed/tests/test_scheduler.py::test_feed at least.

fjetter added the labels deadlock ("The cluster appears to not make any progress") and stealing on Jun 20, 2022
if not isinstance(other, WorkerState):
    return False
return hash(self) == hash(other)

Collaborator:

#6593 instead makes it explicit, and explains the reasoning behind the decision. I think we should agree on one or the other and be consistent.

def __hash__(self) -> int:
    # TODO explain
    return id(self)

def __eq__(self, other: object) -> bool:
    # TODO explain
    return other is self

Collaborator:

#6593 adds to worker_state_machine.WorkerState.validate_state a check that you can never have two instances of a TaskState with the same key in any of its sets. I think scheduler.SchedulerState.validate_state should have the same logic.

fjetter (Member, Author):

I'm not entirely convinced that this is necessary. There are many collections, and this particular problem was not even in a collection on the scheduler itself but rather in an extension.
Specifically, without https://github.com/dask/distributed/pull/6585/files#r899011537 this condition would not even be true, and I don't think we should implement the full remove_worker cleanup in the stealing extension.

distributed/stealing.py (outdated review thread, resolved)
fjetter (Member, Author) commented Jun 21, 2022

The failing test_feed is actually interesting since it performs a cloudpickle roundtrip and asserts that WorkerStates are roundtrip-able, i.e. they compare equal after the roundtrip.

If we go for a compare-by-Python-id approach, this clearly no longer works. I would even go as far as to say that if we compare by Python id we should not allow any serialization of the object. By extension, the same would be true for TaskState objects.
For a cluster-dump-like or raise-exception use case this would not matter, but if we allow for serialization, I could easily see this being used in client<->scheduler or scheduler<->worker comms, and then we should ensure this compares properly.

Right now, we're not actually relying on this being serializable. The only place where we're serializing this right now is when we're raising a KilledWorker exception, and we use the special method WorkerState.clean beforehand, so we could easily replace this with a dict representation.
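
To make the roundtrip concern concrete, a minimal sketch (toy class, not the real WorkerState) of why identity-based equality cannot survive a pickle roundtrip:

import pickle

class IdentityEq:  # toy object that compares by identity, like a class without __eq__
    def __init__(self, address: str) -> None:
        self.address = address

original = IdentityEq("tcp://10.0.0.1:1234")
restored = pickle.loads(pickle.dumps(original))

assert restored.address == original.address  # the payload survives the roundtrip
assert restored != original                  # but identity-based equality does not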

fjetter force-pushed the work_stealing_same_ip branch 2 times, most recently from 5ba8296 to ba31256, on June 21, 2022 10:30
fjetter (Member, Author) commented Jun 21, 2022

I opted for a different approach and am now using the Worker.id / Server.id attribute to uniquely identify the WorkerState object. This is based on a type 4 UUID, so I have sufficient confidence in its uniqueness.
I believe this is sufficient to solve the problem discussed, since it was primarily introduced because the stealing extension did not properly clean up state or verify that received stealing confirmations belong to the correct worker. Unless we implemented a singleton/multiton metaclass this couldn't have been avoided, and that is out of scope for this PR.

The scheduler verifies that addresses are not overwritten, so Scheduler.workers will always be the source of truth.
By changing the hash/eq implementation to use the unique server ID, every extension has the possibility to verify whether its state is still correct. I acknowledge that this is not an ideal state, but it does fix the deadlock and I would like to move on with this fix.
If there is a need for a more sophisticated solution, I suggest taking care of it in a follow-up PR.
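
As a sketch of the kind of check this enables for extensions (hypothetical helper, not code from this PR), a cached WorkerState can be compared against the one currently registered for the same address via the server ID rather than the address alone:

def is_current(scheduler, cached_ws) -> bool:
    # Hypothetical helper: True only if `cached_ws` is still the WorkerState
    # the scheduler tracks for that address. A worker that reconnected from
    # the same address gets a fresh server_id, so staleness is detectable.
    current = scheduler.workers.get(cached_ws.address)
    return current is not None and current.server_id == cached_ws.server_id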

gjoseph92 (Collaborator) commented:

I kinda dislike the UUID. There simply should never be multiple WorkerState instances that refer to the same logical worker, so adding infrastructure to make this possible seems both unnecessary and possibly encouraging future misuse. If there's a test verifying that WorkerStates are still equal after roundtrip pickling, but we don't rely on that behavior anywhere, then we should simply change that test.

diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 82d40fd6..75bcd297 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -423,14 +423,14 @@ async def test_scheduler_init_pulls_blocked_handlers_from_config(s):
 @gen_cluster()
 async def test_feed(s, a, b):
     def func(scheduler):
-        return dumps(dict(scheduler.workers))
+        return dumps(list(scheduler.workers))
 
     comm = await connect(s.address)
     await comm.write({"op": "feed", "function": dumps(func), "interval": 0.01})
 
     for i in range(5):
         response = await comm.read()
-        expected = dict(s.workers)
+        expected = list(s.workers)
         assert cloudpickle.loads(response) == expected
 
     await comm.close()

fjetter force-pushed the work_stealing_same_ip branch from 818d207 to fa090b6 on June 22, 2022 08:49
fjetter (Member, Author) commented Jun 22, 2022

> I kinda dislike the UUID.

I consider the UUID useful beyond this deadlock fix. It is the only entity that can uniquely identify a server across the entire cluster. It can be logged, it can be shared, it is meaningful outside of the scheduler process. This is also how we're identifying clients inside and outside of the scheduler. Adding the scheduler_id to the register-worker handshake and storing it as part of the WorkerState object makes sense regardless of this deadlock fix.

I would like us to focus on fixing this issue now and not escalate into whether or not certain objects are serializable or roundtrip-able. This question popped up a couple of times recently and there doesn't appear to be proper consensus, e.g.

I don't want this conversation blocking the fix of this issue. We can still come back and remove __eq__, __hash__, etc. if we choose to do so.

crusaderky (Collaborator) commented Jun 22, 2022

I dislike the UUID for TaskStates. It's a fairly expensive function (3 µs on my beefed-up host), which would make it unsavory for worker_state_machine.TaskState and a big no-no for scheduler.TaskState.

I think I would be happy to just state "thou cannot round-trip a TaskState from scheduler to worker and back". scheduler.TaskState and worker_state_machine.TaskState are very different and I don't envision a refactor to merge them anyway.

On the other hand, I think that a full pickle dump of the worker_state_machine.WorkerState and everything it contains (except data) would be exceptionally useful - more so than the current YAML one - and something to potentially look into in the short term future.

crusaderky (Collaborator) commented:

The current code is prone to hash collisions.
On server.py:512, please change

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, WorkerState):
            return False
        return hash(self) == hash(other)

to

    def __eq__(self, other: object) -> bool:
        return isinstance(other, WorkerState) and other.server_id == self.server_id

If you put this one change in, I'm happy to sign off the PR.

fjetter (Member, Author) commented Jun 22, 2022

> I dislike the UUID for TaskStates

Yes. This is a hard no for TaskStates. I'm simply using an already generated UUID in this case. I would like to keep this ID as part of the WorkerState even if we decide to forbid any kind of serialization and we start comparing by IDs.

> If you put this one change in, I'm happy to sign off the PR.

Done. While I'm not overly concerned about hash collisions, it is indeed not a great idea to reuse hash for equality, since AFAIK dicts use equality to resolve hash collisions...
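
For the record, a minimal sketch of the failure mode (toy class that deliberately forces collisions): if __eq__ delegates to hash(), any two objects whose hashes collide compare equal and silently collapse into one dict or set entry.

class HashEq:  # toy class mirroring the pattern being removed: equality == hash equality
    def __init__(self, name: str) -> None:
        self.name = name

    def __hash__(self) -> int:
        return 42  # deliberately collide for every instance

    def __eq__(self, other: object) -> bool:
        return isinstance(other, HashEq) and hash(other) == hash(self)

a, b = HashEq("worker-a"), HashEq("worker-b")
assert a == b                  # distinct objects compare equal on collision
assert len({a: 1, b: 2}) == 1  # the dict silently merges them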

Labels: deadlock (The cluster appears to not make any progress), stealing
Projects: None yet
Participants: 3