make ConnectionPool.remove cancel connection attempts #7547
Conversation
Unit Test Results: See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
24 files ±0   24 suites ±0   10h 39m 2s ⏱️ -19m 34s
Results for commit 9ddee12. ± Comparison against base commit fef78d4.
♻️ This comment has been updated with latest results.
Force-pushed from 802b146 to 9ddee12.
Review comment on:

    rpc = await ConnectionPool(limit=1)
    with mock.patch("distributed.core.connect", connect):
Instead of mocking, you could also just connect to a server with either a very slow or blocked listener, or one that does not reply to the handshake.
I thought mocking was cleaner here; I'll look into using a slow server.
Also, the other tests monkeypatch with monkeypatch.setitem(backends, "tcp", SlowBackend()), so I still think using mock.patch on the connect function is cleanest here.
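For illustration, the mock-based approach being discussed would look roughly like the sketch below. The actual connect stub used in the PR's test is not shown in this excerpt, so treat the stub's body as an assumption; only the mock.patch target matches the diff above.

    # Hypothetical sketch; the real connect stub in the PR may differ.
    import asyncio
    from unittest import mock

    async def connect(*args, **kwargs):
        # Never completes, so the pending connection attempt is what
        # ConnectionPool.remove() gets to cancel.
        await asyncio.Event().wait()

    with mock.patch("distributed.core.connect", connect):
        ...  # rpc.connect(addr) now hangs until rpc.remove(addr) cancels it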
when testing this:
    # Imports below were not part of the original comment; they are added so
    # the snippet is self-contained.
    import asyncio

    import pytest

    from distributed.comm.core import CommClosedError
    from distributed.comm.tcp import TCPListener
    from distributed.core import ConnectionPool
    from distributed.utils_test import gen_test


    @gen_test()
    async def test_remove_cancels_connect_attempts():
        loop = asyncio.get_running_loop()
        connect_started = asyncio.Event()
        connect_finished = asyncio.Event()

        class BrokenHandshakeListener(TCPListener):
            # Accepts the TCP connection but never completes the handshake.
            async def on_connection(self, comm):
                try:
                    connect_started.set()
                    await comm.read(1)
                finally:
                    connect_finished.set()

        async with BrokenHandshakeListener(
            address="tcp://",
            comm_handler=lambda: None,
        ) as listener:
            rpc = await ConnectionPool(limit=1)

            async def connect_to_server():
                with pytest.raises(CommClosedError, match="Address removed."):
                    await rpc.connect(listener.contact_address)

            async def remove_address():
                # Remove the address while the connect attempt is in flight.
                await connect_started.wait()
                rpc.remove(listener.contact_address)

            await asyncio.gather(
                connect_to_server(),
                remove_address(),
            )
            await connect_finished.wait()
I hit a race condition in this asyncio.wait_for where the cancellation is ignored:
distributed/comm/core.py (lines 291 to 295 in 9ddee12):

    comm = await asyncio.wait_for(
        connector.connect(loc, deserialize=deserialize, **connection_args),
        timeout=min(intermediate_cap, time_left()),
    )
    break
https://github.com/python/cpython/blob/924a3bfa28578802eb9ca77a66fb5d4762a62f14/Lib/asyncio/tasks.py#L472
This is because the connector.connect and on_connection tasks resume in the same event loop cycle, so the cancellation arrives just as the connect() -> asyncio.wait_for() coroutine is about to be resumed. This is very unlikely in production, because the code will be waiting in comm.read() or asyncio.sleep(backoff). It can be resolved in the test by adding an asyncio.sleep(0.5) before calling rpc.remove():
    async def remove_address():
        await connect_started.wait()
        # avoid issuing a .cancel() after connect but before comm.read()
        await asyncio.sleep(0.5)
        rpc.remove(listener.contact_address)
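For reference, the asyncio.wait_for behaviour can be reproduced in isolation. The following is a standalone sketch, not code from this PR or from distributed; the exact outcome depends on the Python version, since wait_for was reimplemented in 3.12.

    import asyncio

    async def main():
        loop = asyncio.get_running_loop()
        finish = loop.create_future()

        async def fake_connect():
            # stand-in for connector.connect(): completes only when told to
            return await finish

        outer = asyncio.ensure_future(asyncio.wait_for(fake_connect(), timeout=5))
        await asyncio.sleep(0)  # let wait_for start
        await asyncio.sleep(0)  # let fake_connect start awaiting `finish`

        # the inner coroutine finishes and the outer task is cancelled in the
        # same event loop cycle, mirroring the race described above
        finish.set_result("connected")
        outer.cancel()

        try:
            # older wait_for implementations return the result here and the
            # cancellation is swallowed
            print("wait_for returned:", await outer)
        except asyncio.CancelledError:
            # newer implementations (3.12+) propagate the cancellation
            print("wait_for was cancelled")

    asyncio.run(main())

The asyncio.sleep(0.5) workaround in the test above avoids exactly this interleaving by giving on_connection time to reach comm.read() before the cancellation is issued.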
Or do you think it's best to leave this with a mocked connect function?
I think we should come back to this once #7571 is done. Triggering this edge case is interesting and I believe we've encountered this a couple of times in the past (in CI).
No need for any action on this PR.
part of #6790 (comment)
pre-commit run --all-files