Skip to content

Commit

Permalink
merge 3.3-slp (Stackless python#117, StacklessTestCase)
Browse files Browse the repository at this point in the history
  • Loading branch information
Anselm Kruis committed Mar 17, 2017
2 parents efa2504 + 5ea55fe commit e0f01d1
Showing 1 changed file with 287 additions and 10 deletions.
297 changes: 287 additions & 10 deletions Stackless/unittests/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io
import contextlib
import gc
import os
import functools
from test.support import run_unittest

# emit warnings about uncollectable objects
Expand Down Expand Up @@ -65,10 +67,75 @@ def require_one_thread(testcase):
return testcase


def get_current_watchdog_list():
def testcase_leaks_references(leak_reason, soft_switching=None):
"""Skip test, which leak references during leak tests.
If a test leaks references, which happens if the thread of a tasklet with a C-stack dies,
you must decorate the test case with this decorator.
Note: If you know the leaking object, you can try to use the hackish function
decref_leaked_object()
"""
def decorator(testcase):
@functools.wraps(testcase)
def wrapper(self):
if soft_switching is not None and stackless.enable_softswitch(None) != soft_switching:
# the leak happens only if soft switching is enables/disabled
return testcase(self)
for frameinfo in inspect.stack(0):
# print("frameinfo[3]", frameinfo[3], file=sys.stderr)
if frameinfo[3] == "dash_R":
# it is a test.regrtest -R: run
return self.skipTest("Test leaks references: " + leak_reason)
return testcase(self)
return wrapper
return decorator


# decref_leaked_object() works, but it is not as useful as I expected, because
# a C-stack usually contains several references to None and other objects. All in all
# there are more references than we can identify and handle manually.
#
# _Py_DecRef = ctypes.pythonapi.Py_DecRef
# _Py_DecRef.argtypes = [ctypes.py_object]
# _Py_DecRef.restype = None
#
#
# def decref_leaked_object(obj):
# """Py_DecRef an object, that would otherwise leak.
#
# Used to forget references held by a C-stack, whose thread already died.
# """
# # Check, that obj is really leaked
# def get_rc_delta(obj):
# gc.collect()
# rc = sys.getrefcount(obj) - 1
# n_refferers = len(gc.get_referrers(obj))
# return rc - n_refferers
# # Quality check
# assert get_rc_delta(object()) == 0
# delta = get_rc_delta(obj)
# if delta > 0:
# _Py_DecRef(obj)
# else:
# raise RuntimeError("There is no ref leak for obj. Missing referrers %d" % (delta,))


def get_watchdog_list(threadid):
"""Get the watchdog list of a thread.
Contrary to :func:`get_current_watchdog_list` this function does
not create a watchdog list, if it does not already exist.
"""
# The second argument of get_thread_info() is intentionally undocumented.
# See C source.
watchdog_list = stackless.get_thread_info(-1, 1 << 31)[3]
return stackless.get_thread_info(threadid, 1 << 31)[3]


def get_current_watchdog_list():
"""Get the watchdog list of the current thread
"""
watchdog_list = get_watchdog_list(-1)
if isinstance(watchdog_list, list):
return watchdog_list
# The watchdog list has not been created. Force its creation.
Expand All @@ -87,11 +154,72 @@ def get_current_watchdog_list():
t.insert()
if scheduled:
assert stackless.current.next == scheduled[0]
watchdog_list = stackless.get_thread_info(-1, 1 << 31)[3]
watchdog_list = get_watchdog_list(-1)
assert isinstance(watchdog_list, list)
return watchdog_list


def get_tasklets_with_cstate():
"""Return a list of all tasklets with a C-stack.
"""
tlets = []
current = stackless.current
if current.nesting_level > 0:
tlets.append(current)
with stackless.atomic():
cscurrent = current.cstate
cs = cscurrent.next
while cs is not cscurrent:
t = cs.task
if (t is not None and
t.cstate is cs and
t.alive and
t.nesting_level > 0):
assert t not in tlets
tlets.append(t)
cs = cs.next
return tlets


class SwitchRecorder(object):
def __init__(self, old_scb):
self.ids = {}
if stackless.main is stackless.current:
self.ids[id(stackless.main)] = "main/current"
else:
self.ids[id(stackless.main)] = "main"
self.ids[id(stackless.current)] = "current"
self.switches = []
self.old_scb = old_scb

def __call__(self, prev_tlet, next_tlet):
# print("%s -> %s" % (id(prev_tlet), id(next_tlet)), file=sys.stderr)
self.switches.append((None if prev_tlet is None else id(prev_tlet),
None if next_tlet is None else id(next_tlet)))
if self.old_scb is not None:
return self.old_scb(prev_tlet, next_tlet)

def id2str(self, tlet_id):
if tlet_id is None:
return "None"
try:
nr = self.ids[tlet_id]
except KeyError:
self.ids[tlet_id] = nr = "tlet %d" % (len(self.ids),)
return nr

def __str__(self):
s = ["", "tasklet switches (%d)" % (len(self.switches),)]
for (p, n) in self.switches:
s.append("%s -> %s" % (self.id2str(p), self.id2str(n)))
return os.linesep.join(s)

def print(self, file=None):
if file is None:
file = sys.stderr
print(str(self), file=file)


class StacklessTestCaseMixin(object):
def skipUnlessSoftswitching(self):
if not stackless.enable_softswitch(None):
Expand Down Expand Up @@ -190,6 +318,109 @@ def _checkSignature(self, func, nb_mandatory, accept_arbitrary, additionalArg, *
# only required args as kw-args
yield func(**dict((k, kwargs[k]) for k in names[:nb_mandatory]))

def trace_tasklet_switches(self):
old_scb = stackless.get_schedule_callback()
switch_recorder = SwitchRecorder(old_scb)
self.addCleanup(stackless.set_schedule_callback, old_scb)
self.addCleanup(switch_recorder.print)
stackless.set_schedule_callback(switch_recorder)

def register_tasklet_name(self, tlet, name):
scb = stackless.get_schedule_callback()
if isinstance(scb, SwitchRecorder):
if isinstance(tlet, stackless.tasklet):
tlet = id(tlet)
self.assertIsInstance(tlet, int)
scb.ids[tlet] = name

def refleak_hunting_record_baseline(self):
"""Record a baseline for hunting reference leaks.
This method and the methods refleak_hunting_find_leaks() and
refleak_hunting_print() can be used to identify reference leaks.
The basic idea is simple: For objects which take part in garbage collection,
the number of referrers matches the reference count. In case of a missing Py_DECREF,
we can observe a surplus reference count. Unfortunately, in reality the garbage collector
doesn't care about references from non container objects or local variables of C-functions.
Therefore, we record a baseline first.
Application: Just call bracket the suspicious code like this::
self.refleak_hunting_record_baseline()
# suspicious code goes here
...
self.refleak_hunting_find_leaks()
In case you need this functions in a C-Python test suite module, use the following
code snippet::
import os
import sysconfig
import sys
sys.path.append(os.path.join(sysconfig.get_path("data"), "Stackless", "unittests"))
from support import StacklessTestCaseMixin
...
class TestCase(unittest.TestCase, StacklessTestCaseMixin):
...
"""
gc.collect()
self.singleton_ref_counts_baseline = [sys.getrefcount(None)]
oids = set(id(o) for o in gc.get_objects() if len(gc.get_referrers(o)) <= 2)
oids.add(id(sys._getframe(1)))
self.oids_insufficient_referrers = oids

def refleak_hunting_find_leaks(self, objects=None):
"""Detect potential leaked objects.
"""
gc.collect()
if objects is None:
objects = gc.get_objects()
gc.collect()
self.singleton_ref_counts = [sys.getrefcount(None)]
self.assertIsInstance(objects, list)
oids = self.oids_insufficient_referrers
# add the watchdog list to the baseline.
wd_list = get_watchdog_list(-1)
if isinstance(wd_list, list):
oids.add(id(wd_list))
# add the current frame
oids.add(id(sys._getframe()))
# add the parent frame
oids.add(id(sys._getframe(1)))
# compute objects, which are alive, but have to less referrers to
# justify their live.
candidates = []
for i in xrange(len(objects)):
if id(objects[i]) in oids:
continue
if len(gc.get_referrers(objects[i])) <= 1:
candidates.append(objects[i])
del objects[:]
del self.oids_insufficient_referrers
self.refleak_candidates = candidates
self.addCleanup(self.refleak_hunting_print)
return candidates

def refleak_hunting_print(self, file=None):
"""Print the result.
Print the result of the reference leak hunting to sys.stderr.
"""
if file is None:
file = sys.stderr
print("", file=file)
print("Leak hunting:", file=file)
for (i, name) in enumerate(("None",)):
a = self.singleton_ref_counts_baseline[i]
b = self.singleton_ref_counts[i]
print("Ref count of %s: %d - %d = %d" % (name, a, b, a - b), file=file)
print("Number of candidates", len(self.refleak_candidates), file=file)
for (i, o) in enumerate(self.refleak_candidates):
print("%d: %r %r" % (i, type(o), o), file=file)


# call the class method prepare_test_methods(cls) after creating the
# class. This method can be used to modify the newly created class
Expand Down Expand Up @@ -251,7 +482,8 @@ def prepare_pickle_test_method(cls, func, name=None):
for i in range(0, pickle.HIGHEST_PROTOCOL + 1):
for p_letter in ("C", "P"):
def test(self, method=func, proto=i, pickle_module=p_letter, unpickle_module=p_letter):
self.assertTrue(self._StacklessTestCase__setup_called, "Broken test case: it didn't call super(..., self).setUp()")
self.assertTrue(self._StacklessTestCase__setup_called,
"Broken test case: it didn't call super(..., self).setUp()")
self._pickle_protocol = proto
self._pickle_module = pickle_module
self._unpickle_module = unpickle_module
Expand Down Expand Up @@ -284,7 +516,7 @@ def prepare_test_methods(cls):
if inspect.ismethod(m):
m = m.__func__
prepare = getattr(m, "prepare", cls.prepare_test_method)
for x in prepare.__func__(cls, m, n):
for x in prepare.__func__(cls, m, n): # @UnusedVariable
pass

__setup_called = False
Expand Down Expand Up @@ -317,20 +549,45 @@ def setUpStacklessTestCase(self):
self.addCleanup(stackless.enable_softswitch, stackless.enable_softswitch(self.__enable_softswitch))

self.__active_test_cases[id(self)] = self
watchdog_list = get_watchdog_list(-1)
if watchdog_list is not None:
self.assertListEqual(watchdog_list, [None], "Watchdog list is not empty: " + repr(watchdog_list))
if withThreads and self.__preexisting_threads is None:
self.__preexisting_threads = frozenset(threading.enumerate())
for (watchdog_list, tid) in [(get_watchdog_list(tid), tid)
for tid in stackless.threads if tid != stackless.current.thread_id]:
if watchdog_list is None:
continue
self.assertListEqual(watchdog_list, [None],
"Thread %d: watchdog list is not empty: %r" % (tid, watchdog_list))
return len(self.__preexisting_threads)
return 1

def setUp(self):
self.assertEqual(stackless.getruncount(), 1, "Leakage from other tests, with %d tasklets still in the scheduler" % (stackless.getruncount() - 1))
self.assertEqual(stackless.getruncount(), 1,
"Leakage from other tests, with %d tasklets still in the scheduler" %
(stackless.getruncount() - 1))
expected_thread_count = self.setUpStacklessTestCase()
if withThreads:
active_count = threading.active_count()
self.assertEqual(active_count, expected_thread_count, "Leakage from other threads, with %d threads running (%d expected)" % (active_count, expected_thread_count))
self.assertEqual(active_count, expected_thread_count,
"Leakage from other threads, with %d threads running (%d expected)" %
(active_count, expected_thread_count))

def tearDown(self):
# Tasklets created in pickling tests can be left in the scheduler when they finish. We can feel free to
# Test, that stackless errorhandler is None and reset it
self.assertIsNone(stackless.set_error_handler(None))

# Test, that switch_trap level is 0 and set the level back to 0
try:
# get the level without changing it
st_level = stackless.switch_trap(0)
self.assertEqual(st_level, 0, "switch_trap is %d" % (st_level,))
except AssertionError:
# change the level so that the result is 0
stackless.switch_trap(-st_level)
raise
# Tasklets created in various tests can be left in the scheduler when they finish. We can feel free to
# clean them up for the tests. Any tests that expect to exit with no leaked tasklets should do explicit
# assertions to check.
self.assertTrue(self.__setup_called, "Broken test case: it didn't call super(..., self).setUp()")
Expand All @@ -341,9 +598,27 @@ def tearDown(self):
next_ = current.next
current.kill()
current = next_
# Tasklets with C-stack can create reference leaks, if the C-stack holds a reference,
# that keeps the tasklet-object alive. A common case is the call of a tasklet or channel method,
# which causes a tasklet switch. The transient bound-method object keeps the tasklet alive.
# Here we kill such tasklets.
for current in get_tasklets_with_cstate():
if current.blocked:
# print("Killing blocked tasklet", current, file=sys.stderr)
current.kill()
run_count = stackless.getruncount()
self.assertEqual(run_count, 1, "Leakage from this test, with %d tasklets still in the scheduler" % (run_count - 1))
self.assertEqual(run_count, 1,
"Leakage from this test, with %d tasklets still in the scheduler" % (run_count - 1))
watchdog_list = get_watchdog_list(-1)
if watchdog_list is not None:
self.assertListEqual(watchdog_list, [None], "Watchdog list is not empty: " + repr(watchdog_list))
if withThreads:
for (watchdog_list, tid) in [(get_watchdog_list(tid), tid)
for tid in stackless.threads if tid != stackless.current.thread_id]:
if watchdog_list is None:
continue
self.assertListEqual(watchdog_list, [None],
"Thread %d: watchdog list is not empty: %r" % (tid, watchdog_list))
preexisting_threads = self.__preexisting_threads
self.__preexisting_threads = None # avoid pickling problems, see _addSkip
expected_thread_count = len(preexisting_threads)
Expand All @@ -355,7 +630,9 @@ def tearDown(self):
while activeThreads:
activeThreads.pop().join(0.5)
active_count = threading.active_count()
self.assertEqual(active_count, expected_thread_count, "Leakage from other threads, with %d threads running (%d expected)" % (active_count, expected_thread_count))
self.assertEqual(active_count, expected_thread_count,
"Leakage from other threads, with %d threads running (%d expected)" %
(active_count, expected_thread_count))
gc.collect() # emits warnings about uncollectable objects after each test

def dumps(self, obj, protocol=None, *, fix_imports=True):
Expand Down

0 comments on commit e0f01d1

Please sign in to comment.