Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flaky test_defaults #6753

Closed
gjoseph92 opened this issue Jul 20, 2022 · 1 comment · Fixed by #6822
Closed

Flaky test_defaults #6753

gjoseph92 opened this issue Jul 20, 2022 · 1 comment · Fixed by #6822
Labels
flaky test Intermittent failures on CI.

Comments

@gjoseph92
Copy link
Collaborator

gjoseph92 commented Jul 20, 2022

________________________________ test_defaults _________________________________
ConnectionRefusedError: [Errno 61] Connection refused

The above exception was the direct cause of the following exception:

addr = 'tcp://127.0.0.1:8786', timeout = 5, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:8786'
backend = <distributed.comm.tcp.TCPBackend object at 0x10ef87e80>
connector = <distributed.comm.tcp.TCPConnector object at 0x13f0ae230>
comm = None, time_left = <function connect.<locals>.time_left at 0x13da1d000>
backoff_base = 0.01

    async def connect(
        addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
    ):
        """
        Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
        and yield a ``Comm`` object.  If the connection attempt fails, it is
        retried until the *timeout* is expired.
        """
        if timeout is None:
            timeout = dask.config.get("distributed.comm.timeouts.connect")
        timeout = parse_timedelta(timeout, default="seconds")
    
        scheme, loc = parse_address(addr)
        backend = registry.get_backend(scheme)
        connector = backend.get_connector()
        comm = None
    
        start = time()
    
        def time_left():
            deadline = start + timeout
            return max(0, deadline - time())
    
        backoff_base = 0.01
        attempt = 0
    
        # Prefer multiple small attempts than one long attempt. This should protect
        # primarily from DNS race conditions
        # gh3104, gh4176, gh4167
        intermediate_cap = timeout / 5
        active_exception = None
        while time_left() > 0:
            try:
>               comm = await asyncio.wait_for(
                    connector.connect(loc, deserialize=deserialize, **connection_args),
                    timeout=min(intermediate_cap, time_left()),
                )

distributed/comm/core.py:291: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fut = <Task finished name='Task-16' coro=<BaseTCPConnector.connect() done, defined at /Users/runner/work/distributed/distrib...'in <distributed.comm.tcp.TCPConnector object at 0x13f0ae230>: ConnectionRefusedError: [Errno 61] Connection refused')>
timeout = 0.06839799880981445

    async def wait_for(fut, timeout):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        loop = events.get_running_loop()
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            await _cancel_and_wait(fut, loop=loop)
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                if fut.done():
                    return fut.result()
                else:
                    fut.remove_done_callback(cb)
                    # We must ensure that the task is not running
                    # after wait_for() returns.
                    # See https://bugs.python.org/issue32751
                    await _cancel_and_wait(fut, loop=loop)
                    raise
    
            if fut.done():
>               return fut.result()

../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/tasks.py:445: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <distributed.comm.tcp.TCPConnector object at 0x13f0ae230>
address = '127.0.0.1:8786', deserialize = True
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
ip = '127.0.0.1', port = 8786, kwargs = {}

    async def connect(self, address, deserialize=True, **connection_args):
        self._check_encryption(address, connection_args)
        ip, port = parse_host_port(address)
        kwargs = self._get_connect_args(**connection_args)
    
        try:
            stream = await self.client.connect(
                ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
            )
            # Under certain circumstances tornado will have a closed connnection with an
            # error and not raise a StreamClosedError.
            #
            # This occurs with tornado 5.x and openssl 1.1+
            if stream.closed() and stream.error:
                raise StreamClosedError(stream.error)
    
        except StreamClosedError as e:
            # The socket connect() call failed
>           convert_stream_closed_error(self, e)

distributed/comm/tcp.py:461: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

obj = <distributed.comm.tcp.TCPConnector object at 0x13f0ae230>
exc = ConnectionRefusedError(61, 'Connection refused')

    def convert_stream_closed_error(obj, exc):
        """
        Re-raise StreamClosedError as CommClosedError.
        """
        if exc.real_error is not None:
            # The stream was closed because of an underlying OS error
            exc = exc.real_error
            if isinstance(exc, ssl.SSLError):
                if "UNKNOWN_CA" in exc.reason:
                    raise FatalCommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}")
>           raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
E           distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x13f0ae230>: ConnectionRefusedError: [Errno 61] Connection refused

distributed/comm/tcp.py:142: CommClosedError

The above exception was the direct cause of the following exception:

loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x13fa81b40>
requires_default_ports = None

    def test_defaults(loop, requires_default_ports):
        with popen(["dask-scheduler"]):
    
            async def f():
                # Default behaviour is to listen on all addresses
                await assert_can_connect_from_everywhere_4_6(8786, timeout=5.0)
    
>           with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop) as c:

distributed/cli/tests/test_dask_scheduler.py:49: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:940: in __init__
    self.start(timeout=timeout)
distributed/client.py:1098: in start
    sync(self.loop, self._start, **kwargs)
distributed/utils.py:405: in sync
    raise exc.with_traceback(tb)
distributed/utils.py:378: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/tornado/gen.py:762: in run
    value = future.result()
distributed/client.py:1178: in _start
    await self._ensure_connected(timeout=timeout)
distributed/client.py:1241: in _ensure_connected
    comm = await connect(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

addr = 'tcp://127.0.0.1:8786', timeout = 5, deserialize = True
handshake_overrides = None
connection_args = {'extra_conn_args': {}, 'require_encryption': False, 'ssl_context': None}
scheme = 'tcp', loc = '127.0.0.1:8786'
backend = <distributed.comm.tcp.TCPBackend object at 0x10ef87e80>
connector = <distributed.comm.tcp.TCPConnector object at 0x13f0ae230>
comm = None, time_left = <function connect.<locals>.time_left at 0x13da1d000>
backoff_base = 0.01

    async def connect(
        addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
    ):
        """
        Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
        and yield a ``Comm`` object.  If the connection attempt fails, it is
        retried until the *timeout* is expired.
        """
        if timeout is None:
            timeout = dask.config.get("distributed.comm.timeouts.connect")
        timeout = parse_timedelta(timeout, default="seconds")
    
        scheme, loc = parse_address(addr)
        backend = registry.get_backend(scheme)
        connector = backend.get_connector()
        comm = None
    
        start = time()
    
        def time_left():
            deadline = start + timeout
            return max(0, deadline - time())
    
        backoff_base = 0.01
        attempt = 0
    
        # Prefer multiple small attempts than one long attempt. This should protect
        # primarily from DNS race conditions
        # gh3104, gh4176, gh4167
        intermediate_cap = timeout / 5
        active_exception = None
        while time_left() > 0:
            try:
                comm = await asyncio.wait_for(
                    connector.connect(loc, deserialize=deserialize, **connection_args),
                    timeout=min(intermediate_cap, time_left()),
                )
                break
            except FatalCommClosedError:
                raise
            # Note: CommClosed inherits from OSError
            except (asyncio.TimeoutError, OSError) as exc:
                active_exception = exc
    
                # As descibed above, the intermediate timeout is used to distributed
                # initial, bulk connect attempts homogeneously. In particular with
                # the jitter upon retries we should not be worred about overloading
                # any more DNS servers
                intermediate_cap = timeout
                # FullJitter see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
    
                upper_cap = min(time_left(), backoff_base * (2**attempt))
                backoff = random.uniform(0, upper_cap)
                attempt += 1
                logger.debug(
                    "Could not connect to %s, waiting for %s before retrying", loc, backoff
                )
                await asyncio.sleep(backoff)
        else:
>           raise OSError(
                f"Timed out trying to connect to {addr} after {timeout} s"
            ) from active_exception
E           OSError: Timed out trying to connect to tcp://127.0.0.1:8786 after 5 s

distributed/comm/core.py:317: OSError
----------------------------- Captured stderr call -----------------------------
[2022](https://github.com/dask/distributed/runs/7497695350?check_suite_focus=true#step:11:2023)-07-25 10:12:54,719 - distributed.scheduler - INFO - -----------------------------------------------
2022-07-25 10:12:56,733 - distributed.scheduler - INFO - End scheduler

Aborted!

https://github.com/dask/distributed/runs/7497695350?check_suite_focus=true#step:11:1891

@gjoseph92
Copy link
Collaborator Author

This one is interesting because

  1. We initially got a ConnectionRefusedError: [Errno 61] Connection refused error. Maybe the subprocess hasn't even started yet (since it's stuck in slow cython imports)?
  2. test_defaults is the very first test to run in test_dask_scheduler.py, and the first test to run in the whole not ci1 test suite.
  3. Even when it passes, it sometimes seems to leak 2 file descriptors: https://github.com/dask/distributed/runs/7395221485?check_suite_focus=true#step:11:3019,

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
flaky test Intermittent failures on CI.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant