From f9ac377626ff993569b532e693acf3f08a6c1f43 Mon Sep 17 00:00:00 2001
From: Victor Stinner
Date: Wed, 4 Oct 2023 12:58:49 +0200
Subject: [PATCH] [3.11] Add test.support.busy_retry() (#93770) (#110341)

Add test.support.busy_retry() (#93770)

Add busy_retry() and sleeping_retry() functions to test.support.

(cherry picked from commit 7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16)
---
 Doc/library/test.rst                          | 45 +++++++++++
 Lib/test/_test_multiprocessing.py             | 60 ++++++---------
 Lib/test/fork_wait.py                         |  6 +-
 Lib/test/support/__init__.py                  | 76 +++++++++++++++++++
 Lib/test/test__xxsubinterpreters.py           | 11 ++-
 Lib/test/test_concurrent_futures/test_init.py |  9 +--
 .../test_multiprocessing_main_handling.py     | 25 +++---
 Lib/test/test_signal.py                       | 25 +++---
 Lib/test/test_ssl.py                          |  5 +-
 Lib/test/test_support.py                      | 12 +--
 Lib/test/test_wait3.py                        |  5 +-
 Lib/test/test_wait4.py                        |  5 +-
 12 files changed, 185 insertions(+), 99 deletions(-)

diff --git a/Doc/library/test.rst b/Doc/library/test.rst
index 427953e0077aab..d3eb0ae00a08dc 100644
--- a/Doc/library/test.rst
+++ b/Doc/library/test.rst
@@ -404,6 +404,51 @@ The :mod:`test.support` module defines the following constants:
 
 The :mod:`test.support` module defines the following functions:
 
+.. function:: busy_retry(timeout, err_msg=None, /, *, error=True)
+
+   Run the loop body until ``break`` stops the loop.
+
+   After *timeout* seconds, raise an :exc:`AssertionError` if *error* is true,
+   or just stop the loop if *error* is false.
+
+   Example::
+
+       for _ in support.busy_retry(support.SHORT_TIMEOUT):
+           if check():
+               break
+
+   Example of error=False usage::
+
+       for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
+           if check():
+               break
+       else:
+           raise RuntimeError('my custom error')
+
+.. function:: sleeping_retry(timeout, err_msg=None, /, *, init_delay=0.010, max_delay=1.0, error=True)
+
+   Wait strategy that applies exponential backoff.
+
+   Run the loop body until ``break`` stops the loop. Sleep at each loop
+   iteration, but not at the first iteration. The sleep delay is doubled at
+   each iteration (up to *max_delay* seconds).
+
+   See :func:`busy_retry` documentation for the parameters usage.
+
+   Example raising an exception after SHORT_TIMEOUT seconds::
+
+       for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+           if check():
+               break
+
+   Example of error=False usage::
+
+       for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+           if check():
+               break
+       else:
+           raise RuntimeError('my custom error')
+
 .. function:: is_resource_enabled(resource)
 
    Return ``True`` if *resource* is enabled and available. The list of
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 6249062639b8d4..6459757eb93d51 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -4376,18 +4376,13 @@ def test_shared_memory_cleaned_after_process_termination(self):
         p.terminate()
         p.wait()
 
-        deadline = time.monotonic() + support.LONG_TIMEOUT
-        t = 0.1
-        while time.monotonic() < deadline:
-            time.sleep(t)
-            t = min(t*2, 5)
+        err_msg = ("A SharedMemory segment was leaked after "
+                   "a process was abruptly terminated")
+        for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg):
             try:
                 smm = shared_memory.SharedMemory(name, create=False)
             except FileNotFoundError:
                 break
-        else:
-            raise AssertionError("A SharedMemory segment was leaked after"
-                                 " a process was abruptly terminated.")
 
         if os.name == 'posix':
             # Without this line it was raising warnings like:
@@ -5458,9 +5453,10 @@ def create_and_register_resource(rtype):
                 p.terminate()
                 p.wait()
 
-                deadline = time.monotonic() + support.LONG_TIMEOUT
-                while time.monotonic() < deadline:
-                    time.sleep(.5)
+                err_msg = (f"A {rtype} resource was leaked after a process was "
+                           f"abruptly terminated")
+                for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
+                                                err_msg):
                     try:
                         _resource_unlink(name2, rtype)
                     except OSError as e:
@@ -5468,10 +5464,7 @@ def create_and_register_resource(rtype):
                         # EINVAL
                         self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
                         break
-                else:
-                    raise AssertionError(
-                        f"A {rtype} resource was leaked after a process was "
-                        f"abruptly terminated.")
+
                 err = p.stderr.read().decode('utf-8')
                 p.stderr.close()
                 expected = ('resource_tracker: There appear to be 2 leaked {} '
@@ -5707,18 +5700,17 @@ def wait_proc_exit(self):
         # but this can take a bit on slow machines, so wait a few seconds
         # if there are other children too (see #17395).
         join_process(self.proc)
+
         start_time = time.monotonic()
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1:
-            time.sleep(t)
-            t *= 2
-            dt = time.monotonic() - start_time
-            if dt >= 5.0:
-                test.support.environment_altered = True
-                support.print_warning(f"multiprocessing.Manager still has "
-                                      f"{multiprocessing.active_children()} "
-                                      f"active children after {dt} seconds")
+        for _ in support.sleeping_retry(5.0, error=False):
+            if len(multiprocessing.active_children()) <= 1:
                 break
+        else:
+            dt = time.monotonic() - start_time
+            support.environment_altered = True
+            support.print_warning(f"multiprocessing.Manager still has "
+                                  f"{multiprocessing.active_children()} "
+                                  f"active children after {dt:.1f} seconds")
 
     def run_worker(self, worker, obj):
         self.proc = multiprocessing.Process(target=worker, args=(obj, ))
@@ -6031,17 +6023,15 @@ def tearDownClass(cls):
         # but this can take a bit on slow machines, so wait a few seconds
         # if there are other children too (see #17395)
         start_time = time.monotonic()
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1:
-            time.sleep(t)
-            t *= 2
-            dt = time.monotonic() - start_time
-            if dt >= 5.0:
-                test.support.environment_altered = True
-                support.print_warning(f"multiprocessing.Manager still has "
-                                      f"{multiprocessing.active_children()} "
-                                      f"active children after {dt} seconds")
+        for _ in support.sleeping_retry(5.0, error=False):
+            if len(multiprocessing.active_children()) <= 1:
                 break
+        else:
+            dt = time.monotonic() - start_time
+            support.environment_altered = True
+            support.print_warning(f"multiprocessing.Manager still has "
+                                  f"{multiprocessing.active_children()} "
+                                  f"active children after {dt:.1f} seconds")
 
         gc.collect()  # do garbage collection
         if cls.manager._number_of_objects() != 0:
diff --git a/Lib/test/fork_wait.py b/Lib/test/fork_wait.py
index 4d3dbd8e83f5a9..c565f593559483 100644
--- a/Lib/test/fork_wait.py
+++ b/Lib/test/fork_wait.py
@@ -54,10 +54,8 @@ def test_wait(self):
             self.threads.append(thread)
 
         # busy-loop to wait for threads
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while len(self.alive) < NUM_THREADS:
-            time.sleep(0.1)
-            if deadline < time.monotonic():
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+            if len(self.alive) >= NUM_THREADS:
                 break
 
         a = sorted(self.alive.keys())
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 2e6518cf241f14..6ee525c061f624 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -2324,6 +2324,82 @@ def requires_venv_with_pip():
     return unittest.skipUnless(ctypes, 'venv: pip requires ctypes')
 
 
+def busy_retry(timeout, err_msg=None, /, *, error=True):
+    """
+    Run the loop body until "break" stops the loop.
+
+    After *timeout* seconds, raise an AssertionError if *error* is true,
+    or just stop if *error is false.
+
+    Example:
+
+        for _ in support.busy_retry(support.SHORT_TIMEOUT):
+            if check():
+                break
+
+    Example of error=False usage:
+
+        for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
+            if check():
+                break
+        else:
+            raise RuntimeError('my custom error')
+
+    """
+    if timeout <= 0:
+        raise ValueError("timeout must be greater than zero")
+
+    start_time = time.monotonic()
+    deadline = start_time + timeout
+
+    while True:
+        yield
+
+        if time.monotonic() >= deadline:
+            break
+
+    if error:
+        dt = time.monotonic() - start_time
+        msg = f"timeout ({dt:.1f} seconds)"
+        if err_msg:
+            msg = f"{msg}: {err_msg}"
+        raise AssertionError(msg)
+
+
+def sleeping_retry(timeout, err_msg=None, /,
+                   *, init_delay=0.010, max_delay=1.0, error=True):
+    """
+    Wait strategy that applies exponential backoff.
+
+    Run the loop body until "break" stops the loop. Sleep at each loop
+    iteration, but not at the first iteration. The sleep delay is doubled at
+    each iteration (up to *max_delay* seconds).
+
+    See busy_retry() documentation for the parameters usage.
+
+    Example raising an exception after SHORT_TIMEOUT seconds:
+
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if check():
+                break
+
+    Example of error=False usage:
+
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+            if check():
+                break
+        else:
+            raise RuntimeError('my custom error')
+    """
+
+    delay = init_delay
+    for _ in busy_retry(timeout, err_msg, error=error):
+        yield
+
+        time.sleep(delay)
+        delay = min(delay * 2, max_delay)
+
+
 @contextlib.contextmanager
 def adjust_int_max_str_digits(max_digits):
     """Temporarily change the integer string conversion length limit."""
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 5d0ed9ea14ac7f..f20aae8e21c66f 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -45,12 +45,11 @@ def _wait_for_interp_to_run(interp, timeout=None):
     # run subinterpreter eariler than the main thread in multiprocess.
     if timeout is None:
         timeout = support.SHORT_TIMEOUT
-    start_time = time.monotonic()
-    deadline = start_time + timeout
-    while not interpreters.is_running(interp):
-        if time.monotonic() > deadline:
-            raise RuntimeError('interp is not running')
-        time.sleep(0.010)
+    for _ in support.sleeping_retry(timeout, error=False):
+        if interpreters.is_running(interp):
+            break
+    else:
+        raise RuntimeError('interp is not running')
 
 
 @contextlib.contextmanager
diff --git a/Lib/test/test_concurrent_futures/test_init.py b/Lib/test/test_concurrent_futures/test_init.py
index 138d6ee545fd92..ebab9437b9c4e1 100644
--- a/Lib/test/test_concurrent_futures/test_init.py
+++ b/Lib/test/test_concurrent_futures/test_init.py
@@ -78,11 +78,10 @@ def test_initializer(self):
                 future.result()
 
         # At some point, the executor should break
-        t1 = time.monotonic()
-        while not self.executor._broken:
-            if time.monotonic() - t1 > 5:
-                self.fail("executor not broken after 5 s.")
-            time.sleep(0.01)
+        for _ in support.sleeping_retry(5, "executor not broken"):
+            if self.executor._broken:
+                break
+
         # ... and from this point submit() is guaranteed to fail
         with self.assertRaises(BrokenExecutor):
             self.executor.submit(get_init_status)
diff --git a/Lib/test/test_multiprocessing_main_handling.py b/Lib/test/test_multiprocessing_main_handling.py
index 510d8d3a7597e1..35e9cd64fa6c01 100644
--- a/Lib/test/test_multiprocessing_main_handling.py
+++ b/Lib/test/test_multiprocessing_main_handling.py
@@ -40,6 +40,7 @@
 import sys
 import time
 from multiprocessing import Pool, set_start_method
+from test import support
 
 # We use this __main__ defined function in the map call below in order to
 # check that multiprocessing in correctly running the unguarded
@@ -59,13 +60,11 @@ def f(x):
 results = []
 with Pool(5) as pool:
     pool.map_async(f, [1, 2, 3], callback=results.extend)
-    start_time = time.monotonic()
-    while not results:
-        time.sleep(0.05)
-        # up to 1 min to report the results
-        dt = time.monotonic() - start_time
-        if dt > 60.0:
-            raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+
+    # up to 1 min to report the results
+    for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+        if results:
+            break
 
 results.sort()
 print(start_method, "->", results)
@@ -86,19 +85,17 @@ def f(x):
 import sys
 import time
 from multiprocessing import Pool, set_start_method
+from test import support
 
 start_method = sys.argv[1]
 set_start_method(start_method)
 results = []
 with Pool(5) as pool:
     pool.map_async(int, [1, 4, 9], callback=results.extend)
-    start_time = time.monotonic()
-    while not results:
-        time.sleep(0.05)
-        # up to 1 min to report the results
-        dt = time.monotonic() - start_time
-        if dt > 60.0:
-            raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+    # up to 1 min to report the results
+    for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+        if results:
+            break
 
 results.sort()
 print(start_method, "->", results)
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index f6632896100a66..a00c7e539400d9 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -813,13 +813,14 @@ def test_itimer_virtual(self):
         signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
         signal.setitimer(self.itimer, 0.3, 0.2)
 
-        start_time = time.monotonic()
-        while time.monotonic() - start_time < 60.0:
+        for _ in support.busy_retry(60.0, error=False):
             # use up some virtual time by doing real work
             _ = pow(12345, 67890, 10000019)
             if signal.getitimer(self.itimer) == (0.0, 0.0):
-                break # sig_vtalrm handler stopped this itimer
-        else: # Issue 8424
+                # sig_vtalrm handler stopped this itimer
+                break
+        else:
+            # bpo-8424
             self.skipTest("timeout: likely cause: machine too slow or load too "
                           "high")
 
@@ -833,13 +834,14 @@ def test_itimer_prof(self):
         signal.signal(signal.SIGPROF, self.sig_prof)
         signal.setitimer(self.itimer, 0.2, 0.2)
 
-        start_time = time.monotonic()
-        while time.monotonic() - start_time < 60.0:
+        for _ in support.busy_retry(60.0, error=False):
             # do some work
             _ = pow(12345, 67890, 10000019)
             if signal.getitimer(self.itimer) == (0.0, 0.0):
-                break # sig_prof handler stopped this itimer
-        else: # Issue 8424
+                # sig_prof handler stopped this itimer
+                break
+        else:
+            # bpo-8424
             self.skipTest("timeout: likely cause: machine too slow or load too "
                           "high")
 
@@ -1308,8 +1310,6 @@ def handler(signum, frame):
         self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL
 
         expected_sigs = 0
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-
         while expected_sigs < N:
             # Hopefully the SIGALRM will be received somewhere during
             # initial processing of SIGUSR1.
@@ -1318,8 +1318,9 @@ def handler(signum, frame):
 
             expected_sigs += 2
             # Wait for handlers to run to avoid signal coalescing
-            while len(sigs) < expected_sigs and time.monotonic() < deadline:
-                time.sleep(1e-5)
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+                if len(sigs) >= expected_sigs:
+                    break
 
         # All ITIMER_REAL signals should have been delivered to the
         # Python handler
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 965c2728914b50..1bfec237484124 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -2279,11 +2279,8 @@ def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
         # A simple IO loop. Call func(*args) depending on the error we get
         # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
         timeout = kwargs.get('timeout', support.SHORT_TIMEOUT)
-        deadline = time.monotonic() + timeout
         count = 0
-        while True:
-            if time.monotonic() > deadline:
-                self.fail("timeout")
+        for _ in support.busy_retry(timeout):
             errno = None
             count += 1
             try:
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 01ba88ce42c5f6..fec96bef4bbd22 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -10,7 +10,6 @@
 import sysconfig
 import tempfile
 import textwrap
-import time
 import unittest
 import warnings
 
@@ -462,18 +461,12 @@ def test_reap_children(self):
             # child process: do nothing, just exit
            os._exit(0)
 
-        t0 = time.monotonic()
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-
         was_altered = support.environment_altered
         try:
             support.environment_altered = False
             stderr = io.StringIO()
 
-            while True:
-                if time.monotonic() > deadline:
-                    self.fail("timeout")
-
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                 with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
                     support.reap_children()
 
@@ -482,9 +475,6 @@ def test_reap_children(self):
                 if support.environment_altered:
                     break
 
-                # loop until the child process completed
-                time.sleep(0.100)
-
             msg = "Warning -- reap_children() reaped child process %s" % pid
             self.assertIn(msg, stderr.getvalue())
             self.assertTrue(support.environment_altered)
diff --git a/Lib/test/test_wait3.py b/Lib/test/test_wait3.py
index 4ec7690ac19bbf..15d66ae825abf5 100644
--- a/Lib/test/test_wait3.py
+++ b/Lib/test/test_wait3.py
@@ -4,7 +4,6 @@
 import os
 import subprocess
 import sys
-import time
 import unittest
 from test.fork_wait import ForkWait
 from test import support
@@ -20,14 +19,12 @@ def wait_impl(self, cpid, *, exitcode):
         # This many iterations can be required, since some previously run
         # tests (e.g. test_ctypes) could have spawned a lot of children
         # very quickly.
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while time.monotonic() <= deadline:
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
             # wait3() shouldn't hang, but some of the buildbots seem to hang
             # in the forking tests. This is an attempt to fix the problem.
             spid, status, rusage = os.wait3(os.WNOHANG)
             if spid == cpid:
                 break
-            time.sleep(0.1)
 
         self.assertEqual(spid, cpid)
         self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
diff --git a/Lib/test/test_wait4.py b/Lib/test/test_wait4.py
index 24f1aaec60c56b..f66c0db1c20e6e 100644
--- a/Lib/test/test_wait4.py
+++ b/Lib/test/test_wait4.py
@@ -2,7 +2,6 @@
 """
 
 import os
-import time
 import sys
 import unittest
 from test.fork_wait import ForkWait
@@ -22,14 +21,12 @@ def wait_impl(self, cpid, *, exitcode):
             # Issue #11185: wait4 is broken on AIX and will always return 0
             # with WNOHANG.
             option = 0
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while time.monotonic() <= deadline:
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
             # wait4() shouldn't hang, but some of the buildbots seem to hang
             # in the forking tests. This is an attempt to fix the problem.
             spid, status, rusage = os.wait4(cpid, option)
             if spid == cpid:
                 break
-            time.sleep(0.1)
 
         self.assertEqual(spid, cpid)
         self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
         self.assertTrue(rusage)
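
Reviewer note (not part of the patch): a minimal sketch of the migration pattern applied throughout the hunks above, assuming a build with this patch applied so that test.support.sleeping_retry() is available. check(), _T0 and the 5-second timeout are illustrative stand-ins only; the helper signatures themselves match what the patch adds.

    import time
    from test import support

    _T0 = time.monotonic()

    def check():
        # Hypothetical condition used only for this demo: becomes true
        # roughly half a second after the module starts running.
        return time.monotonic() - _T0 > 0.5

    # Old pattern removed throughout this patch: a hand-rolled deadline
    # plus a fixed sleep interval.
    deadline = time.monotonic() + 5.0
    while not check():
        if time.monotonic() > deadline:
            raise AssertionError("timeout")
        time.sleep(0.010)

    # New pattern: sleeping_retry() owns the deadline and the exponential
    # backoff (0.010 s doubled each iteration, capped at 1.0 s); the loop
    # body only tests the condition and breaks once it holds.
    for _ in support.sleeping_retry(5.0, "condition never became true"):
        if check():
            break

    # error=False variant: the generator simply stops at the timeout, so a
    # for/else clause can raise a custom error instead of AssertionError.
    for _ in support.sleeping_retry(5.0, error=False):
        if check():
            break
    else:
        raise RuntimeError("my custom error")

busy_retry() drives the same loop without sleeping between iterations, which is why the test_signal.py and test_ssl.py hunks use it where the loop body itself already does the work.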