-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove report and safe from Worker.close #6363
Conversation
Flaky tests due to |
Edit: Got it I am struggling to write a proper reproducer for #6320 A simple way of reproducing this w/out any patching is to provoke some kind of exception during startup, e.g.
|
1259504
to
9e343f0
Compare
@pentschev would you be able to look at this/test it? |
Thanks @fjetter for working on this fix. I tested this with the original test I mentioned and that fixed the issue. However, we have a similar one for import pytest
from dask.distributed import LocalCluster, Nanny
class MyPlugin:
def setup(self, worker=None):
import my_nonexistent_library # noqa
# Intentionally not using @gen_test to skip cleanup checks
async def test_localcluster_start_exception():
class MyCluster(LocalCluster):
def __init__(self):
super().__init__(
n_workers=0,
asynchronous=True,
)
self.scale(3)
def new_worker_spec(self):
i = len(self.worker_spec)
return {i: {"cls": Nanny, "options": {"plugins": {MyPlugin()}}}}
with pytest.raises(ModuleNotFoundError):
await MyCluster() |
One pattern is following the async def __aexit__(self, *exc_info) -> None:
if exc_info == (None, None, None):
await self.close()
else:
self.abort() |
@pentschev the test you provided does not hang for me, it just fails because it is expecting the wrong exception. There is a change in behavior in that this will no longer raise an Adjusting your example using the new import pytest
from dask.distributed import LocalCluster, Nanny
class MyPlugin:
def setup(self, worker=None):
import my_nonexistent_library # noqa
# Intentionally not using @gen_test to skip cleanup checks
@pytest.mark.asyncio
async def test_localcluster_start_exception():
class MyCluster(LocalCluster):
def __init__(self):
super().__init__(
n_workers=0,
asynchronous=True,
)
self.scale(3)
def new_worker_spec(self):
i = len(self.worker_spec)
return {i: {"cls": Nanny, "options": {"plugins": {MyPlugin()}}}}
with raises_with_cause(RuntimeError, None, ImportError, None):
async with MyCluster():
return |
859e196
to
4e491e3
Compare
I'll try to get the test in as well but I don't like it leaking resources. I'll have a quick look if this can be resolved easily |
async def test_localcluster_start_exception(): | ||
with raises_with_cause(RuntimeError, None, ImportError, None): | ||
async with LocalCluster( | ||
plugins={MyPlugin()}, | ||
): | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a simplified version of the LocalCluster test reproducer. The subclassing suggested in #6363 (comment) is raising an exception during init which avoids us going through proper cleanup by the contextmanager, i.e. resources are never closed. The interesting bit, however, is the plugin for the workers which can be just passed through as a kwarg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @pentschev
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW failure scenarios for worker startup in a SpecCluster is a bit messy, see #5919
|
||
@gen_test( | ||
clean_kwargs={ | ||
# FIXME: This doesn't close the LoopRunner properly, leaving a thread around |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@graingert this is leaving the thread of a loop runner alive. I didn't want to mess with this but it may interest you
@quasiben @pentschev Can somebody confirm if this is works for you? |
Both ubu failures are #6395 |
FWIW I feel comfortable merging this without successful OSX builds in case this is blocking our release procedure |
@quasiben @pentschev @jakirkham could you provide some feedback on if this PR is good to go? My plan is to merge this in a bit and then release |
As of dask/distributed#6363, there is a change in behavior on how plugin errors are raised.
Yes, this seems to work. I opened rapidsai/dask-cuda#914 to address this change in Dask-CUDA. |
Great, thanks for confirming @pentschev. I'll merge this in after CI finishes so this is included in the release today |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a few concerns about this. I've marked the most urgent ones with
comm.send({"op": "close", "report": False}) | ||
# This closes the Worker and ensures that if a Nanny is around, | ||
# it is closed as well | ||
comm.send({"op": "terminate"}) | ||
comm.send({"op": "close-stream"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This close-stream
is unnecessary. Worker.close
will close the stream itself:
distributed/distributed/worker.py
Lines 1543 to 1545 in 33fc50c
if self.batched_stream: | |
with suppress(TimeoutError): | |
await self.batched_stream.close(timedelta(seconds=timeout)) |
Doesn't hurt to leave it though if you want to play it safe.
@@ -3399,15 +3386,16 @@ async def close(self, fast=False, close_workers=False): | |||
logger.info("Scheduler closing all comms") | |||
|
|||
futures = [] | |||
for w, comm in list(self.stream_comms.items()): | |||
for _, comm in list(self.stream_comms.items()): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of scope, but I'm curious why we don't just use Scheduler.remove_worker
here. It's going to run anyway when the stream comms close:
distributed/distributed/scheduler.py
Lines 4826 to 4831 in 33fc50c
try: | |
await self.handle_stream(comm=comm, extra={"worker": worker}) | |
finally: | |
if worker in self.stream_comms: | |
worker_comm.abort() | |
await self.remove_worker(address=worker, stimulus_id=stimulus_id) |
When we fix up remove_worker
to handle closing the comms, and to work properly concurrently #6390, we'll probably want to have exactly one way to close and remove a worker, and reuse it here.
elif ws.status == Status.paused: | ||
self.running.remove(ws) | ||
elif ws.status == Status.closing: | ||
await self.remove_worker( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove_worker(..., close=False)
#6354, though they're lessened by having removed reconnect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW once we're in this state, the worker is already in Worker.close
. Worker.close will eventually close the stream once it is done
I am very certain that I added this code path to ensure tests pass. I believe it was connected to a test in deploy but I do not recall what the problem was exactly.
I remember it being connected to removing the worker from self.running
w/out properly removing everything else.
@@ -1704,6 +1707,7 @@ def check_thread_leak(): | |||
|
|||
bad_thread = bad_threads[0] | |||
call_stacks = profile.call_stack(sys._current_frames()[bad_thread.ident]) | |||
breakpoint() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
breakpoint() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
except OSError as e: | ||
# Scheduler is gone. Respect distributed.comm.timeouts.connect | ||
if "Timed out trying to connect" in str(e): | ||
logger.info("Timed out while trying to connect during heartbeat") | ||
await self.close(report=False) | ||
await self.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
await self.close() | |
await self.terminate() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, it's not an infinite restart loop (just because Worker._register_with_scheduler
will actually never give up trying to connect to the scheduler), but it does mean the worker will restart if the scheduler crashes and wait forever for it to show up.
In 3 terminals, run:
dask-scheduler & echo $!
then
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1s" dask-worker localhost:8786 --nanny
then
sudo kill -9 <scheduler PID printed in first terminal>
The scheduler will die without sending the terminate
signal to workers. The worker will close, the nanny will restart it, and it will then loop forever trying to connect to the nonexistent scheduler.
Is this acceptable? You could argue that if the scheduler crashes, maybe something should bring a new one back up, so it makes sense to restart. But that's assuming a lot about your deployment system, and it should probably be the deployment system's job to do that restarting. To me this is a regression that could lead to wasted resources for anyone counting on the current behavior if the scheduler crashes.
ws = self.workers[worker] | ||
self.worker_send(worker, {"op": "close", "nanny": bool(ws.nanny)}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ws = self.workers[worker] | |
self.worker_send(worker, {"op": "close", "nanny": bool(ws.nanny)}) | |
self.worker_send(worker, {"op": "terminate"}) |
The worker can figure out what to do with the nanny on its own.
I'd like to use the new semantics and interfaces properly. terminate
and close
are similar-sounding words, but they now mean very different things. terminate
means actually shut down. close
means restart if there's a nanny, otherwise shut down.
I don't think the naming of these is clear enough, but what we want to do here, semantically, is terminate
, so we should use that interface directly.
@@ -4183,7 +4171,7 @@ async def remove_worker(self, address, stimulus_id, safe=False, close=True): | |||
logger.info("Remove worker %s", ws) | |||
if close: | |||
with suppress(AttributeError, CommClosedError): | |||
self.stream_comms[address].send({"op": "close", "report": False}) | |||
self.stream_comms[address].send({"op": "close"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scheduler.remove_worker
is called in many places. You could argue that most of them warrant a restart, since it's in response to a comm error or something, but are we sure about that?
Here at least, it doesn't (though this case should be changed anyway #6227):
distributed/distributed/scheduler.py
Lines 6127 to 6130 in 33fc50c
if remove: | |
await self.remove_worker( | |
address=ws.address, safe=True, stimulus_id=stimulus_id | |
) |
@@ -1219,7 +1220,7 @@ async def heartbeat(self): | |||
logger.error( | |||
f"Scheduler was unaware of this worker {self.address!r}. Shutting down." | |||
) | |||
await self.close(report=False) | |||
await self.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This and the following self.close()
and self.close()
in handle_scheduler
effectively do #6387. I wanted to do that a bit more carefully and add tests.
self, | ||
timeout=30, | ||
executor_wait=True, | ||
nanny=False, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
close
is being used, without passing nanny=True
explicitly (both in worker.py
and in scheduler.py
) and I am not convinced that they're all situations in which the worker process should be restarted if there's a nanny around.
I'm a little worried about this exacerbating problems with Scheduler.remove_worker
#6390, but I'm most concerned about verifying that it doesn't create an infinite restart loop.
JFYI it sounds like we are delaying the release until the review comments above are addressed |
As of dask/distributed#6363, there is a change in behavior on how plugin errors are raised. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: #914
report
flag and the unusedsafe
flag fromWorker.close
.The report flag is a recurring problem and from what I understand the reason for #6320 (comment)
Reporting is bad because it opens a dedicated RPC connection which may no longer be possible which then blocks for
timeouts.connect
seconds.I am not entirely convinced that reporting itself is even necessary outside of unit tests. Regardless, we do have an implicit stream report built in by now via the worker status update call.
close_workers
flag fromScheduler.close
Regardless of the close_workers flag, Scheduler.close always terminates all connected workers and their nannies assuming there is a stream_comm still around. Given the assumption that
set(stream_comms) == set(self.workers)
this flag is redundant and only triggers multiple close requests on the remotesCloses #6320
Note: The way we're using
close
is a bit confusing. The base classServer
is using close but all subclasses overwrite this and introduce other signatures. The defaults for these signatures is not always the "robust" way. For instance, reporting by default can block unnecessarily which is something we should avoid. I don't think we should have all these toggles in there. We are likely looking for afast
orforce
flag but not for many individuals like (wait_on_executor, report, close_workers, remove, etc.). Introducing afast
orforce
flag everywhere would be my preference but I'm trying to limit the scope of this PR.TODO
dask-worker
process remains alive afterNanny
exception onplugins=
#6320