Worker may message or reconnect to scheduler while worker is closing #6354
Comments
As a corollary: if we go with #6350, we'll need to remove the […]. Because we need […]. Luckily, there are only a couple places where we use […].
Related to #6363
I think this is mostly resolved by removing worker reconnection (#6361). The worker may still try to heartbeat the scheduler during this degenerate period, but now the scheduler will just ignore it (see `distributed/scheduler.py`, lines 3446 to 3449 at d32f4b0).
If the worker sends the scheduler a message over batched comms, though, it will still be handled. I haven't seen this happen in practice, but I'm sure it could. Closing, since it's now effectively a duplicate of #6390 (which still needs to happen).
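For reference, the behavior being described boils down to something like the following sketch (a hypothetical standalone function, not the actual `Scheduler.heartbeat_worker` code): a heartbeat from an address the scheduler no longer tracks gets a "missing" reply instead of being applied to scheduler state.

```python
def heartbeat_worker(scheduler_workers: dict, address: str, metrics: dict) -> dict:
    """Toy version of the check described above: heartbeats from unknown workers are ignored."""
    ws = scheduler_workers.get(address)
    if ws is None:
        # The worker was already removed (e.g. by remove_worker): don't touch
        # scheduler state, just tell the worker it is unknown.
        return {"status": "missing"}
    ws["metrics"] = metrics  # apply the heartbeat to the known worker
    return {"status": "OK"}


# A heartbeat arriving after remove_worker has already deleted the entry:
print(heartbeat_worker({}, "tcp://10.0.0.5:1234", {"cpu": 12.5}))
# -> {'status': 'missing'}
```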
There is a race condition where, after `Scheduler.remove_worker` completes and the worker has been removed from the scheduler's perspective, comms with the worker are still open. So, basically, `remove_worker` results in partially-removed state (`WorkerState` and other objects are removed, but connections are not).

Full trace-through, copied from #6263 (comment):
1. `close_worker` calls `remove_worker`. (It also redundantly enqueues a close message to the worker; `remove_worker` is about to do the same, though I don't think this makes a difference here.)
2. `remove_worker` does a bunch of stuff, including deleting the `BatchedSend` to that worker, which has the `{op: close}` message queued on it. It does not wait until the `BatchedSend` queue is flushed.
3. `remove_worker` removes the `WorkerState` entry for that worker.

At this point, the scheduler has removed all knowledge of the worker. However, the `close` message hasn't even been sent to it yet: it's still queued inside a `BatchedSend`, which may not send the message for up to 5 ms more. And even after the message has been sent, `Worker.close` still has to get scheduled on the worker event loop and run, which could take a while (see "Worker event loop performance hampered by GIL/convoy effect", #6325).
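To make the timing concrete, here is a minimal, hypothetical batching sender in the spirit of `BatchedSend` (none of this is distributed's real implementation): `send()` only appends to a buffer, and a background coroutine flushes the buffer on a fixed interval, so a queued `{op: close}` can still be sitting in the buffer well after `remove_worker` has returned.

```python
import asyncio


class MiniBatchedSend:
    """Toy stand-in for BatchedSend: buffer messages, flush on a fixed interval."""

    def __init__(self, comm, interval=0.005):
        self.comm = comm
        self.interval = interval
        self.buffer = []
        self._task = None

    def start(self):
        self._task = asyncio.ensure_future(self._background_send())

    def send(self, msg):
        # send() only appends; nothing goes over the wire until the next flush.
        self.buffer.append(msg)

    async def _background_send(self):
        while True:
            await asyncio.sleep(self.interval)
            if self.buffer:
                payload, self.buffer = self.buffer, []
                await self.comm.write(payload)


class PrintComm:
    async def write(self, msgs):
        print("sent over the wire:", msgs)


async def main():
    bs = MiniBatchedSend(PrintComm())
    bs.start()
    bs.send({"op": "close"})  # queued by remove_worker...
    print("remove_worker could return here; buffer still holds:", bs.buffer)
    await asyncio.sleep(0.02)  # ...and only a later flush actually sends it
    bs._task.cancel()


asyncio.run(main())
```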
There are multiple sources of truth for when a worker is gone. In some places, it's whether `addr in Scheduler.workers` or `addr in Scheduler.stream_comms`. In others, it's whether a comm to that worker is closed. The entry being removed from the dict and the comm being closed are disjoint events.
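A trivial illustration of that disjointness, using stand-in objects rather than distributed's classes: immediately after the dict entry is deleted, one check says the worker is gone while the other says it is still connected.

```python
class FakeComm:
    """Stand-in for a comm object; only tracks whether it has been closed."""

    def __init__(self):
        self._closed = False

    def closed(self):
        return self._closed


addr = "tcp://10.0.0.5:1234"
workers = {addr: "WorkerState(...)"}  # scheduler-side bookkeeping for the worker
comm = FakeComm()                     # the still-open connection to that worker

del workers[addr]  # what remove_worker does, without ever closing the comm

print(addr in workers)  # False: by this source of truth, the worker is gone
print(comm.closed())    # False: by this one, the worker is still connected
```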
Thus, between when `Scheduler.remove_worker` ends and when all comms to the worker actually close, we are in a degenerate state and exposed to race conditions: the scheduler forgets about the worker before closing communications to that worker, or even confirming that the worker has received the message to close. Explicitly flushing and closing the `BatchedSend` would alleviate this, though not resolve it: while waiting for our send side to close, we could still receive messages from the now-removed worker.

Simply moving the flush-and-close to the top of `Scheduler.remove_worker`, before anything else happens, I think would fix the problem. Then we wouldn't be removing state related to the worker until we were guaranteed the worker connection was closed.
4. After `remove_worker` has run, but before the close message actually gets sent over the `BatchedSend` / `Worker.close` starts running, the worker sends another heartbeat to the scheduler.
5. From the scheduler's perspective, this worker doesn't exist, so it replies "missing" to the worker.
6. Though the worker is in state `closing_gracefully`, it still tries to re-register. Re-registering calls `Worker.batched_stream.start` with a new comm object, even though `Worker.batched_stream` is already running with the previous comm object. This is the double-start bug I pointed out in https://github.com/dask/distributed/pull/6272/files#r866082750. It will swap out the `comm` that the `BatchedSend` is using, and launch a second `background_send` coroutine. Surprisingly, I think having multiple `background_send`s racing to process one `BatchedSend` is still safe, just silly. (See the sketch after this list.)
7. The `BatchedSend` has now lost its reference to the original comm. However, one `Worker.handle_scheduler` is still running with that original comm, and now one is running with the new comm too. Since there's still a reference to the old comm, it isn't closed.
8. `Scheduler.handle_worker` is still running, even though, from the scheduler's perspective, the worker it's supposed to be handling doesn't exist anymore. Yay, "[Discussion] Structured concurrency" (#6201): if we had a handle on this coroutine, we could have cancelled it in `remove_worker`.
9. The scheduler handles this "new" worker connecting. (This passes through the buggy code of "Scheduler worker reconnect drops messages" (#6341), though I don't think that actually makes a difference in this case.) This is where all of the "Unexpected worker completed task" messages come from, followed by another "Register worker" and "Starting worker compute stream".
10. Eventually, `Worker.close` actually closes (one of) its batched comms to the scheduler.
11. This triggers a second `remove_worker` on the scheduler, removing the entry for the "new" worker.
12. `Scheduler.handle_worker` is still running for the old comm too. I presume it eventually stops when the worker actually shuts down and severs the connection? When it does, it probably won't run `remove_worker` another time, because the address is already gone from `Scheduler.stream_comms`.
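Here is a rough sketch of the double-start hazard from step 6 above (hypothetical toy classes, not the real `BatchedSend`): calling `start()` a second time with a new comm silently swaps the comm and launches a second background sender over the same buffer, while the old comm stays open but unreferenced by the stream.

```python
import asyncio


class ToyBatchedStream:
    """Hypothetical batched stream whose start() doesn't guard against a second call."""

    def __init__(self):
        self.comm = None
        self.buffer = []
        self._senders = []

    def start(self, comm):
        # No "already started?" check: the comm is silently swapped out and a
        # second background_send coroutine starts draining the same buffer.
        self.comm = comm
        self._senders.append(asyncio.ensure_future(self._background_send()))

    def send(self, msg):
        self.buffer.append(msg)

    async def _background_send(self):
        while True:
            await asyncio.sleep(0.005)
            if self.buffer:
                payload, self.buffer = self.buffer, []
                await self.comm.write(payload)


class NamedComm:
    def __init__(self, name):
        self.name = name

    async def write(self, msgs):
        print(f"{self.name} comm sent: {msgs}")


async def main():
    stream = ToyBatchedStream()
    old_comm, new_comm = NamedComm("old"), NamedComm("new")

    stream.start(old_comm)  # normal registration
    stream.start(new_comm)  # re-registration while closing: comm swapped,
                            # two background senders now race on one buffer

    stream.send({"op": "task-finished"})
    await asyncio.sleep(0.02)  # the message goes out over the *new* comm only;
                               # the old comm is still open, just no longer referenced here
    for t in stream._senders:
        t.cancel()


asyncio.run(main())
```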
Even if we do #6350 (which will resolve the heartbeat-reconnect issue), we still need to ensure that `remove_worker` closes the connection to the worker before removing other state.

I think the easiest way to do that is simply to `await self.stream_comms[address].close()` at the top of `remove_worker`, before removing other state. There are still some race conditions with concurrent `remove_worker` calls to work out, though.
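For illustration, a minimal sketch of the proposed ordering (hypothetical class and attribute names, not a patch against distributed): close the batched comm to the worker first, and only then drop the scheduler-side state. The coordination needed for concurrent `remove_worker` calls is deliberately left out, since that is the part still to be worked out.

```python
import asyncio


class SketchScheduler:
    """Hypothetical scheduler fragment showing the proposed ordering inside remove_worker."""

    def __init__(self):
        self.workers = {}       # address -> worker state
        self.stream_comms = {}  # address -> batched comm to that worker

    async def remove_worker(self, address):
        # 1. Flush and close the connection to the worker *first*, so nothing can
        #    arrive from (or be half-sent to) a worker we are about to forget.
        comm = self.stream_comms.get(address)
        if comm is not None:
            await comm.close()

        # 2. Only then drop the scheduler-side state. Concurrent remove_worker
        #    calls for the same address would still need coordination
        #    (e.g. a per-address lock), as noted above.
        self.stream_comms.pop(address, None)
        self.workers.pop(address, None)


class DummyComm:
    async def close(self):
        print("batched comm flushed and closed")


async def demo():
    s = SketchScheduler()
    s.workers["tcp://10.0.0.5:1234"] = object()
    s.stream_comms["tcp://10.0.0.5:1234"] = DummyComm()
    await s.remove_worker("tcp://10.0.0.5:1234")
    print("remaining workers:", s.workers)


asyncio.run(demo())
```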