mirror of
https://github.com/python/cpython.git
synced 2024-12-12 11:23:56 +08:00
b892d3ea46
It adds a missing testcase for bpo-34125. This is testing code which is affected by PEP 590, so missing this test might accidentally break CPython if we screw up with implementing PEP 590.
420 lines
12 KiB
Python
420 lines
12 KiB
Python
import gc
|
|
import pprint
|
|
import sys
|
|
import unittest
|
|
|
|
|
|
class TestGetProfile(unittest.TestCase):
|
|
def setUp(self):
|
|
sys.setprofile(None)
|
|
|
|
def tearDown(self):
|
|
sys.setprofile(None)
|
|
|
|
def test_empty(self):
|
|
self.assertIsNone(sys.getprofile())
|
|
|
|
def test_setget(self):
|
|
def fn(*args):
|
|
pass
|
|
|
|
sys.setprofile(fn)
|
|
self.assertIs(sys.getprofile(), fn)
|
|
|
|
class HookWatcher:
|
|
def __init__(self):
|
|
self.frames = []
|
|
self.events = []
|
|
|
|
def callback(self, frame, event, arg):
|
|
if (event == "call"
|
|
or event == "return"
|
|
or event == "exception"):
|
|
self.add_event(event, frame)
|
|
|
|
def add_event(self, event, frame=None):
|
|
"""Add an event to the log."""
|
|
if frame is None:
|
|
frame = sys._getframe(1)
|
|
|
|
try:
|
|
frameno = self.frames.index(frame)
|
|
except ValueError:
|
|
frameno = len(self.frames)
|
|
self.frames.append(frame)
|
|
|
|
self.events.append((frameno, event, ident(frame)))
|
|
|
|
def get_events(self):
|
|
"""Remove calls to add_event()."""
|
|
disallowed = [ident(self.add_event.__func__), ident(ident)]
|
|
self.frames = None
|
|
|
|
return [item for item in self.events if item[2] not in disallowed]
|
|
|
|
|
|
class ProfileSimulator(HookWatcher):
|
|
def __init__(self, testcase):
|
|
self.testcase = testcase
|
|
self.stack = []
|
|
HookWatcher.__init__(self)
|
|
|
|
def callback(self, frame, event, arg):
|
|
# Callback registered with sys.setprofile()/sys.settrace()
|
|
self.dispatch[event](self, frame)
|
|
|
|
def trace_call(self, frame):
|
|
self.add_event('call', frame)
|
|
self.stack.append(frame)
|
|
|
|
def trace_return(self, frame):
|
|
self.add_event('return', frame)
|
|
self.stack.pop()
|
|
|
|
def trace_exception(self, frame):
|
|
self.testcase.fail(
|
|
"the profiler should never receive exception events")
|
|
|
|
def trace_pass(self, frame):
|
|
pass
|
|
|
|
dispatch = {
|
|
'call': trace_call,
|
|
'exception': trace_exception,
|
|
'return': trace_return,
|
|
'c_call': trace_pass,
|
|
'c_return': trace_pass,
|
|
'c_exception': trace_pass,
|
|
}
|
|
|
|
|
|
class TestCaseBase(unittest.TestCase):
|
|
def check_events(self, callable, expected):
|
|
events = capture_events(callable, self.new_watcher())
|
|
if events != expected:
|
|
self.fail("Expected events:\n%s\nReceived events:\n%s"
|
|
% (pprint.pformat(expected), pprint.pformat(events)))
|
|
|
|
|
|
class ProfileHookTestCase(TestCaseBase):
|
|
def new_watcher(self):
|
|
return HookWatcher()
|
|
|
|
def test_simple(self):
|
|
def f(p):
|
|
pass
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_exception(self):
|
|
def f(p):
|
|
1/0
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_caught_exception(self):
|
|
def f(p):
|
|
try: 1/0
|
|
except: pass
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_caught_nested_exception(self):
|
|
def f(p):
|
|
try: 1/0
|
|
except: pass
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_nested_exception(self):
|
|
def f(p):
|
|
1/0
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
# This isn't what I expected:
|
|
# (0, 'exception', protect_ident),
|
|
# I expected this again:
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_exception_in_except_clause(self):
|
|
def f(p):
|
|
1/0
|
|
def g(p):
|
|
try:
|
|
f(p)
|
|
except:
|
|
try: f(p)
|
|
except: pass
|
|
f_ident = ident(f)
|
|
g_ident = ident(g)
|
|
self.check_events(g, [(1, 'call', g_ident),
|
|
(2, 'call', f_ident),
|
|
(2, 'return', f_ident),
|
|
(3, 'call', f_ident),
|
|
(3, 'return', f_ident),
|
|
(1, 'return', g_ident),
|
|
])
|
|
|
|
def test_exception_propagation(self):
|
|
def f(p):
|
|
1/0
|
|
def g(p):
|
|
try: f(p)
|
|
finally: p.add_event("falling through")
|
|
f_ident = ident(f)
|
|
g_ident = ident(g)
|
|
self.check_events(g, [(1, 'call', g_ident),
|
|
(2, 'call', f_ident),
|
|
(2, 'return', f_ident),
|
|
(1, 'falling through', g_ident),
|
|
(1, 'return', g_ident),
|
|
])
|
|
|
|
def test_raise_twice(self):
|
|
def f(p):
|
|
try: 1/0
|
|
except: 1/0
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_raise_reraise(self):
|
|
def f(p):
|
|
try: 1/0
|
|
except: raise
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_raise(self):
|
|
def f(p):
|
|
raise Exception()
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_distant_exception(self):
|
|
def f():
|
|
1/0
|
|
def g():
|
|
f()
|
|
def h():
|
|
g()
|
|
def i():
|
|
h()
|
|
def j(p):
|
|
i()
|
|
f_ident = ident(f)
|
|
g_ident = ident(g)
|
|
h_ident = ident(h)
|
|
i_ident = ident(i)
|
|
j_ident = ident(j)
|
|
self.check_events(j, [(1, 'call', j_ident),
|
|
(2, 'call', i_ident),
|
|
(3, 'call', h_ident),
|
|
(4, 'call', g_ident),
|
|
(5, 'call', f_ident),
|
|
(5, 'return', f_ident),
|
|
(4, 'return', g_ident),
|
|
(3, 'return', h_ident),
|
|
(2, 'return', i_ident),
|
|
(1, 'return', j_ident),
|
|
])
|
|
|
|
def test_generator(self):
|
|
def f():
|
|
for i in range(2):
|
|
yield i
|
|
def g(p):
|
|
for i in f():
|
|
pass
|
|
f_ident = ident(f)
|
|
g_ident = ident(g)
|
|
self.check_events(g, [(1, 'call', g_ident),
|
|
# call the iterator twice to generate values
|
|
(2, 'call', f_ident),
|
|
(2, 'return', f_ident),
|
|
(2, 'call', f_ident),
|
|
(2, 'return', f_ident),
|
|
# once more; returns end-of-iteration with
|
|
# actually raising an exception
|
|
(2, 'call', f_ident),
|
|
(2, 'return', f_ident),
|
|
(1, 'return', g_ident),
|
|
])
|
|
|
|
def test_stop_iteration(self):
|
|
def f():
|
|
for i in range(2):
|
|
yield i
|
|
def g(p):
|
|
for i in f():
|
|
pass
|
|
f_ident = ident(f)
|
|
g_ident = ident(g)
|
|
self.check_events(g, [(1, 'call', g_ident),
|
|
# call the iterator twice to generate values
|
|
(2, 'call', f_ident),
|
|
(2, 'return', f_ident),
|
|
(2, 'call', f_ident),
|
|
(2, 'return', f_ident),
|
|
# once more to hit the raise:
|
|
(2, 'call', f_ident),
|
|
(2, 'return', f_ident),
|
|
(1, 'return', g_ident),
|
|
])
|
|
|
|
|
|
class ProfileSimulatorTestCase(TestCaseBase):
|
|
def new_watcher(self):
|
|
return ProfileSimulator(self)
|
|
|
|
def test_simple(self):
|
|
def f(p):
|
|
pass
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_basic_exception(self):
|
|
def f(p):
|
|
1/0
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_caught_exception(self):
|
|
def f(p):
|
|
try: 1/0
|
|
except: pass
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident),
|
|
])
|
|
|
|
def test_distant_exception(self):
|
|
def f():
|
|
1/0
|
|
def g():
|
|
f()
|
|
def h():
|
|
g()
|
|
def i():
|
|
h()
|
|
def j(p):
|
|
i()
|
|
f_ident = ident(f)
|
|
g_ident = ident(g)
|
|
h_ident = ident(h)
|
|
i_ident = ident(i)
|
|
j_ident = ident(j)
|
|
self.check_events(j, [(1, 'call', j_ident),
|
|
(2, 'call', i_ident),
|
|
(3, 'call', h_ident),
|
|
(4, 'call', g_ident),
|
|
(5, 'call', f_ident),
|
|
(5, 'return', f_ident),
|
|
(4, 'return', g_ident),
|
|
(3, 'return', h_ident),
|
|
(2, 'return', i_ident),
|
|
(1, 'return', j_ident),
|
|
])
|
|
|
|
# bpo-34125: profiling method_descriptor with **kwargs
|
|
def test_unbound_method(self):
|
|
kwargs = {}
|
|
def f(p):
|
|
dict.get({}, 42, **kwargs)
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident)])
|
|
|
|
# Test an invalid call (bpo-34126)
|
|
def test_unbound_method_no_args(self):
|
|
def f(p):
|
|
dict.get()
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident)])
|
|
|
|
# Test an invalid call (bpo-34126)
|
|
def test_unbound_method_invalid_args(self):
|
|
def f(p):
|
|
dict.get(print, 42)
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident)])
|
|
|
|
# Test an invalid call (bpo-34125)
|
|
def test_unbound_method_no_keyword_args(self):
|
|
kwargs = {}
|
|
def f(p):
|
|
dict.get(**kwargs)
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident)])
|
|
|
|
# Test an invalid call (bpo-34125)
|
|
def test_unbound_method_invalid_keyword_args(self):
|
|
kwargs = {}
|
|
def f(p):
|
|
dict.get(print, 42, **kwargs)
|
|
f_ident = ident(f)
|
|
self.check_events(f, [(1, 'call', f_ident),
|
|
(1, 'return', f_ident)])
|
|
|
|
|
|
def ident(function):
|
|
if hasattr(function, "f_code"):
|
|
code = function.f_code
|
|
else:
|
|
code = function.__code__
|
|
return code.co_firstlineno, code.co_name
|
|
|
|
|
|
def protect(f, p):
|
|
try: f(p)
|
|
except: pass
|
|
|
|
protect_ident = ident(protect)
|
|
|
|
|
|
def capture_events(callable, p=None):
|
|
if p is None:
|
|
p = HookWatcher()
|
|
# Disable the garbage collector. This prevents __del__s from showing up in
|
|
# traces.
|
|
old_gc = gc.isenabled()
|
|
gc.disable()
|
|
try:
|
|
sys.setprofile(p.callback)
|
|
protect(callable, p)
|
|
sys.setprofile(None)
|
|
finally:
|
|
if old_gc:
|
|
gc.enable()
|
|
return p.get_events()[1:-1]
|
|
|
|
|
|
def show_events(callable):
|
|
import pprint
|
|
pprint.pprint(capture_events(callable))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|