
Worker may message or reconnect to scheduler while worker is closing #6354

Closed
gjoseph92 opened this issue May 17, 2022 · 2 comments
Labels
bug (Something is broken) · stability (Issue or feature related to cluster stability, e.g. deadlock)

Comments

@gjoseph92
Collaborator

There is a race condition where, after Scheduler.remove_worker completes and the worker has been removed from the scheduler's perspective, comms with the worker are still open. So:

  • Subsequent heartbeats from the worker cause the scheduler to treat it as a new worker connecting
  • Any messages it might send will still be processed, and could cause errors (if handlers expect the worker to exist and it doesn't).

Basically remove_worker results in partially-removed state (WorkerState and other objects are removed, but connections are not).
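
As a rough illustration of that half-removed state, here is a sketch (not code from the project; the illustrate_partial_removal helper is hypothetical and the remove_worker call is simplified):

    async def illustrate_partial_removal(scheduler, addr):
        # Simplified call; the real remove_worker takes more arguments.
        await scheduler.remove_worker(address=addr)

        # Bookkeeping about the worker is gone...
        assert addr not in scheduler.workers
        assert addr not in scheduler.stream_comms

        # ...but the underlying comm and the handle_worker coroutine for this
        # worker may still be alive, so a heartbeat or batched message from it
        # can still reach the scheduler and be treated as a new connection.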

Full trace-through

Copied from #6263 (comment)

  1. close_worker calls remove_worker. (It also redundantly enqueues a close message to the worker; remove_worker is about to do the same, though I don't think this makes a difference here.)

  2. remove_worker does a bunch of stuff, including deleting the BatchedSend to that worker, which still has the {op: close} message queued on it. It does not wait for the BatchedSend queue to be flushed.

  3. remove_worker removes the WorkerState entry for that worker

  4. At this point, the scheduler has removed all knowledge of the worker. However, the close message hasn't even been sent to it yet: it's still queued inside a BatchedSend, which may not send the message for up to 5ms more. And even after the message has been sent, Worker.close still has to get scheduled on the worker event loop and run, which could take a while ("Worker event loop performance hampered by GIL/convoy effect" #6325). There are multiple sources of truth for when a worker is gone. In some places, it's whether addr in Scheduler.workers, or addr in Scheduler.stream_comms. In others, it's whether a comm to that worker is closed. The entry being removed from the dict and the comm being closed are disjoint events.

    Thus, between when Scheduler.remove_worker ends and when all comms to the worker actually close, we are in a degenerate state and exposed to race conditions. The scheduler forgets about the worker before closing communications to that worker, or even confirming that the worker has received the message to close. Explicitly flushing and closing the BatchedSend would alleviate this, though not resolve it: while waiting for our send side to close, we could still receive messages from the now-removed worker.

    Simply moving the flush-and-close to the top of Scheduler.remove_worker, before anything else happens, would, I think, fix the problem. Then we wouldn't be removing state related to the worker until we were guaranteed the worker connection was closed.

  5. After remove_worker has run, but before the close message actually gets sent over the BatchedSend / Worker.close starts running, the worker sends another heartbeat to the scheduler.

  6. From the scheduler's perspective, this worker doesn't exist, so it replies "missing" to the worker.

  7. Though the worker is in state closing_gracefully, it still tries to re-register.

    1. Fun fact (not relevant to this, but yet another broken thing in BatchedSend land): this is going to call Worker.batched_stream.start with a new comm object, even though Worker.batched_stream is already running with the previous comm object. This is the double-start bug I pointed out in https://github.com/dask/distributed/pull/6272/files#r866082750 (see the sketch after this list). This will swap out the comm that the BatchedSend is using, and launch a second background_send coroutine. Surprisingly, I think having multiple background_sends racing to process one BatchedSend is still safe, just silly.
    2. The BatchedSend has now lost its reference to the original comm. However, one Worker.handle_scheduler is still running with that original comm, and now one is running with the new comm too. Since there's still a reference to the old comm, it isn't closed.
    3. Because the worker's old comm wasn't closed, the old Scheduler.handle_worker is still running, even though, from the scheduler's perspective, the worker it's supposed to be handling doesn't exist anymore. Yay "[Discussion] Structured concurrency" #6201: if we had a handle on this coroutine, we could have cancelled it in remove_worker.
  8. The scheduler handles this "new" worker connecting. (This passes through the buggy code of "Scheduler worker reconnect drops messages" #6341, though I don't think that actually makes a difference in this case.) This is where all of the "Unexpected worker completed task" messages come from, followed by another "Register worker" and "Starting worker compute stream".

  9. Eventually, Worker.close actually closes (one of) its batched comms to the scheduler.

  10. This triggers a second remove_worker on the scheduler, removing the entry for the "new" worker.

    1. There's probably still a Scheduler.handle_worker running for the old comm too. I presume it eventually stops when the worker actually shuts down and severs the connection? When it does, it probably won't run remove_worker another time, because the address is already gone from Scheduler.stream_comms.
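
To make the double-start bug in step 7.1 concrete, here is a simplified stand-in for BatchedSend (illustration only; BatchedSendSketch and its methods are not the project's actual code, which lives in distributed/batched.py and differs in detail):

    import asyncio
    from collections import deque

    class BatchedSendSketch:
        # Simplified stand-in for distributed's BatchedSend, for illustration.

        def __init__(self, interval=0.005):
            self.interval = interval
            self.buffer = deque()
            self.comm = None

        def send(self, msg):
            self.buffer.append(msg)

        def start(self, comm):
            # Note: no "already started" check. Calling start a second time
            # silently swaps the comm and launches another background sender,
            # which is the double-start behavior described in step 7.1 above.
            self.comm = comm
            asyncio.ensure_future(self._background_send())

        async def _background_send(self):
            # Periodically flush whatever has accumulated in the buffer.
            while self.comm is not None and not self.comm.closed():
                await asyncio.sleep(self.interval)
                if self.buffer:
                    msgs, self.buffer = list(self.buffer), deque()
                    await self.comm.write(msgs)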

Even if we do #6350 (which will resolve the heartbeat-reconnect issue), we still need to ensure that remove_worker closes the connection to the worker before removing other state.

I think the easiest way to do that is simply to await self.stream_comms[address].close() at the top of remove_worker, before removing other state. There are still some race conditions with concurrent remove_worker calls to work out, though.
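
A minimal sketch of that ordering, assuming BatchedSend.close() flushes queued messages before closing the underlying comm (the signature here is simplified; the real remove_worker takes more arguments and tears down far more state):

    async def remove_worker(self, address, *, close=True):
        # Sketch only: close the connection before forgetting the worker.
        if address not in self.workers:
            return "already-removed"

        bcomm = self.stream_comms[address]
        if close:
            bcomm.send({"op": "close", "report": False})
        # Flush and close first, so the worker can no longer heartbeat or
        # stream messages once we start tearing down its state.
        await bcomm.close()

        # ...only now remove the WorkerState, the stream_comms entry, task
        # bookkeeping, etc.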

@gjoseph92 added the bug and stability labels May 17, 2022
@gjoseph92
Collaborator Author

gjoseph92 commented May 18, 2022

As a corollary: if we go with #6350, we'll need to remove the close=False option from Scheduler.remove_worker. Having a comm still open to a worker, but not having the worker in self.workers, is exactly the sort of half-removed state we're trying to avoid here.

Because we need remove_worker to always close the comm, and with #6350, the worker will always shut itself down when the comm is closed, there's no point in a close=False option—whether or not we send the {"op": "close"} message, the worker is going to shut down.

Luckily, there are only a couple places where we use close=False:

  • gather, which is already handled by cada973
  • restart (which can easily be changed, just nanny-related)

Related to #6363

@gjoseph92
Collaborator Author

I think this is mostly resolved by removing worker reconnection (#6361). The worker may still try to heartbeat the scheduler during this degenerate period, but now the scheduler will just ignore it:

    ws = self.workers.get(address)
    if ws is None:
        logger.warning(f"Received heartbeat from unregistered worker {address!r}.")
        return {"status": "missing"}

If the worker sends the scheduler a message over its batched comm, though, it will still be handled. I haven't seen this happen in practice, but I'm sure it could happen.
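
A hypothetical guard in a stream handler could drop such messages instead; here is a sketch under the assumption that a handler like handle_task_finished checks membership in self.workers (the handler name and signature are illustrative, not the project's actual code):

    def handle_task_finished(self, key=None, worker=None, **msg):
        # Hypothetical defensive check: ignore messages from workers the
        # scheduler has already removed, rather than processing them.
        if worker not in self.workers:
            logger.debug("Ignoring message from removed worker %s", worker)
            return
        # ...normal handling would continue here...
        ...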

Closing, since it's now effectively a duplicate of #6390 (which still needs to happen).
