From 2adffa9d4e36e48946e65de161f9048690bbe1dc Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Sat, 13 Nov 2021 10:53:20 +0100 Subject: [PATCH 01/19] helper to probe sshd MaxSessions --- setup.py | 1 + tardis/utilities/executors/sshexecutor.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/setup.py b/setup.py index 86f0705d..db2183a9 100644 --- a/setup.py +++ b/setup.py @@ -77,6 +77,7 @@ def get_cryptography_version(): "aioprometheus>=21.9.0", "kubernetes_asyncio", "pydantic", + "asyncstdlib", ], extras_require={ "docs": [ diff --git a/tardis/utilities/executors/sshexecutor.py b/tardis/utilities/executors/sshexecutor.py index 3003b0e7..82b7883a 100644 --- a/tardis/utilities/executors/sshexecutor.py +++ b/tardis/utilities/executors/sshexecutor.py @@ -5,6 +5,22 @@ import asyncio import asyncssh +import asyncstdlib as a + + +async def probe_max_session(connection: asyncssh.SSHClientConnection): + """ + Probe the sshd `MaxSessions`, i.e. the multiplexing limit per connection + """ + sessions = 0 + async with a.ExitStack() as aes: + try: + while True: + await aes.enter_context(await connection.create_process()) + sessions += 1 + except asyncssh.ChannelOpenError: + pass + return sessions @enable_yaml_load("!SSHExecutor") From e4d88b420e362a86afd7e143c7a458e6f6c20d24 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Sat, 13 Nov 2021 11:08:36 +0100 Subject: [PATCH 02/19] ensure connection to reset is the active one --- tardis/utilities/executors/sshexecutor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tardis/utilities/executors/sshexecutor.py b/tardis/utilities/executors/sshexecutor.py index 82b7883a..37f17123 100644 --- a/tardis/utilities/executors/sshexecutor.py +++ b/tardis/utilities/executors/sshexecutor.py @@ -54,6 +54,7 @@ async def ssh_connection(self): @property def lock(self): + """Lock protecting the connection""" # Create lock once tardis event loop is running. # To avoid got Future attached to a different loop exception if self._lock is None: @@ -75,8 +76,10 @@ async def run_command(self, command, stdin_input=None): stderr=pe.stderr, ) from pe except asyncssh.ChannelOpenError as coe: - # Broken connection will be replaced by a new connection during next call - self._ssh_connection = None + # clear broken connection to be replaced by a new connection during next run + async with self.lock: + if ssh_connection is self._ssh_connection: + self._ssh_connection = None raise CommandExecutionFailure( message=f"Could not run command {command} due to SSH failure: {coe}", exit_code=255, From 3500ccfb3f7bc0b720900d87db8c27ba43ba4c59 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Sat, 13 Nov 2021 11:29:02 +0100 Subject: [PATCH 03/19] ssh connection is bounded to MaxSessions --- tardis/utilities/executors/sshexecutor.py | 81 ++++++++++++++--------- 1 file changed, 49 insertions(+), 32 deletions(-) diff --git a/tardis/utilities/executors/sshexecutor.py b/tardis/utilities/executors/sshexecutor.py index 37f17123..e4cfab06 100644 --- a/tardis/utilities/executors/sshexecutor.py +++ b/tardis/utilities/executors/sshexecutor.py @@ -1,3 +1,4 @@ +from typing import Optional from ...configuration.utilities import enable_yaml_load from ...exceptions.executorexceptions import CommandExecutionFailure from ...interfaces.executor import Executor @@ -27,7 +28,10 @@ async def probe_max_session(connection: asyncssh.SSHClientConnection): class SSHExecutor(Executor): def __init__(self, **parameters): self._parameters = parameters - self._ssh_connection = None + # the current SSH connection or None if it must be (re-)established + self._ssh_connection: Optional[asyncssh.SSHClientConnection] = None + # the bound on MaxSession running concurrently + self._session_bound: Optional[asyncio.Semaphore] = None self._lock = None async def _establish_connection(self): @@ -44,13 +48,26 @@ async def _establish_connection(self): return await asyncssh.connect(**self._parameters) @property - async def ssh_connection(self): + @a.contextmanager + async def bounded_connection(self): + """ + Get the current connection with a single reserved session slot + + This is a context manager that guards the current + :py:class:`~asyncssh.SSHClientConnection` + so that only `MaxSessions` commands run at once. + """ if self._ssh_connection is None: async with self.lock: - # check that connection has not yet been initialize in a different task + # check that connection has not been initialized in a different task while self._ssh_connection is None: self._ssh_connection = await self._establish_connection() - return self._ssh_connection + max_session = await probe_max_session(self._ssh_connection) + self._session_bound = asyncio.Semaphore(value=max_session) + assert self._ssh_connection is not None + assert self._session_bound is not None + async with self._session_bound: + yield self._ssh_connection @property def lock(self): @@ -62,33 +79,33 @@ def lock(self): return self._lock async def run_command(self, command, stdin_input=None): - ssh_connection = await self.ssh_connection - try: - response = await ssh_connection.run( - command, check=True, input=stdin_input and stdin_input.encode() - ) - except asyncssh.ProcessError as pe: - raise CommandExecutionFailure( - message=f"Run command {command} via SSHExecutor failed", - exit_code=pe.exit_status, - stdin=stdin_input, - stdout=pe.stdout, - stderr=pe.stderr, - ) from pe - except asyncssh.ChannelOpenError as coe: - # clear broken connection to be replaced by a new connection during next run - async with self.lock: + async with self.bounded_connection as ssh_connection: + try: + response = await ssh_connection.run( + command, check=True, input=stdin_input and stdin_input.encode() + ) + except asyncssh.ProcessError as pe: + raise CommandExecutionFailure( + message=f"Run command {command} via SSHExecutor failed", + exit_code=pe.exit_status, + stdin=stdin_input, + stdout=pe.stdout, + stderr=pe.stderr, + ) from pe + except asyncssh.ChannelOpenError as coe: + # clear broken connection to get it replaced + # by a new connection during next command if ssh_connection is self._ssh_connection: self._ssh_connection = None - raise CommandExecutionFailure( - message=f"Could not run command {command} due to SSH failure: {coe}", - exit_code=255, - stdout="", - stderr="SSH Broken Connection", - ) from coe - else: - return AttributeDict( - stdout=response.stdout, - stderr=response.stderr, - exit_code=response.exit_status, - ) + raise CommandExecutionFailure( + message=f"Could not run command {command} due to SSH failure: {coe}", + exit_code=255, + stdout="", + stderr="SSH Broken Connection", + ) from coe + else: + return AttributeDict( + stdout=response.stdout, + stderr=response.stderr, + exit_code=response.exit_status, + ) From 4204a0115ce9a879bbd8d1680c898c06e82b2ac0 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Sun, 14 Nov 2021 08:53:48 +0100 Subject: [PATCH 04/19] Faking MaxSessions for SSH MickConnection --- .../executors_t/test_sshexecutor.py | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index 37a44769..b1398e62 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -10,18 +10,32 @@ import asyncio import yaml +import contextlib class MockConnection(object): - def __init__(self, exception=None, **kwargs): + def __init__(self, exception=None, __max_sessions=10, **kwargs): self.exception = exception and exception(**kwargs) + self.max_sessions = __max_sessions + self.current_sessions = 0 + + @contextlib.contextmanager + def _multiplex_session(self): + if self.current_sessions >= self.max_sessions: + raise ChannelOpenError(code=2, reason='open failed') + self.current_sessions += 1 + try: + yield + finally: + self.current_sessions -= 1 async def run(self, command, input=None, **kwargs): - if self.exception: - raise self.exception - return AttributeDict( - stdout=input and input.decode(), stderr="TestError", exit_status=0 - ) + with self._multiplex_session: + if self.exception: + raise self.exception + return AttributeDict( + stdout=input and input.decode(), stderr="TestError", exit_status=0 + ) class TestSSHExecutor(TestCase): From 8d82e591e7a4b524f98095be5680897d469e9d4b Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Sun, 14 Nov 2021 08:58:23 +0100 Subject: [PATCH 05/19] testing process creation --- tests/utilities_t/executors_t/test_sshexecutor.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index b1398e62..d32d9c3d 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -11,6 +11,7 @@ import asyncio import yaml import contextlib +import asyncstdlib as a class MockConnection(object): @@ -22,7 +23,7 @@ def __init__(self, exception=None, __max_sessions=10, **kwargs): @contextlib.contextmanager def _multiplex_session(self): if self.current_sessions >= self.max_sessions: - raise ChannelOpenError(code=2, reason='open failed') + raise ChannelOpenError(code=2, reason="open failed") self.current_sessions += 1 try: yield @@ -30,13 +31,21 @@ def _multiplex_session(self): self.current_sessions -= 1 async def run(self, command, input=None, **kwargs): - with self._multiplex_session: + with self._multiplex_session(): if self.exception: raise self.exception return AttributeDict( stdout=input and input.decode(), stderr="TestError", exit_status=0 ) + async def create_process(self): + @a.contextmanager + async def fake_process(): + with self._multiplex_session(): + yield + + return fake_process() + class TestSSHExecutor(TestCase): mock_asyncssh = None From d87da2389f2b6406a5ba553509c69d240226a2d8 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Sun, 14 Nov 2021 09:05:31 +0100 Subject: [PATCH 06/19] adjusted connection test to bounded interface --- tests/utilities_t/executors_t/test_sshexecutor.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index d32d9c3d..4fb8f022 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -103,18 +103,16 @@ def test_establish_connection(self): self.mock_asyncssh.connect.side_effect = None def test_connection_property(self): - async def helper_coroutine(): - return await self.executor.ssh_connection + async def force_connection(): + async with self.executor.bounded_connection as connection: + return connection self.assertIsNone(self.executor._ssh_connection) - run_async(helper_coroutine) - + run_async(force_connection) self.assertIsInstance(self.executor._ssh_connection, MockConnection) - current_ssh_connection = self.executor._ssh_connection - - run_async(helper_coroutine) - + run_async(force_connection) + # make sure the connection is not needlessly replaced self.assertEqual(self.executor._ssh_connection, current_ssh_connection) def test_lock(self): From e071fc7cd37349c0e9fc43db98ed7b2cb8e7abcd Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Sun, 14 Nov 2021 09:10:35 +0100 Subject: [PATCH 07/19] line length --- tardis/utilities/executors/sshexecutor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tardis/utilities/executors/sshexecutor.py b/tardis/utilities/executors/sshexecutor.py index e4cfab06..23f1bb79 100644 --- a/tardis/utilities/executors/sshexecutor.py +++ b/tardis/utilities/executors/sshexecutor.py @@ -98,7 +98,9 @@ async def run_command(self, command, stdin_input=None): if ssh_connection is self._ssh_connection: self._ssh_connection = None raise CommandExecutionFailure( - message=f"Could not run command {command} due to SSH failure: {coe}", + message=( + f"Could not run command {command} due to SSH failure: {coe}" + ), exit_code=255, stdout="", stderr="SSH Broken Connection", From 15ce362027174902e38c1092df3822095841f771 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Sun, 14 Nov 2021 09:34:16 +0100 Subject: [PATCH 08/19] force session and bound to remain in sync --- tardis/utilities/executors/sshexecutor.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tardis/utilities/executors/sshexecutor.py b/tardis/utilities/executors/sshexecutor.py index 23f1bb79..23725938 100644 --- a/tardis/utilities/executors/sshexecutor.py +++ b/tardis/utilities/executors/sshexecutor.py @@ -66,8 +66,9 @@ async def bounded_connection(self): self._session_bound = asyncio.Semaphore(value=max_session) assert self._ssh_connection is not None assert self._session_bound is not None - async with self._session_bound: - yield self._ssh_connection + bound, session = self._session_bound, self._ssh_connection + async with bound: + yield session @property def lock(self): From e99fa9a7ac68650b0d3ad00b2932e1092bc3cf89 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Mon, 15 Nov 2021 08:56:20 +0100 Subject: [PATCH 09/19] added change fragment --- docs/source/changes/218.respect_ssh_maxsessions.yaml | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 docs/source/changes/218.respect_ssh_maxsessions.yaml diff --git a/docs/source/changes/218.respect_ssh_maxsessions.yaml b/docs/source/changes/218.respect_ssh_maxsessions.yaml new file mode 100644 index 00000000..d0bb08db --- /dev/null +++ b/docs/source/changes/218.respect_ssh_maxsessions.yaml @@ -0,0 +1,10 @@ +category: changed +summary: "SSHExecutor respects the remote MaxSessions via queueing" +description: | + The SSHExecutor now is aware of sshd MaxSessions, which is a limit on the concurrent + operations per connection. If more operations are to be run at once, operations are + queued until a session becomes available. +pull requests: +- 218 +issues: +- 217 From 3268dfa2ce12cef09e83e8b86c0042062e4b03cc Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 18 Nov 2021 09:30:41 +0100 Subject: [PATCH 10/19] add explanation as comment --- tardis/utilities/executors/sshexecutor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tardis/utilities/executors/sshexecutor.py b/tardis/utilities/executors/sshexecutor.py index 23725938..67aa8bf7 100644 --- a/tardis/utilities/executors/sshexecutor.py +++ b/tardis/utilities/executors/sshexecutor.py @@ -14,6 +14,10 @@ async def probe_max_session(connection: asyncssh.SSHClientConnection): Probe the sshd `MaxSessions`, i.e. the multiplexing limit per connection """ sessions = 0 + # It does not actually matter what kind of session we open here, but: + # - it should stay open without a separate task to manage it + # - it should reliably and promptly clean up when done probing + # `create_process` is a bit heavy but does all that. async with a.ExitStack() as aes: try: while True: From 201eb242358aa3888d8a664af27bc99b55354d85 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 18 Nov 2021 09:52:12 +0100 Subject: [PATCH 11/19] using full async names --- tardis/utilities/executors/sshexecutor.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tardis/utilities/executors/sshexecutor.py b/tardis/utilities/executors/sshexecutor.py index 67aa8bf7..40c6785c 100644 --- a/tardis/utilities/executors/sshexecutor.py +++ b/tardis/utilities/executors/sshexecutor.py @@ -6,7 +6,10 @@ import asyncio import asyncssh -import asyncstdlib as a +from asyncstdlib import ( + ExitStack as AsyncExitStack, + contextmanager as asynccontextmanager, +) async def probe_max_session(connection: asyncssh.SSHClientConnection): @@ -18,7 +21,7 @@ async def probe_max_session(connection: asyncssh.SSHClientConnection): # - it should stay open without a separate task to manage it # - it should reliably and promptly clean up when done probing # `create_process` is a bit heavy but does all that. - async with a.ExitStack() as aes: + async with AsyncExitStack() as aes: try: while True: await aes.enter_context(await connection.create_process()) @@ -52,7 +55,7 @@ async def _establish_connection(self): return await asyncssh.connect(**self._parameters) @property - @a.contextmanager + @asynccontextmanager async def bounded_connection(self): """ Get the current connection with a single reserved session slot From 78c43407b412b153a3c8da898f216510cd8215ee Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 18 Nov 2021 10:24:25 +0100 Subject: [PATCH 12/19] added test for probed sessions --- tests/utilities_t/executors_t/test_sshexecutor.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index 4fb8f022..319d1636 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -1,6 +1,6 @@ from tests.utilities.utilities import async_return, run_async from tardis.utilities.attributedict import AttributeDict -from tardis.utilities.executors.sshexecutor import SSHExecutor +from tardis.utilities.executors.sshexecutor import SSHExecutor, probe_max_session from tardis.exceptions.executorexceptions import CommandExecutionFailure from asyncssh import ChannelOpenError, ConnectionLost, DisconnectError, ProcessError @@ -47,6 +47,18 @@ async def fake_process(): return fake_process() +class TestSSHExecutorUtilities(TestCase): + def test_max_sessions(self): + with self.subTest(sessions="default"): + self.assertEqual(10, run_async(probe_max_session, MockConnection())) + for expected in (1, 9, 11, 20, 100): + with self.subTest(sessions=expected): + self.assertEqual( + expected, + run_async(probe_max_session, MockConnection(None, expected)), + ) + + class TestSSHExecutor(TestCase): mock_asyncssh = None From 2ce4675fc69b3b60e39b52da789c790d3973c80e Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 18 Nov 2021 11:36:02 +0100 Subject: [PATCH 13/19] MockConnection understands sleep commands --- tests/utilities_t/executors_t/test_sshexecutor.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index 319d1636..aa9c7f94 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -34,6 +34,11 @@ async def run(self, command, input=None, **kwargs): with self._multiplex_session(): if self.exception: raise self.exception + if command.startswith("sleep"): + _, duration = command.split() + await asyncio.sleep(float(duration)) + elif command != "Test": + raise ValueError(f"Unsupported mock command: {command}") return AttributeDict( stdout=input and input.decode(), stderr="TestError", exit_status=0 ) From 92298f4439fc6fe25c027173daa8bf678b565785 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 18 Nov 2021 11:36:18 +0100 Subject: [PATCH 14/19] testing MaxSessions queueing --- .../executors_t/test_sshexecutor.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index aa9c7f94..2db78752 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -135,6 +135,38 @@ async def force_connection(): def test_lock(self): self.assertIsInstance(self.executor.lock, asyncio.Lock) + def test_connection_queueing(self): + async def max_sessions(tries: int): + return max(await asyncio.gather(*(count_sessions() for _ in range(tries)))) + + # There is no way to directly count how many "commands" are running at once: + # if we do something "while" `run_command` is active, we don't know whether + # it actually runs or is queued. + # This approach exploits that MaxSessions queueing will start n=MaxSessions + # commands immediately, but the n+1'th command will run `delay` seconds later. + # As each command adds itself and then waits for `delay / 2` before removing + # itself, there is a window of roughly [delay, delay * 1.5] during which all + # n sessions are counted. + # Note that for this to work, `delay` must be larger than `asyncio`'s task + # switching speed. + async def count_sessions(): + nonlocal current_sessions + delay = 0.05 + await self.executor.run_command(f"sleep {delay}") + current_sessions += 1 + await asyncio.sleep(delay / 2) + count = current_sessions + current_sessions -= 1 + return count + + current_sessions = 0 + for sessions in (1, 8, 12, 20): + with self.subTest(sessions=sessions): + self.assertEqual( + min(sessions, 10), + run_async(max_sessions, sessions), + ) + def test_run_command(self): self.assertIsNone(run_async(self.executor.run_command, command="Test").stdout) self.mock_asyncssh.connect.assert_called_with( From 2250d9719236856ab9f6e4d2c8c96f4bdad3522d Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 18 Nov 2021 11:43:12 +0100 Subject: [PATCH 15/19] wording/formatting --- tests/utilities_t/executors_t/test_sshexecutor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index 2db78752..eda8feb9 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -142,11 +142,12 @@ async def max_sessions(tries: int): # There is no way to directly count how many "commands" are running at once: # if we do something "while" `run_command` is active, we don't know whether # it actually runs or is queued. + # # This approach exploits that MaxSessions queueing will start n=MaxSessions # commands immediately, but the n+1'th command will run `delay` seconds later. # As each command adds itself and then waits for `delay / 2` before removing # itself, there is a window of roughly [delay, delay * 1.5] during which all - # n sessions are counted. + # first n sessions are counted. # Note that for this to work, `delay` must be larger than `asyncio`'s task # switching speed. async def count_sessions(): From 67ff4313b82a32cf8c4b7f7ccd2fda7fb142491b Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 18 Nov 2021 12:08:28 +0100 Subject: [PATCH 16/19] explicit async utility naming --- tests/utilities_t/executors_t/test_sshexecutor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index eda8feb9..fda96ad6 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -11,7 +11,7 @@ import asyncio import yaml import contextlib -import asyncstdlib as a +from asyncstdlib import contextmanager as asynccontextmanager class MockConnection(object): @@ -44,7 +44,7 @@ async def run(self, command, input=None, **kwargs): ) async def create_process(self): - @a.contextmanager + @asynccontextmanager async def fake_process(): with self._multiplex_session(): yield From b915e30d3c6ddb498c90fe4734212b8d6ba705c9 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 18 Nov 2021 18:58:12 +0100 Subject: [PATCH 17/19] parameterised max sessions --- tests/utilities_t/executors_t/test_sshexecutor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index fda96ad6..ee2a5b4d 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -14,8 +14,11 @@ from asyncstdlib import contextmanager as asynccontextmanager +DEFAULT_MAX_SESSIONS = 10 + + class MockConnection(object): - def __init__(self, exception=None, __max_sessions=10, **kwargs): + def __init__(self, exception=None, __max_sessions=DEFAULT_MAX_SESSIONS, **kwargs): self.exception = exception and exception(**kwargs) self.max_sessions = __max_sessions self.current_sessions = 0 @@ -55,7 +58,9 @@ async def fake_process(): class TestSSHExecutorUtilities(TestCase): def test_max_sessions(self): with self.subTest(sessions="default"): - self.assertEqual(10, run_async(probe_max_session, MockConnection())) + self.assertEqual( + DEFAULT_MAX_SESSIONS, run_async(probe_max_session, MockConnection()) + ) for expected in (1, 9, 11, 20, 100): with self.subTest(sessions=expected): self.assertEqual( From 3bbadb96e95fe6c1020c1ea09a1a9e0afe679ff1 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 18 Nov 2021 18:58:48 +0100 Subject: [PATCH 18/19] simplified queueing test --- .../executors_t/test_sshexecutor.py | 45 +++++++------------ 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index ee2a5b4d..e3612571 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -141,36 +141,25 @@ def test_lock(self): self.assertIsInstance(self.executor.lock, asyncio.Lock) def test_connection_queueing(self): - async def max_sessions(tries: int): - return max(await asyncio.gather(*(count_sessions() for _ in range(tries)))) - - # There is no way to directly count how many "commands" are running at once: - # if we do something "while" `run_command` is active, we don't know whether - # it actually runs or is queued. - # - # This approach exploits that MaxSessions queueing will start n=MaxSessions - # commands immediately, but the n+1'th command will run `delay` seconds later. - # As each command adds itself and then waits for `delay / 2` before removing - # itself, there is a window of roughly [delay, delay * 1.5] during which all - # first n sessions are counted. - # Note that for this to work, `delay` must be larger than `asyncio`'s task - # switching speed. - async def count_sessions(): - nonlocal current_sessions - delay = 0.05 - await self.executor.run_command(f"sleep {delay}") - current_sessions += 1 - await asyncio.sleep(delay / 2) - count = current_sessions - current_sessions -= 1 - return count - - current_sessions = 0 - for sessions in (1, 8, 12, 20): + async def is_queued(n: int): + """Check whether the n'th command runs is queued or immediately""" + background = [ + asyncio.create_task(self.executor.run_command("sleep 5")) + for _ in range(n - 1) + ] + # probe can only finish in time if it is not queued + probe = asyncio.create_task(self.executor.run_command("sleep 0.01")) + await asyncio.sleep(0.05) + queued = not probe.done() + for task in background + [probe]: + task.cancel() + return queued + + for sessions in (1, 8, 10, 12, 20): with self.subTest(sessions=sessions): self.assertEqual( - min(sessions, 10), - run_async(max_sessions, sessions), + sessions > DEFAULT_MAX_SESSIONS, + run_async(is_queued, sessions), ) def test_run_command(self): From 2d31c693b6090d0fcb0d5219c3593403fc63e67e Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Thu, 18 Nov 2021 19:05:26 +0100 Subject: [PATCH 19/19] py3.6 ain't dead till it's dead --- tests/utilities_t/executors_t/test_sshexecutor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/utilities_t/executors_t/test_sshexecutor.py b/tests/utilities_t/executors_t/test_sshexecutor.py index e3612571..d47b65ce 100644 --- a/tests/utilities_t/executors_t/test_sshexecutor.py +++ b/tests/utilities_t/executors_t/test_sshexecutor.py @@ -144,11 +144,11 @@ def test_connection_queueing(self): async def is_queued(n: int): """Check whether the n'th command runs is queued or immediately""" background = [ - asyncio.create_task(self.executor.run_command("sleep 5")) + asyncio.ensure_future(self.executor.run_command("sleep 5")) for _ in range(n - 1) ] # probe can only finish in time if it is not queued - probe = asyncio.create_task(self.executor.run_command("sleep 0.01")) + probe = asyncio.ensure_future(self.executor.run_command("sleep 0.01")) await asyncio.sleep(0.05) queued = not probe.done() for task in background + [probe]: