Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-114911: Add CPUStopwatch test helper #114912

Merged
merged 3 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2381,6 +2381,46 @@ def sleeping_retry(timeout, err_msg=None, /,
delay = min(delay * 2, max_delay)


class CPUStopwatch:
"""Context manager to roughly time a CPU-bound operation.

Disables GC. Uses CPU time if it can (i.e. excludes sleeps & time of
other processes).

N.B.:
- This *includes* time spent in other threads.
- Some systems only have a coarse resolution; check
stopwatch.clock_info.rseolution if.

Usage:

with ProcessStopwatch() as stopwatch:
...
elapsed = stopwatch.seconds
resolution = stopwatch.clock_info.resolution
"""
def __enter__(self):
get_time = time.process_time
clock_info = time.get_clock_info('process_time')
if get_time() <= 0: # some platforms like WASM lack process_time()
get_time = time.monotonic
clock_info = time.get_clock_info('monotonic')
self.context = disable_gc()
self.context.__enter__()
self.get_time = get_time
self.clock_info = clock_info
self.start_time = get_time()
return self

def __exit__(self, *exc):
try:
end_time = self.get_time()
finally:
result = self.context.__exit__(*exc)
self.seconds = end_time - self.start_time
return result


@contextlib.contextmanager
def adjust_int_max_str_digits(max_digits):
"""Temporarily change the integer string conversion length limit."""
Expand Down
58 changes: 26 additions & 32 deletions Lib/test/test_int.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,84 +664,78 @@ def test_denial_of_service_prevented_int_to_str(self):
"""Regression test: ensure we fail before performing O(N**2) work."""
maxdigits = sys.get_int_max_str_digits()
assert maxdigits < 50_000, maxdigits # A test prerequisite.
get_time = time.process_time
if get_time() <= 0: # some platforms like WASM lack process_time()
get_time = time.monotonic

huge_int = int(f'0x{"c"*65_000}', base=16) # 78268 decimal digits.
digits = 78_268
with support.adjust_int_max_str_digits(digits):
start = get_time()
with (
support.adjust_int_max_str_digits(digits),
support.CPUStopwatch() as sw_convert):
huge_decimal = str(huge_int)
seconds_to_convert = get_time() - start
self.assertEqual(len(huge_decimal), digits)
# Ensuring that we chose a slow enough conversion to measure.
# It takes 0.1 seconds on a Zen based cloud VM in an opt build.
# Some OSes have a low res 1/64s timer, skip if hard to measure.
if seconds_to_convert < 1/64:
if sw_convert.seconds < sw_convert.clock_info.resolution * 2:
raise unittest.SkipTest('"slow" conversion took only '
f'{seconds_to_convert} seconds.')
f'{sw_convert.seconds} seconds.')

# We test with the limit almost at the size needed to check performance.
# The performant limit check is slightly fuzzy, give it a some room.
with support.adjust_int_max_str_digits(int(.995 * digits)):
with self.assertRaises(ValueError) as err:
start = get_time()
with (
self.assertRaises(ValueError) as err,
support.CPUStopwatch() as sw_fail_huge):
str(huge_int)
seconds_to_fail_huge = get_time() - start
self.assertIn('conversion', str(err.exception))
self.assertLessEqual(seconds_to_fail_huge, seconds_to_convert/2)
self.assertLessEqual(sw_fail_huge.seconds, sw_convert.seconds/2)

# Now we test that a conversion that would take 30x as long also fails
# in a similarly fast fashion.
extra_huge_int = int(f'0x{"c"*500_000}', base=16) # 602060 digits.
with self.assertRaises(ValueError) as err:
start = get_time()
with (
self.assertRaises(ValueError) as err,
support.CPUStopwatch() as sw_fail_extra_huge):
# If not limited, 8 seconds said Zen based cloud VM.
str(extra_huge_int)
seconds_to_fail_extra_huge = get_time() - start
self.assertIn('conversion', str(err.exception))
self.assertLess(seconds_to_fail_extra_huge, seconds_to_convert/2)
self.assertLess(sw_fail_extra_huge.seconds, sw_convert.seconds/2)

def test_denial_of_service_prevented_str_to_int(self):
"""Regression test: ensure we fail before performing O(N**2) work."""
maxdigits = sys.get_int_max_str_digits()
assert maxdigits < 100_000, maxdigits # A test prerequisite.
get_time = time.process_time
if get_time() <= 0: # some platforms like WASM lack process_time()
get_time = time.monotonic

digits = 133700
huge = '8'*digits
with support.adjust_int_max_str_digits(digits):
start = get_time()
with (
support.adjust_int_max_str_digits(digits),
support.CPUStopwatch() as sw_convert):
int(huge)
seconds_to_convert = get_time() - start
# Ensuring that we chose a slow enough conversion to measure.
# It takes 0.1 seconds on a Zen based cloud VM in an opt build.
# Some OSes have a low res 1/64s timer, skip if hard to measure.
if seconds_to_convert < 1/64:
if sw_convert.seconds < sw_convert.clock_info.resolution * 2:
raise unittest.SkipTest('"slow" conversion took only '
f'{seconds_to_convert} seconds.')
f'{sw_convert.seconds} seconds.')

with support.adjust_int_max_str_digits(digits - 1):
with self.assertRaises(ValueError) as err:
start = get_time()
with (
self.assertRaises(ValueError) as err,
support.CPUStopwatch() as sw_fail_huge):
int(huge)
seconds_to_fail_huge = get_time() - start
self.assertIn('conversion', str(err.exception))
self.assertLessEqual(seconds_to_fail_huge, seconds_to_convert/2)
self.assertLessEqual(sw_fail_huge.seconds, sw_convert.seconds/2)

# Now we test that a conversion that would take 30x as long also fails
# in a similarly fast fashion.
extra_huge = '7'*1_200_000
with self.assertRaises(ValueError) as err:
start = get_time()
with (
self.assertRaises(ValueError) as err,
support.CPUStopwatch() as sw_fail_extra_huge):
# If not limited, 8 seconds in the Zen based cloud VM.
int(extra_huge)
seconds_to_fail_extra_huge = get_time() - start
self.assertIn('conversion', str(err.exception))
self.assertLessEqual(seconds_to_fail_extra_huge, seconds_to_convert/2)
self.assertLessEqual(sw_fail_extra_huge.seconds, sw_convert.seconds/2)

def test_power_of_two_bases_unlimited(self):
"""The limit does not apply to power of 2 bases."""
Expand Down
19 changes: 9 additions & 10 deletions Lib/test/test_re.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from test.support import (gc_collect, bigmemtest, _2G,
cpython_only, captured_stdout,
check_disallow_instantiation, is_emscripten, is_wasi,
warnings_helper, SHORT_TIMEOUT)
warnings_helper, SHORT_TIMEOUT, CPUStopwatch)
import locale
import re
import string
Expand Down Expand Up @@ -2284,17 +2284,16 @@ def test_bug_40736(self):

def test_search_anchor_at_beginning(self):
s = 'x'*10**7
start = time.perf_counter()
for p in r'\Ay', r'^y':
self.assertIsNone(re.search(p, s))
self.assertEqual(re.split(p, s), [s])
self.assertEqual(re.findall(p, s), [])
self.assertEqual(list(re.finditer(p, s)), [])
self.assertEqual(re.sub(p, '', s), s)
t = time.perf_counter() - start
with CPUStopwatch() as stopwatch:
for p in r'\Ay', r'^y':
self.assertIsNone(re.search(p, s))
self.assertEqual(re.split(p, s), [s])
self.assertEqual(re.findall(p, s), [])
self.assertEqual(list(re.finditer(p, s)), [])
self.assertEqual(re.sub(p, '', s), s)
# Without optimization it takes 1 second on my computer.
# With optimization -- 0.0003 seconds.
self.assertLess(t, 0.1)
self.assertLess(stopwatch.seconds, 0.1)

def test_possessive_quantifiers(self):
"""Test Possessive Quantifiers
Expand Down
Loading