Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.11] Add test.support.busy_retry() (#93770) #110341

Merged
merged 1 commit into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions Doc/library/test.rst
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,51 @@ The :mod:`test.support` module defines the following constants:

The :mod:`test.support` module defines the following functions:

.. function:: busy_retry(timeout, err_msg=None, /, *, error=True)

Run the loop body until ``break`` stops the loop.

After *timeout* seconds, raise an :exc:`AssertionError` if *error* is true,
or just stop the loop if *error* is false.

Example::

for _ in support.busy_retry(support.SHORT_TIMEOUT):
if check():
break

Example of error=False usage::

for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')

.. function:: sleeping_retry(timeout, err_msg=None, /, *, init_delay=0.010, max_delay=1.0, error=True)

Wait strategy that applies exponential backoff.

Run the loop body until ``break`` stops the loop. Sleep at each loop
iteration, but not at the first iteration. The sleep delay is doubled at
each iteration (up to *max_delay* seconds).

See :func:`busy_retry` documentation for the parameters usage.

Example raising an exception after SHORT_TIMEOUT seconds::

for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if check():
break

Example of error=False usage::

for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')

.. function:: is_resource_enabled(resource)

Return ``True`` if *resource* is enabled and available. The list of
Expand Down
60 changes: 25 additions & 35 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4376,18 +4376,13 @@ def test_shared_memory_cleaned_after_process_termination(self):
p.terminate()
p.wait()

deadline = time.monotonic() + support.LONG_TIMEOUT
t = 0.1
while time.monotonic() < deadline:
time.sleep(t)
t = min(t*2, 5)
err_msg = ("A SharedMemory segment was leaked after "
"a process was abruptly terminated")
for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg):
try:
smm = shared_memory.SharedMemory(name, create=False)
except FileNotFoundError:
break
else:
raise AssertionError("A SharedMemory segment was leaked after"
" a process was abruptly terminated.")

if os.name == 'posix':
# Without this line it was raising warnings like:
Expand Down Expand Up @@ -5458,20 +5453,18 @@ def create_and_register_resource(rtype):
p.terminate()
p.wait()

deadline = time.monotonic() + support.LONG_TIMEOUT
while time.monotonic() < deadline:
time.sleep(.5)
err_msg = (f"A {rtype} resource was leaked after a process was "
f"abruptly terminated")
for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
err_msg):
try:
_resource_unlink(name2, rtype)
except OSError as e:
# docs say it should be ENOENT, but OSX seems to give
# EINVAL
self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
break
else:
raise AssertionError(
f"A {rtype} resource was leaked after a process was "
f"abruptly terminated.")

err = p.stderr.read().decode('utf-8')
p.stderr.close()
expected = ('resource_tracker: There appear to be 2 leaked {} '
Expand Down Expand Up @@ -5707,18 +5700,17 @@ def wait_proc_exit(self):
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395).
join_process(self.proc)

start_time = time.monotonic()
t = 0.01
while len(multiprocessing.active_children()) > 1:
time.sleep(t)
t *= 2
dt = time.monotonic() - start_time
if dt >= 5.0:
test.support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt} seconds")
for _ in support.sleeping_retry(5.0, error=False):
if len(multiprocessing.active_children()) <= 1:
break
else:
dt = time.monotonic() - start_time
support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt:.1f} seconds")

def run_worker(self, worker, obj):
self.proc = multiprocessing.Process(target=worker, args=(obj, ))
Expand Down Expand Up @@ -6031,17 +6023,15 @@ def tearDownClass(cls):
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395)
start_time = time.monotonic()
t = 0.01
while len(multiprocessing.active_children()) > 1:
time.sleep(t)
t *= 2
dt = time.monotonic() - start_time
if dt >= 5.0:
test.support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt} seconds")
for _ in support.sleeping_retry(5.0, error=False):
if len(multiprocessing.active_children()) <= 1:
break
else:
dt = time.monotonic() - start_time
support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt:.1f} seconds")

gc.collect() # do garbage collection
if cls.manager._number_of_objects() != 0:
Expand Down
6 changes: 2 additions & 4 deletions Lib/test/fork_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ def test_wait(self):
self.threads.append(thread)

# busy-loop to wait for threads
deadline = time.monotonic() + support.SHORT_TIMEOUT
while len(self.alive) < NUM_THREADS:
time.sleep(0.1)
if deadline < time.monotonic():
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if len(self.alive) >= NUM_THREADS:
break

a = sorted(self.alive.keys())
Expand Down
76 changes: 76 additions & 0 deletions Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2324,6 +2324,82 @@ def requires_venv_with_pip():
return unittest.skipUnless(ctypes, 'venv: pip requires ctypes')


def busy_retry(timeout, err_msg=None, /, *, error=True):
"""
Run the loop body until "break" stops the loop.

After *timeout* seconds, raise an AssertionError if *error* is true,
or just stop if *error is false.

Example:

for _ in support.busy_retry(support.SHORT_TIMEOUT):
if check():
break

Example of error=False usage:

for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')

"""
if timeout <= 0:
raise ValueError("timeout must be greater than zero")

start_time = time.monotonic()
deadline = start_time + timeout

while True:
yield

if time.monotonic() >= deadline:
break

if error:
dt = time.monotonic() - start_time
msg = f"timeout ({dt:.1f} seconds)"
if err_msg:
msg = f"{msg}: {err_msg}"
raise AssertionError(msg)


def sleeping_retry(timeout, err_msg=None, /,
*, init_delay=0.010, max_delay=1.0, error=True):
"""
Wait strategy that applies exponential backoff.

Run the loop body until "break" stops the loop. Sleep at each loop
iteration, but not at the first iteration. The sleep delay is doubled at
each iteration (up to *max_delay* seconds).

See busy_retry() documentation for the parameters usage.

Example raising an exception after SHORT_TIMEOUT seconds:

for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if check():
break

Example of error=False usage:

for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')
"""

delay = init_delay
for _ in busy_retry(timeout, err_msg, error=error):
yield

time.sleep(delay)
delay = min(delay * 2, max_delay)


@contextlib.contextmanager
def adjust_int_max_str_digits(max_digits):
"""Temporarily change the integer string conversion length limit."""
Expand Down
11 changes: 5 additions & 6 deletions Lib/test/test__xxsubinterpreters.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ def _wait_for_interp_to_run(interp, timeout=None):
# run subinterpreter eariler than the main thread in multiprocess.
if timeout is None:
timeout = support.SHORT_TIMEOUT
start_time = time.monotonic()
deadline = start_time + timeout
while not interpreters.is_running(interp):
if time.monotonic() > deadline:
raise RuntimeError('interp is not running')
time.sleep(0.010)
for _ in support.sleeping_retry(timeout, error=False):
if interpreters.is_running(interp):
break
else:
raise RuntimeError('interp is not running')


@contextlib.contextmanager
Expand Down
9 changes: 4 additions & 5 deletions Lib/test/test_concurrent_futures/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,10 @@ def test_initializer(self):
future.result()

# At some point, the executor should break
t1 = time.monotonic()
while not self.executor._broken:
if time.monotonic() - t1 > 5:
self.fail("executor not broken after 5 s.")
time.sleep(0.01)
for _ in support.sleeping_retry(5, "executor not broken"):
if self.executor._broken:
break

# ... and from this point submit() is guaranteed to fail
with self.assertRaises(BrokenExecutor):
self.executor.submit(get_init_status)
Expand Down
25 changes: 11 additions & 14 deletions Lib/test/test_multiprocessing_main_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import sys
import time
from multiprocessing import Pool, set_start_method
from test import support

# We use this __main__ defined function in the map call below in order to
# check that multiprocessing in correctly running the unguarded
Expand All @@ -59,13 +60,11 @@ def f(x):
results = []
with Pool(5) as pool:
pool.map_async(f, [1, 2, 3], callback=results.extend)
start_time = time.monotonic()
while not results:
time.sleep(0.05)
# up to 1 min to report the results
dt = time.monotonic() - start_time
if dt > 60.0:
raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)

# up to 1 min to report the results
for _ in support.sleeping_retry(60, "Timed out waiting for results"):
if results:
break

results.sort()
print(start_method, "->", results)
Expand All @@ -86,19 +85,17 @@ def f(x):
import sys
import time
from multiprocessing import Pool, set_start_method
from test import support

start_method = sys.argv[1]
set_start_method(start_method)
results = []
with Pool(5) as pool:
pool.map_async(int, [1, 4, 9], callback=results.extend)
start_time = time.monotonic()
while not results:
time.sleep(0.05)
# up to 1 min to report the results
dt = time.monotonic() - start_time
if dt > 60.0:
raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
# up to 1 min to report the results
for _ in support.sleeping_retry(60, "Timed out waiting for results"):
if results:
break

results.sort()
print(start_method, "->", results)
Expand Down
25 changes: 13 additions & 12 deletions Lib/test/test_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,13 +813,14 @@ def test_itimer_virtual(self):
signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
signal.setitimer(self.itimer, 0.3, 0.2)

start_time = time.monotonic()
while time.monotonic() - start_time < 60.0:
for _ in support.busy_retry(60.0, error=False):
# use up some virtual time by doing real work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
break # sig_vtalrm handler stopped this itimer
else: # Issue 8424
# sig_vtalrm handler stopped this itimer
break
else:
# bpo-8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")

Expand All @@ -833,13 +834,14 @@ def test_itimer_prof(self):
signal.signal(signal.SIGPROF, self.sig_prof)
signal.setitimer(self.itimer, 0.2, 0.2)

start_time = time.monotonic()
while time.monotonic() - start_time < 60.0:
for _ in support.busy_retry(60.0, error=False):
# do some work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
break # sig_prof handler stopped this itimer
else: # Issue 8424
# sig_prof handler stopped this itimer
break
else:
# bpo-8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")

Expand Down Expand Up @@ -1308,8 +1310,6 @@ def handler(signum, frame):
self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL

expected_sigs = 0
deadline = time.monotonic() + support.SHORT_TIMEOUT

while expected_sigs < N:
# Hopefully the SIGALRM will be received somewhere during
# initial processing of SIGUSR1.
Expand All @@ -1318,8 +1318,9 @@ def handler(signum, frame):

expected_sigs += 2
# Wait for handlers to run to avoid signal coalescing
while len(sigs) < expected_sigs and time.monotonic() < deadline:
time.sleep(1e-5)
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if len(sigs) >= expected_sigs:
break

# All ITIMER_REAL signals should have been delivered to the
# Python handler
Expand Down
Loading