Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make ConnectionPool.remove cancel connection attempts #7547

Merged

Conversation

graingert
Copy link
Member

part of #6790 (comment)

  • Tests added / passed
  • Passes pre-commit run --all-files

@github-actions
Copy link
Contributor

github-actions bot commented Feb 15, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       24 files  ±  0         24 suites  ±0   10h 39m 2s ⏱️ - 19m 34s
  3 337 tests +  1    3 235 ✔️ +  6     102 💤  - 1  0  - 4 
39 343 runs  +12  37 490 ✔️ +17  1 853 💤  - 1  0  - 4 

Results for commit 9ddee12. ± Comparison against base commit fef78d4.

♻️ This comment has been updated with latest results.

@graingert graingert force-pushed the connection-pool-remove-cancel-connection-attempts branch from 802b146 to 9ddee12 Compare February 16, 2023 14:08

rpc = await ConnectionPool(limit=1)

with mock.patch("distributed.core.connect", connect):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of mocking, you could also just connect to a server with either a very slow or blocked listener or one that is not replying on the handshake

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought mocking was cleaner here, I'll look into using a slow server

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also the other tests monkeypatch with monkeypatch.setitem(backends, "tcp", SlowBackend()), so I still think using mock.patch on the connect function is cleanest here

Copy link
Member Author

@graingert graingert Feb 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when testing this:

@gen_test()
async def test_remove_cancels_connect_attempts():
    loop = asyncio.get_running_loop()
    connect_started = asyncio.Event()
    connect_finished = asyncio.Event()

    class BrokenHandshakeListener(TCPListener):
        async def on_connection(self, comm):
            try:
                connect_started.set()
                await comm.read(1)
            finally:
                connect_finished.set()

    async with BrokenHandshakeListener(
        address="tcp://",
        comm_handler=lambda: None,
    ) as listener:
        rpc = await ConnectionPool(limit=1)

        async def connect_to_server():
            with pytest.raises(CommClosedError, match="Address removed."):
                await rpc.connect(listener.contact_address)

        async def remove_address():
            await connect_started.wait()
            rpc.remove(listener.contact_address)

        await asyncio.gather(
            connect_to_server(),
            remove_address(),
        )

        await connect_finished.wait()

I hit a race condition in this asyncio.wait_for where the cancellation is ignored:

comm = await asyncio.wait_for(
connector.connect(loc, deserialize=deserialize, **connection_args),
timeout=min(intermediate_cap, time_left()),
)
break

https://github.com/python/cpython/blob/924a3bfa28578802eb9ca77a66fb5d4762a62f14/Lib/asyncio/tasks.py#L472

this is because the connector.connect and on_connection tasks resume in the same even loop cycle and so the cancellation arrives just as the connect() -> asyncio.wait_for() coroutine is about to be resumed, this is very unlikely in production because the code will be waiting in comm.read() or asyncio.sleep(backoff) and can be resolved in the test by adding an asyncio.sleep(0.5) before calling rpc.remove():

        async def remove_address():
            await connect_started.wait()
            await asyncio.sleep(0.5)  # avoid issuing a .cancel() after connect but before comm.read()
            rpc.remove(listener.contact_address)

or do you think it's best to leave this with a mocked connect function?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should come back to this once #7571 is done. Triggering this edge case is interesting and I believe we've encountered this a couple of times in the past (in CI).

No need for any action on this PR.

@graingert graingert marked this pull request as ready for review February 16, 2023 16:00

rpc = await ConnectionPool(limit=1)

with mock.patch("distributed.core.connect", connect):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should come back to this once #7571 is done. Triggering this edge case is interesting and I believe we've encountered this a couple of times in the past (in CI).

No need for any action on this PR.

@fjetter fjetter merged commit 87b799b into dask:main Feb 22, 2023
@graingert graingert deleted the connection-pool-remove-cancel-connection-attempts branch February 22, 2023 14:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants