mirror of
https://github.com/python/cpython.git
synced 2024-12-12 03:04:15 +08:00
1411 lines
42 KiB
Python
1411 lines
42 KiB
Python
# Testing the line trace facility.
|
|
|
|
from test import support
|
|
import unittest
|
|
import sys
|
|
import difflib
|
|
import gc
|
|
from functools import wraps
|
|
import asyncio
|
|
|
|
|
|
class tracecontext:
|
|
"""Context manager that traces its enter and exit."""
|
|
def __init__(self, output, value):
|
|
self.output = output
|
|
self.value = value
|
|
|
|
def __enter__(self):
|
|
self.output.append(self.value)
|
|
|
|
def __exit__(self, *exc_info):
|
|
self.output.append(-self.value)
|
|
|
|
class asynctracecontext:
|
|
"""Asynchronous context manager that traces its aenter and aexit."""
|
|
def __init__(self, output, value):
|
|
self.output = output
|
|
self.value = value
|
|
|
|
async def __aenter__(self):
|
|
self.output.append(self.value)
|
|
|
|
async def __aexit__(self, *exc_info):
|
|
self.output.append(-self.value)
|
|
|
|
async def asynciter(iterable):
|
|
"""Convert an iterable to an asynchronous iterator."""
|
|
for x in iterable:
|
|
yield x
|
|
|
|
|
|
# A very basic example. If this fails, we're in deep trouble.
|
|
def basic():
|
|
return 1
|
|
|
|
basic.events = [(0, 'call'),
|
|
(1, 'line'),
|
|
(1, 'return')]
|
|
|
|
# Many of the tests below are tricky because they involve pass statements.
|
|
# If there is implicit control flow around a pass statement (in an except
|
|
# clause or else clause) under what conditions do you set a line number
|
|
# following that clause?
|
|
|
|
|
|
# The entire "while 0:" statement is optimized away. No code
|
|
# exists for it, so the line numbers skip directly from "del x"
|
|
# to "x = 1".
|
|
def arigo_example():
|
|
x = 1
|
|
del x
|
|
while 0:
|
|
pass
|
|
x = 1
|
|
|
|
arigo_example.events = [(0, 'call'),
|
|
(1, 'line'),
|
|
(2, 'line'),
|
|
(5, 'line'),
|
|
(5, 'return')]
|
|
|
|
# check that lines consisting of just one instruction get traced:
|
|
def one_instr_line():
|
|
x = 1
|
|
del x
|
|
x = 1
|
|
|
|
one_instr_line.events = [(0, 'call'),
|
|
(1, 'line'),
|
|
(2, 'line'),
|
|
(3, 'line'),
|
|
(3, 'return')]
|
|
|
|
def no_pop_tops(): # 0
|
|
x = 1 # 1
|
|
for a in range(2): # 2
|
|
if a: # 3
|
|
x = 1 # 4
|
|
else: # 5
|
|
x = 1 # 6
|
|
|
|
no_pop_tops.events = [(0, 'call'),
|
|
(1, 'line'),
|
|
(2, 'line'),
|
|
(3, 'line'),
|
|
(6, 'line'),
|
|
(2, 'line'),
|
|
(3, 'line'),
|
|
(4, 'line'),
|
|
(2, 'line'),
|
|
(2, 'return')]
|
|
|
|
def no_pop_blocks():
|
|
y = 1
|
|
while not y:
|
|
bla
|
|
x = 1
|
|
|
|
no_pop_blocks.events = [(0, 'call'),
|
|
(1, 'line'),
|
|
(2, 'line'),
|
|
(4, 'line'),
|
|
(4, 'return')]
|
|
|
|
def called(): # line -3
|
|
x = 1
|
|
|
|
def call(): # line 0
|
|
called()
|
|
|
|
call.events = [(0, 'call'),
|
|
(1, 'line'),
|
|
(-3, 'call'),
|
|
(-2, 'line'),
|
|
(-2, 'return'),
|
|
(1, 'return')]
|
|
|
|
def raises():
|
|
raise Exception
|
|
|
|
def test_raise():
|
|
try:
|
|
raises()
|
|
except Exception as exc:
|
|
x = 1
|
|
|
|
test_raise.events = [(0, 'call'),
|
|
(1, 'line'),
|
|
(2, 'line'),
|
|
(-3, 'call'),
|
|
(-2, 'line'),
|
|
(-2, 'exception'),
|
|
(-2, 'return'),
|
|
(2, 'exception'),
|
|
(3, 'line'),
|
|
(4, 'line'),
|
|
(4, 'return')]
|
|
|
|
def _settrace_and_return(tracefunc):
|
|
sys.settrace(tracefunc)
|
|
sys._getframe().f_back.f_trace = tracefunc
|
|
def settrace_and_return(tracefunc):
|
|
_settrace_and_return(tracefunc)
|
|
|
|
settrace_and_return.events = [(1, 'return')]
|
|
|
|
def _settrace_and_raise(tracefunc):
|
|
sys.settrace(tracefunc)
|
|
sys._getframe().f_back.f_trace = tracefunc
|
|
raise RuntimeError
|
|
def settrace_and_raise(tracefunc):
|
|
try:
|
|
_settrace_and_raise(tracefunc)
|
|
except RuntimeError as exc:
|
|
pass
|
|
|
|
settrace_and_raise.events = [(2, 'exception'),
|
|
(3, 'line'),
|
|
(4, 'line'),
|
|
(4, 'return')]
|
|
|
|
# implicit return example
|
|
# This test is interesting because of the else: pass
|
|
# part of the code. The code generate for the true
|
|
# part of the if contains a jump past the else branch.
|
|
# The compiler then generates an implicit "return None"
|
|
# Internally, the compiler visits the pass statement
|
|
# and stores its line number for use on the next instruction.
|
|
# The next instruction is the implicit return None.
|
|
def ireturn_example():
|
|
a = 5
|
|
b = 5
|
|
if a == b:
|
|
b = a+1
|
|
else:
|
|
pass
|
|
|
|
ireturn_example.events = [(0, 'call'),
|
|
(1, 'line'),
|
|
(2, 'line'),
|
|
(3, 'line'),
|
|
(4, 'line'),
|
|
(6, 'line'),
|
|
(6, 'return')]
|
|
|
|
# Tight loop with while(1) example (SF #765624)
|
|
def tightloop_example():
|
|
items = range(0, 3)
|
|
try:
|
|
i = 0
|
|
while 1:
|
|
b = items[i]; i+=1
|
|
except IndexError:
|
|
pass
|
|
|
|
tightloop_example.events = [(0, 'call'),
|
|
(1, 'line'),
|
|
(2, 'line'),
|
|
(3, 'line'),
|
|
(5, 'line'),
|
|
(5, 'line'),
|
|
(5, 'line'),
|
|
(5, 'line'),
|
|
(5, 'exception'),
|
|
(6, 'line'),
|
|
(7, 'line'),
|
|
(7, 'return')]
|
|
|
|
def tighterloop_example():
|
|
items = range(1, 4)
|
|
try:
|
|
i = 0
|
|
while 1: i = items[i]
|
|
except IndexError:
|
|
pass
|
|
|
|
tighterloop_example.events = [(0, 'call'),
|
|
(1, 'line'),
|
|
(2, 'line'),
|
|
(3, 'line'),
|
|
(4, 'line'),
|
|
(4, 'line'),
|
|
(4, 'line'),
|
|
(4, 'line'),
|
|
(4, 'exception'),
|
|
(5, 'line'),
|
|
(6, 'line'),
|
|
(6, 'return')]
|
|
|
|
def generator_function():
|
|
try:
|
|
yield True
|
|
"continued"
|
|
finally:
|
|
"finally"
|
|
def generator_example():
|
|
# any() will leave the generator before its end
|
|
x = any(generator_function())
|
|
|
|
# the following lines were not traced
|
|
for x in range(10):
|
|
y = x
|
|
|
|
generator_example.events = ([(0, 'call'),
|
|
(2, 'line'),
|
|
(-6, 'call'),
|
|
(-5, 'line'),
|
|
(-4, 'line'),
|
|
(-4, 'return'),
|
|
(-4, 'call'),
|
|
(-4, 'exception'),
|
|
(-1, 'line'),
|
|
(-1, 'return')] +
|
|
[(5, 'line'), (6, 'line')] * 10 +
|
|
[(5, 'line'), (5, 'return')])
|
|
|
|
|
|
class Tracer:
|
|
def __init__(self, trace_line_events=None, trace_opcode_events=None):
|
|
self.trace_line_events = trace_line_events
|
|
self.trace_opcode_events = trace_opcode_events
|
|
self.events = []
|
|
|
|
def _reconfigure_frame(self, frame):
|
|
if self.trace_line_events is not None:
|
|
frame.f_trace_lines = self.trace_line_events
|
|
if self.trace_opcode_events is not None:
|
|
frame.f_trace_opcodes = self.trace_opcode_events
|
|
|
|
def trace(self, frame, event, arg):
|
|
self._reconfigure_frame(frame)
|
|
self.events.append((frame.f_lineno, event))
|
|
return self.trace
|
|
|
|
def traceWithGenexp(self, frame, event, arg):
|
|
self._reconfigure_frame(frame)
|
|
(o for o in [1])
|
|
self.events.append((frame.f_lineno, event))
|
|
return self.trace
|
|
|
|
|
|
class TraceTestCase(unittest.TestCase):
|
|
|
|
# Disable gc collection when tracing, otherwise the
|
|
# deallocators may be traced as well.
|
|
def setUp(self):
|
|
self.using_gc = gc.isenabled()
|
|
gc.disable()
|
|
self.addCleanup(sys.settrace, sys.gettrace())
|
|
|
|
def tearDown(self):
|
|
if self.using_gc:
|
|
gc.enable()
|
|
|
|
@staticmethod
|
|
def make_tracer():
|
|
"""Helper to allow test subclasses to configure tracers differently"""
|
|
return Tracer()
|
|
|
|
def compare_events(self, line_offset, events, expected_events):
|
|
events = [(l - line_offset, e) for (l, e) in events]
|
|
if events != expected_events:
|
|
self.fail(
|
|
"events did not match expectation:\n" +
|
|
"\n".join(difflib.ndiff([str(x) for x in expected_events],
|
|
[str(x) for x in events])))
|
|
|
|
def run_and_compare(self, func, events):
|
|
tracer = self.make_tracer()
|
|
sys.settrace(tracer.trace)
|
|
func()
|
|
sys.settrace(None)
|
|
self.compare_events(func.__code__.co_firstlineno,
|
|
tracer.events, events)
|
|
|
|
def run_test(self, func):
|
|
self.run_and_compare(func, func.events)
|
|
|
|
def run_test2(self, func):
|
|
tracer = self.make_tracer()
|
|
func(tracer.trace)
|
|
sys.settrace(None)
|
|
self.compare_events(func.__code__.co_firstlineno,
|
|
tracer.events, func.events)
|
|
|
|
def test_set_and_retrieve_none(self):
|
|
sys.settrace(None)
|
|
assert sys.gettrace() is None
|
|
|
|
def test_set_and_retrieve_func(self):
|
|
def fn(*args):
|
|
pass
|
|
|
|
sys.settrace(fn)
|
|
try:
|
|
assert sys.gettrace() is fn
|
|
finally:
|
|
sys.settrace(None)
|
|
|
|
def test_01_basic(self):
|
|
self.run_test(basic)
|
|
def test_02_arigo(self):
|
|
self.run_test(arigo_example)
|
|
def test_03_one_instr(self):
|
|
self.run_test(one_instr_line)
|
|
def test_04_no_pop_blocks(self):
|
|
self.run_test(no_pop_blocks)
|
|
def test_05_no_pop_tops(self):
|
|
self.run_test(no_pop_tops)
|
|
def test_06_call(self):
|
|
self.run_test(call)
|
|
def test_07_raise(self):
|
|
self.run_test(test_raise)
|
|
|
|
def test_08_settrace_and_return(self):
|
|
self.run_test2(settrace_and_return)
|
|
def test_09_settrace_and_raise(self):
|
|
self.run_test2(settrace_and_raise)
|
|
def test_10_ireturn(self):
|
|
self.run_test(ireturn_example)
|
|
def test_11_tightloop(self):
|
|
self.run_test(tightloop_example)
|
|
def test_12_tighterloop(self):
|
|
self.run_test(tighterloop_example)
|
|
|
|
def test_13_genexp(self):
|
|
self.run_test(generator_example)
|
|
# issue1265: if the trace function contains a generator,
|
|
# and if the traced function contains another generator
|
|
# that is not completely exhausted, the trace stopped.
|
|
# Worse: the 'finally' clause was not invoked.
|
|
tracer = self.make_tracer()
|
|
sys.settrace(tracer.traceWithGenexp)
|
|
generator_example()
|
|
sys.settrace(None)
|
|
self.compare_events(generator_example.__code__.co_firstlineno,
|
|
tracer.events, generator_example.events)
|
|
|
|
def test_14_onliner_if(self):
|
|
def onliners():
|
|
if True: x=False
|
|
else: x=True
|
|
return 0
|
|
self.run_and_compare(
|
|
onliners,
|
|
[(0, 'call'),
|
|
(1, 'line'),
|
|
(3, 'line'),
|
|
(3, 'return')])
|
|
|
|
def test_15_loops(self):
|
|
# issue1750076: "while" expression is skipped by debugger
|
|
def for_example():
|
|
for x in range(2):
|
|
pass
|
|
self.run_and_compare(
|
|
for_example,
|
|
[(0, 'call'),
|
|
(1, 'line'),
|
|
(2, 'line'),
|
|
(1, 'line'),
|
|
(2, 'line'),
|
|
(1, 'line'),
|
|
(1, 'return')])
|
|
|
|
def while_example():
|
|
# While expression should be traced on every loop
|
|
x = 2
|
|
while x > 0:
|
|
x -= 1
|
|
self.run_and_compare(
|
|
while_example,
|
|
[(0, 'call'),
|
|
(2, 'line'),
|
|
(3, 'line'),
|
|
(4, 'line'),
|
|
(3, 'line'),
|
|
(4, 'line'),
|
|
(3, 'line'),
|
|
(3, 'return')])
|
|
|
|
def test_16_blank_lines(self):
|
|
namespace = {}
|
|
exec("def f():\n" + "\n" * 256 + " pass", namespace)
|
|
self.run_and_compare(
|
|
namespace["f"],
|
|
[(0, 'call'),
|
|
(257, 'line'),
|
|
(257, 'return')])
|
|
|
|
def test_17_none_f_trace(self):
|
|
# Issue 20041: fix TypeError when f_trace is set to None.
|
|
def func():
|
|
sys._getframe().f_trace = None
|
|
lineno = 2
|
|
self.run_and_compare(func,
|
|
[(0, 'call'),
|
|
(1, 'line')])
|
|
|
|
|
|
class SkipLineEventsTraceTestCase(TraceTestCase):
|
|
"""Repeat the trace tests, but with per-line events skipped"""
|
|
|
|
def compare_events(self, line_offset, events, expected_events):
|
|
skip_line_events = [e for e in expected_events if e[1] != 'line']
|
|
super().compare_events(line_offset, events, skip_line_events)
|
|
|
|
@staticmethod
|
|
def make_tracer():
|
|
return Tracer(trace_line_events=False)
|
|
|
|
|
|
@support.cpython_only
|
|
class TraceOpcodesTestCase(TraceTestCase):
|
|
"""Repeat the trace tests, but with per-opcodes events enabled"""
|
|
|
|
def compare_events(self, line_offset, events, expected_events):
|
|
skip_opcode_events = [e for e in events if e[1] != 'opcode']
|
|
if len(events) > 1:
|
|
self.assertLess(len(skip_opcode_events), len(events),
|
|
msg="No 'opcode' events received by the tracer")
|
|
super().compare_events(line_offset, skip_opcode_events, expected_events)
|
|
|
|
@staticmethod
|
|
def make_tracer():
|
|
return Tracer(trace_opcode_events=True)
|
|
|
|
|
|
class RaisingTraceFuncTestCase(unittest.TestCase):
|
|
def setUp(self):
|
|
self.addCleanup(sys.settrace, sys.gettrace())
|
|
|
|
def trace(self, frame, event, arg):
|
|
"""A trace function that raises an exception in response to a
|
|
specific trace event."""
|
|
if event == self.raiseOnEvent:
|
|
raise ValueError # just something that isn't RuntimeError
|
|
else:
|
|
return self.trace
|
|
|
|
def f(self):
|
|
"""The function to trace; raises an exception if that's the case
|
|
we're testing, so that the 'exception' trace event fires."""
|
|
if self.raiseOnEvent == 'exception':
|
|
x = 0
|
|
y = 1/x
|
|
else:
|
|
return 1
|
|
|
|
def run_test_for_event(self, event):
|
|
"""Tests that an exception raised in response to the given event is
|
|
handled OK."""
|
|
self.raiseOnEvent = event
|
|
try:
|
|
for i in range(sys.getrecursionlimit() + 1):
|
|
sys.settrace(self.trace)
|
|
try:
|
|
self.f()
|
|
except ValueError:
|
|
pass
|
|
else:
|
|
self.fail("exception not raised!")
|
|
except RuntimeError:
|
|
self.fail("recursion counter not reset")
|
|
|
|
# Test the handling of exceptions raised by each kind of trace event.
|
|
def test_call(self):
|
|
self.run_test_for_event('call')
|
|
def test_line(self):
|
|
self.run_test_for_event('line')
|
|
def test_return(self):
|
|
self.run_test_for_event('return')
|
|
def test_exception(self):
|
|
self.run_test_for_event('exception')
|
|
|
|
def test_trash_stack(self):
|
|
def f():
|
|
for i in range(5):
|
|
print(i) # line tracing will raise an exception at this line
|
|
|
|
def g(frame, why, extra):
|
|
if (why == 'line' and
|
|
frame.f_lineno == f.__code__.co_firstlineno + 2):
|
|
raise RuntimeError("i am crashing")
|
|
return g
|
|
|
|
sys.settrace(g)
|
|
try:
|
|
f()
|
|
except RuntimeError:
|
|
# the test is really that this doesn't segfault:
|
|
import gc
|
|
gc.collect()
|
|
else:
|
|
self.fail("exception not propagated")
|
|
|
|
|
|
def test_exception_arguments(self):
|
|
def f():
|
|
x = 0
|
|
# this should raise an error
|
|
x.no_such_attr
|
|
def g(frame, event, arg):
|
|
if (event == 'exception'):
|
|
type, exception, trace = arg
|
|
self.assertIsInstance(exception, Exception)
|
|
return g
|
|
|
|
existing = sys.gettrace()
|
|
try:
|
|
sys.settrace(g)
|
|
try:
|
|
f()
|
|
except AttributeError:
|
|
# this is expected
|
|
pass
|
|
finally:
|
|
sys.settrace(existing)
|
|
|
|
|
|
# 'Jump' tests: assigning to frame.f_lineno within a trace function
|
|
# moves the execution position - it's how debuggers implement a Jump
|
|
# command (aka. "Set next statement").
|
|
|
|
class JumpTracer:
|
|
"""Defines a trace function that jumps from one place to another."""
|
|
|
|
def __init__(self, function, jumpFrom, jumpTo, event='line',
|
|
decorated=False):
|
|
self.code = function.__code__
|
|
self.jumpFrom = jumpFrom
|
|
self.jumpTo = jumpTo
|
|
self.event = event
|
|
self.firstLine = None if decorated else self.code.co_firstlineno
|
|
self.done = False
|
|
|
|
def trace(self, frame, event, arg):
|
|
if self.done:
|
|
return
|
|
# frame.f_code.co_firstlineno is the first line of the decorator when
|
|
# 'function' is decorated and the decorator may be written using
|
|
# multiple physical lines when it is too long. Use the first line
|
|
# trace event in 'function' to find the first line of 'function'.
|
|
if (self.firstLine is None and frame.f_code == self.code and
|
|
event == 'line'):
|
|
self.firstLine = frame.f_lineno - 1
|
|
if (event == self.event and self.firstLine and
|
|
frame.f_lineno == self.firstLine + self.jumpFrom):
|
|
f = frame
|
|
while f is not None and f.f_code != self.code:
|
|
f = f.f_back
|
|
if f is not None:
|
|
# Cope with non-integer self.jumpTo (because of
|
|
# no_jump_to_non_integers below).
|
|
try:
|
|
frame.f_lineno = self.firstLine + self.jumpTo
|
|
except TypeError:
|
|
frame.f_lineno = self.jumpTo
|
|
self.done = True
|
|
return self.trace
|
|
|
|
# This verifies the line-numbers-must-be-integers rule.
|
|
def no_jump_to_non_integers(output):
|
|
try:
|
|
output.append(2)
|
|
except ValueError as e:
|
|
output.append('integer' in str(e))
|
|
|
|
# This verifies that you can't set f_lineno via _getframe or similar
|
|
# trickery.
|
|
def no_jump_without_trace_function():
|
|
try:
|
|
previous_frame = sys._getframe().f_back
|
|
previous_frame.f_lineno = previous_frame.f_lineno
|
|
except ValueError as e:
|
|
# This is the exception we wanted; make sure the error message
|
|
# talks about trace functions.
|
|
if 'trace' not in str(e):
|
|
raise
|
|
else:
|
|
# Something's wrong - the expected exception wasn't raised.
|
|
raise AssertionError("Trace-function-less jump failed to fail")
|
|
|
|
|
|
class JumpTestCase(unittest.TestCase):
|
|
def setUp(self):
|
|
self.addCleanup(sys.settrace, sys.gettrace())
|
|
sys.settrace(None)
|
|
|
|
def compare_jump_output(self, expected, received):
|
|
if received != expected:
|
|
self.fail( "Outputs don't match:\n" +
|
|
"Expected: " + repr(expected) + "\n" +
|
|
"Received: " + repr(received))
|
|
|
|
def run_test(self, func, jumpFrom, jumpTo, expected, error=None,
|
|
event='line', decorated=False):
|
|
tracer = JumpTracer(func, jumpFrom, jumpTo, event, decorated)
|
|
sys.settrace(tracer.trace)
|
|
output = []
|
|
if error is None:
|
|
func(output)
|
|
else:
|
|
with self.assertRaisesRegex(*error):
|
|
func(output)
|
|
sys.settrace(None)
|
|
self.compare_jump_output(expected, output)
|
|
|
|
def run_async_test(self, func, jumpFrom, jumpTo, expected, error=None,
|
|
event='line', decorated=False):
|
|
tracer = JumpTracer(func, jumpFrom, jumpTo, event, decorated)
|
|
sys.settrace(tracer.trace)
|
|
output = []
|
|
if error is None:
|
|
asyncio.run(func(output))
|
|
else:
|
|
with self.assertRaisesRegex(*error):
|
|
asyncio.run(func(output))
|
|
sys.settrace(None)
|
|
asyncio.set_event_loop_policy(None)
|
|
self.compare_jump_output(expected, output)
|
|
|
|
def jump_test(jumpFrom, jumpTo, expected, error=None, event='line'):
|
|
"""Decorator that creates a test that makes a jump
|
|
from one place to another in the following code.
|
|
"""
|
|
def decorator(func):
|
|
@wraps(func)
|
|
def test(self):
|
|
self.run_test(func, jumpFrom, jumpTo, expected,
|
|
error=error, event=event, decorated=True)
|
|
return test
|
|
return decorator
|
|
|
|
def async_jump_test(jumpFrom, jumpTo, expected, error=None, event='line'):
|
|
"""Decorator that creates a test that makes a jump
|
|
from one place to another in the following asynchronous code.
|
|
"""
|
|
def decorator(func):
|
|
@wraps(func)
|
|
def test(self):
|
|
self.run_async_test(func, jumpFrom, jumpTo, expected,
|
|
error=error, event=event, decorated=True)
|
|
return test
|
|
return decorator
|
|
|
|
## The first set of 'jump' tests are for things that are allowed:
|
|
|
|
@jump_test(1, 3, [3])
|
|
def test_jump_simple_forwards(output):
|
|
output.append(1)
|
|
output.append(2)
|
|
output.append(3)
|
|
|
|
@jump_test(2, 1, [1, 1, 2])
|
|
def test_jump_simple_backwards(output):
|
|
output.append(1)
|
|
output.append(2)
|
|
|
|
@jump_test(3, 5, [2, 5])
|
|
def test_jump_out_of_block_forwards(output):
|
|
for i in 1, 2:
|
|
output.append(2)
|
|
for j in [3]: # Also tests jumping over a block
|
|
output.append(4)
|
|
output.append(5)
|
|
|
|
@jump_test(6, 1, [1, 3, 5, 1, 3, 5, 6, 7])
|
|
def test_jump_out_of_block_backwards(output):
|
|
output.append(1)
|
|
for i in [1]:
|
|
output.append(3)
|
|
for j in [2]: # Also tests jumping over a block
|
|
output.append(5)
|
|
output.append(6)
|
|
output.append(7)
|
|
|
|
@async_jump_test(4, 5, [3, 5])
|
|
async def test_jump_out_of_async_for_block_forwards(output):
|
|
for i in [1]:
|
|
async for i in asynciter([1, 2]):
|
|
output.append(3)
|
|
output.append(4)
|
|
output.append(5)
|
|
|
|
@async_jump_test(5, 2, [2, 4, 2, 4, 5, 6])
|
|
async def test_jump_out_of_async_for_block_backwards(output):
|
|
for i in [1]:
|
|
output.append(2)
|
|
async for i in asynciter([1]):
|
|
output.append(4)
|
|
output.append(5)
|
|
output.append(6)
|
|
|
|
@jump_test(1, 2, [3])
|
|
def test_jump_to_codeless_line(output):
|
|
output.append(1)
|
|
# Jumping to this line should skip to the next one.
|
|
output.append(3)
|
|
|
|
@jump_test(2, 2, [1, 2, 3])
|
|
def test_jump_to_same_line(output):
|
|
output.append(1)
|
|
output.append(2)
|
|
output.append(3)
|
|
|
|
# Tests jumping within a finally block, and over one.
|
|
@jump_test(4, 9, [2, 9])
|
|
def test_jump_in_nested_finally(output):
|
|
try:
|
|
output.append(2)
|
|
finally:
|
|
output.append(4)
|
|
try:
|
|
output.append(6)
|
|
finally:
|
|
output.append(8)
|
|
output.append(9)
|
|
|
|
@jump_test(6, 7, [2, 7], (ZeroDivisionError, ''))
|
|
def test_jump_in_nested_finally_2(output):
|
|
try:
|
|
output.append(2)
|
|
1/0
|
|
return
|
|
finally:
|
|
output.append(6)
|
|
output.append(7)
|
|
output.append(8)
|
|
|
|
@jump_test(6, 11, [2, 11], (ZeroDivisionError, ''))
|
|
def test_jump_in_nested_finally_3(output):
|
|
try:
|
|
output.append(2)
|
|
1/0
|
|
return
|
|
finally:
|
|
output.append(6)
|
|
try:
|
|
output.append(8)
|
|
finally:
|
|
output.append(10)
|
|
output.append(11)
|
|
output.append(12)
|
|
|
|
@jump_test(5, 11, [2, 4, 12])
|
|
def test_jump_over_return_try_finally_in_finally_block(output):
|
|
try:
|
|
output.append(2)
|
|
finally:
|
|
output.append(4)
|
|
output.append(5)
|
|
return
|
|
try:
|
|
output.append(8)
|
|
finally:
|
|
output.append(10)
|
|
pass
|
|
output.append(12)
|
|
|
|
@jump_test(3, 4, [1, 4])
|
|
def test_jump_infinite_while_loop(output):
|
|
output.append(1)
|
|
while True:
|
|
output.append(3)
|
|
output.append(4)
|
|
|
|
@jump_test(2, 4, [4, 4])
|
|
def test_jump_forwards_into_while_block(output):
|
|
i = 1
|
|
output.append(2)
|
|
while i <= 2:
|
|
output.append(4)
|
|
i += 1
|
|
|
|
@jump_test(5, 3, [3, 3, 3, 5])
|
|
def test_jump_backwards_into_while_block(output):
|
|
i = 1
|
|
while i <= 2:
|
|
output.append(3)
|
|
i += 1
|
|
output.append(5)
|
|
|
|
@jump_test(2, 3, [1, 3])
|
|
def test_jump_forwards_out_of_with_block(output):
|
|
with tracecontext(output, 1):
|
|
output.append(2)
|
|
output.append(3)
|
|
|
|
@async_jump_test(2, 3, [1, 3])
|
|
async def test_jump_forwards_out_of_async_with_block(output):
|
|
async with asynctracecontext(output, 1):
|
|
output.append(2)
|
|
output.append(3)
|
|
|
|
@jump_test(3, 1, [1, 2, 1, 2, 3, -2])
|
|
def test_jump_backwards_out_of_with_block(output):
|
|
output.append(1)
|
|
with tracecontext(output, 2):
|
|
output.append(3)
|
|
|
|
@async_jump_test(3, 1, [1, 2, 1, 2, 3, -2])
|
|
async def test_jump_backwards_out_of_async_with_block(output):
|
|
output.append(1)
|
|
async with asynctracecontext(output, 2):
|
|
output.append(3)
|
|
|
|
@jump_test(2, 5, [5])
|
|
def test_jump_forwards_out_of_try_finally_block(output):
|
|
try:
|
|
output.append(2)
|
|
finally:
|
|
output.append(4)
|
|
output.append(5)
|
|
|
|
@jump_test(3, 1, [1, 1, 3, 5])
|
|
def test_jump_backwards_out_of_try_finally_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
finally:
|
|
output.append(5)
|
|
|
|
@jump_test(2, 6, [6])
|
|
def test_jump_forwards_out_of_try_except_block(output):
|
|
try:
|
|
output.append(2)
|
|
except:
|
|
output.append(4)
|
|
raise
|
|
output.append(6)
|
|
|
|
@jump_test(3, 1, [1, 1, 3])
|
|
def test_jump_backwards_out_of_try_except_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
except:
|
|
output.append(5)
|
|
raise
|
|
|
|
@jump_test(5, 7, [4, 7, 8])
|
|
def test_jump_between_except_blocks(output):
|
|
try:
|
|
1/0
|
|
except ZeroDivisionError:
|
|
output.append(4)
|
|
output.append(5)
|
|
except FloatingPointError:
|
|
output.append(7)
|
|
output.append(8)
|
|
|
|
@jump_test(5, 6, [4, 6, 7])
|
|
def test_jump_within_except_block(output):
|
|
try:
|
|
1/0
|
|
except:
|
|
output.append(4)
|
|
output.append(5)
|
|
output.append(6)
|
|
output.append(7)
|
|
|
|
@jump_test(2, 4, [1, 4, 5, -4])
|
|
def test_jump_across_with(output):
|
|
output.append(1)
|
|
with tracecontext(output, 2):
|
|
output.append(3)
|
|
with tracecontext(output, 4):
|
|
output.append(5)
|
|
|
|
@async_jump_test(2, 4, [1, 4, 5, -4])
|
|
async def test_jump_across_async_with(output):
|
|
output.append(1)
|
|
async with asynctracecontext(output, 2):
|
|
output.append(3)
|
|
async with asynctracecontext(output, 4):
|
|
output.append(5)
|
|
|
|
@jump_test(4, 5, [1, 3, 5, 6])
|
|
def test_jump_out_of_with_block_within_for_block(output):
|
|
output.append(1)
|
|
for i in [1]:
|
|
with tracecontext(output, 3):
|
|
output.append(4)
|
|
output.append(5)
|
|
output.append(6)
|
|
|
|
@async_jump_test(4, 5, [1, 3, 5, 6])
|
|
async def test_jump_out_of_async_with_block_within_for_block(output):
|
|
output.append(1)
|
|
for i in [1]:
|
|
async with asynctracecontext(output, 3):
|
|
output.append(4)
|
|
output.append(5)
|
|
output.append(6)
|
|
|
|
@jump_test(4, 5, [1, 2, 3, 5, -2, 6])
|
|
def test_jump_out_of_with_block_within_with_block(output):
|
|
output.append(1)
|
|
with tracecontext(output, 2):
|
|
with tracecontext(output, 3):
|
|
output.append(4)
|
|
output.append(5)
|
|
output.append(6)
|
|
|
|
@async_jump_test(4, 5, [1, 2, 3, 5, -2, 6])
|
|
async def test_jump_out_of_async_with_block_within_with_block(output):
|
|
output.append(1)
|
|
with tracecontext(output, 2):
|
|
async with asynctracecontext(output, 3):
|
|
output.append(4)
|
|
output.append(5)
|
|
output.append(6)
|
|
|
|
@jump_test(5, 6, [2, 4, 6, 7])
|
|
def test_jump_out_of_with_block_within_finally_block(output):
|
|
try:
|
|
output.append(2)
|
|
finally:
|
|
with tracecontext(output, 4):
|
|
output.append(5)
|
|
output.append(6)
|
|
output.append(7)
|
|
|
|
@async_jump_test(5, 6, [2, 4, 6, 7])
|
|
async def test_jump_out_of_async_with_block_within_finally_block(output):
|
|
try:
|
|
output.append(2)
|
|
finally:
|
|
async with asynctracecontext(output, 4):
|
|
output.append(5)
|
|
output.append(6)
|
|
output.append(7)
|
|
|
|
@jump_test(8, 11, [1, 3, 5, 11, 12])
|
|
def test_jump_out_of_complex_nested_blocks(output):
|
|
output.append(1)
|
|
for i in [1]:
|
|
output.append(3)
|
|
for j in [1, 2]:
|
|
output.append(5)
|
|
try:
|
|
for k in [1, 2]:
|
|
output.append(8)
|
|
finally:
|
|
output.append(10)
|
|
output.append(11)
|
|
output.append(12)
|
|
|
|
@jump_test(3, 5, [1, 2, 5])
|
|
def test_jump_out_of_with_assignment(output):
|
|
output.append(1)
|
|
with tracecontext(output, 2) \
|
|
as x:
|
|
output.append(4)
|
|
output.append(5)
|
|
|
|
@async_jump_test(3, 5, [1, 2, 5])
|
|
async def test_jump_out_of_async_with_assignment(output):
|
|
output.append(1)
|
|
async with asynctracecontext(output, 2) \
|
|
as x:
|
|
output.append(4)
|
|
output.append(5)
|
|
|
|
@jump_test(3, 6, [1, 6, 8, 9])
|
|
def test_jump_over_return_in_try_finally_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
if not output: # always false
|
|
return
|
|
output.append(6)
|
|
finally:
|
|
output.append(8)
|
|
output.append(9)
|
|
|
|
@jump_test(5, 8, [1, 3, 8, 10, 11, 13])
|
|
def test_jump_over_break_in_try_finally_block(output):
|
|
output.append(1)
|
|
while True:
|
|
output.append(3)
|
|
try:
|
|
output.append(5)
|
|
if not output: # always false
|
|
break
|
|
output.append(8)
|
|
finally:
|
|
output.append(10)
|
|
output.append(11)
|
|
break
|
|
output.append(13)
|
|
|
|
@jump_test(1, 7, [7, 8])
|
|
def test_jump_over_for_block_before_else(output):
|
|
output.append(1)
|
|
if not output: # always false
|
|
for i in [3]:
|
|
output.append(4)
|
|
else:
|
|
output.append(6)
|
|
output.append(7)
|
|
output.append(8)
|
|
|
|
@async_jump_test(1, 7, [7, 8])
|
|
async def test_jump_over_async_for_block_before_else(output):
|
|
output.append(1)
|
|
if not output: # always false
|
|
async for i in asynciter([3]):
|
|
output.append(4)
|
|
else:
|
|
output.append(6)
|
|
output.append(7)
|
|
output.append(8)
|
|
|
|
# The second set of 'jump' tests are for things that are not allowed:
|
|
|
|
@jump_test(2, 3, [1], (ValueError, 'after'))
|
|
def test_no_jump_too_far_forwards(output):
|
|
output.append(1)
|
|
output.append(2)
|
|
|
|
@jump_test(2, -2, [1], (ValueError, 'before'))
|
|
def test_no_jump_too_far_backwards(output):
|
|
output.append(1)
|
|
output.append(2)
|
|
|
|
# Test each kind of 'except' line.
|
|
@jump_test(2, 3, [4], (ValueError, 'except'))
|
|
def test_no_jump_to_except_1(output):
|
|
try:
|
|
output.append(2)
|
|
except:
|
|
output.append(4)
|
|
raise
|
|
|
|
@jump_test(2, 3, [4], (ValueError, 'except'))
|
|
def test_no_jump_to_except_2(output):
|
|
try:
|
|
output.append(2)
|
|
except ValueError:
|
|
output.append(4)
|
|
raise
|
|
|
|
@jump_test(2, 3, [4], (ValueError, 'except'))
|
|
def test_no_jump_to_except_3(output):
|
|
try:
|
|
output.append(2)
|
|
except ValueError as e:
|
|
output.append(4)
|
|
raise e
|
|
|
|
@jump_test(2, 3, [4], (ValueError, 'except'))
|
|
def test_no_jump_to_except_4(output):
|
|
try:
|
|
output.append(2)
|
|
except (ValueError, RuntimeError) as e:
|
|
output.append(4)
|
|
raise e
|
|
|
|
@jump_test(1, 3, [], (ValueError, 'into'))
|
|
def test_no_jump_forwards_into_for_block(output):
|
|
output.append(1)
|
|
for i in 1, 2:
|
|
output.append(3)
|
|
|
|
@async_jump_test(1, 3, [], (ValueError, 'into'))
|
|
async def test_no_jump_forwards_into_async_for_block(output):
|
|
output.append(1)
|
|
async for i in asynciter([1, 2]):
|
|
output.append(3)
|
|
|
|
@jump_test(3, 2, [2, 2], (ValueError, 'into'))
|
|
def test_no_jump_backwards_into_for_block(output):
|
|
for i in 1, 2:
|
|
output.append(2)
|
|
output.append(3)
|
|
|
|
@async_jump_test(3, 2, [2, 2], (ValueError, 'into'))
|
|
async def test_no_jump_backwards_into_async_for_block(output):
|
|
async for i in asynciter([1, 2]):
|
|
output.append(2)
|
|
output.append(3)
|
|
|
|
@jump_test(1, 3, [], (ValueError, 'into'))
|
|
def test_no_jump_forwards_into_with_block(output):
|
|
output.append(1)
|
|
with tracecontext(output, 2):
|
|
output.append(3)
|
|
|
|
@async_jump_test(1, 3, [], (ValueError, 'into'))
|
|
async def test_no_jump_forwards_into_async_with_block(output):
|
|
output.append(1)
|
|
async with asynctracecontext(output, 2):
|
|
output.append(3)
|
|
|
|
@jump_test(3, 2, [1, 2, -1], (ValueError, 'into'))
|
|
def test_no_jump_backwards_into_with_block(output):
|
|
with tracecontext(output, 1):
|
|
output.append(2)
|
|
output.append(3)
|
|
|
|
@async_jump_test(3, 2, [1, 2, -1], (ValueError, 'into'))
|
|
async def test_no_jump_backwards_into_async_with_block(output):
|
|
async with asynctracecontext(output, 1):
|
|
output.append(2)
|
|
output.append(3)
|
|
|
|
@jump_test(1, 3, [], (ValueError, 'into'))
|
|
def test_no_jump_forwards_into_try_finally_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
finally:
|
|
output.append(5)
|
|
|
|
@jump_test(5, 2, [2, 4], (ValueError, 'into'))
|
|
def test_no_jump_backwards_into_try_finally_block(output):
|
|
try:
|
|
output.append(2)
|
|
finally:
|
|
output.append(4)
|
|
output.append(5)
|
|
|
|
@jump_test(1, 3, [], (ValueError, 'into'))
|
|
def test_no_jump_forwards_into_try_except_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
except:
|
|
output.append(5)
|
|
raise
|
|
|
|
@jump_test(6, 2, [2], (ValueError, 'into'))
|
|
def test_no_jump_backwards_into_try_except_block(output):
|
|
try:
|
|
output.append(2)
|
|
except:
|
|
output.append(4)
|
|
raise
|
|
output.append(6)
|
|
|
|
# 'except' with a variable creates an implicit finally block
|
|
@jump_test(5, 7, [4], (ValueError, 'into'))
|
|
def test_no_jump_between_except_blocks_2(output):
|
|
try:
|
|
1/0
|
|
except ZeroDivisionError:
|
|
output.append(4)
|
|
output.append(5)
|
|
except FloatingPointError as e:
|
|
output.append(7)
|
|
output.append(8)
|
|
|
|
@jump_test(1, 5, [], (ValueError, "into a 'finally'"))
|
|
def test_no_jump_into_finally_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
finally:
|
|
output.append(5)
|
|
|
|
@jump_test(3, 6, [2, 5, 6], (ValueError, "into a 'finally'"))
|
|
def test_no_jump_into_finally_block_from_try_block(output):
|
|
try:
|
|
output.append(2)
|
|
output.append(3)
|
|
finally: # still executed if the jump is failed
|
|
output.append(5)
|
|
output.append(6)
|
|
output.append(7)
|
|
|
|
@jump_test(5, 1, [1, 3], (ValueError, "out of a 'finally'"))
|
|
def test_no_jump_out_of_finally_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
finally:
|
|
output.append(5)
|
|
|
|
@jump_test(1, 5, [], (ValueError, "into an 'except'"))
|
|
def test_no_jump_into_bare_except_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
except:
|
|
output.append(5)
|
|
|
|
@jump_test(1, 5, [], (ValueError, "into an 'except'"))
|
|
def test_no_jump_into_qualified_except_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
except Exception:
|
|
output.append(5)
|
|
|
|
@jump_test(3, 6, [2, 5, 6], (ValueError, "into an 'except'"))
|
|
def test_no_jump_into_bare_except_block_from_try_block(output):
|
|
try:
|
|
output.append(2)
|
|
output.append(3)
|
|
except: # executed if the jump is failed
|
|
output.append(5)
|
|
output.append(6)
|
|
raise
|
|
output.append(8)
|
|
|
|
@jump_test(3, 6, [2], (ValueError, "into an 'except'"))
|
|
def test_no_jump_into_qualified_except_block_from_try_block(output):
|
|
try:
|
|
output.append(2)
|
|
output.append(3)
|
|
except ZeroDivisionError:
|
|
output.append(5)
|
|
output.append(6)
|
|
raise
|
|
output.append(8)
|
|
|
|
@jump_test(7, 1, [1, 3, 6], (ValueError, "out of an 'except'"))
|
|
def test_no_jump_out_of_bare_except_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
1/0
|
|
except:
|
|
output.append(6)
|
|
output.append(7)
|
|
|
|
@jump_test(7, 1, [1, 3, 6], (ValueError, "out of an 'except'"))
|
|
def test_no_jump_out_of_qualified_except_block(output):
|
|
output.append(1)
|
|
try:
|
|
output.append(3)
|
|
1/0
|
|
except Exception:
|
|
output.append(6)
|
|
output.append(7)
|
|
|
|
@jump_test(3, 5, [1, 2, -2], (ValueError, 'into'))
|
|
def test_no_jump_between_with_blocks(output):
|
|
output.append(1)
|
|
with tracecontext(output, 2):
|
|
output.append(3)
|
|
with tracecontext(output, 4):
|
|
output.append(5)
|
|
|
|
@async_jump_test(3, 5, [1, 2, -2], (ValueError, 'into'))
|
|
async def test_no_jump_between_async_with_blocks(output):
|
|
output.append(1)
|
|
async with asynctracecontext(output, 2):
|
|
output.append(3)
|
|
async with asynctracecontext(output, 4):
|
|
output.append(5)
|
|
|
|
@jump_test(5, 7, [2, 4], (ValueError, 'finally'))
|
|
def test_no_jump_over_return_out_of_finally_block(output):
|
|
try:
|
|
output.append(2)
|
|
finally:
|
|
output.append(4)
|
|
output.append(5)
|
|
return
|
|
output.append(7)
|
|
|
|
@jump_test(7, 4, [1, 6], (ValueError, 'into'))
|
|
def test_no_jump_into_for_block_before_else(output):
|
|
output.append(1)
|
|
if not output: # always false
|
|
for i in [3]:
|
|
output.append(4)
|
|
else:
|
|
output.append(6)
|
|
output.append(7)
|
|
output.append(8)
|
|
|
|
@async_jump_test(7, 4, [1, 6], (ValueError, 'into'))
|
|
async def test_no_jump_into_async_for_block_before_else(output):
|
|
output.append(1)
|
|
if not output: # always false
|
|
async for i in asynciter([3]):
|
|
output.append(4)
|
|
else:
|
|
output.append(6)
|
|
output.append(7)
|
|
output.append(8)
|
|
|
|
def test_no_jump_to_non_integers(self):
|
|
self.run_test(no_jump_to_non_integers, 2, "Spam", [True])
|
|
|
|
def test_no_jump_without_trace_function(self):
|
|
# Must set sys.settrace(None) in setUp(), else condition is not
|
|
# triggered.
|
|
no_jump_without_trace_function()
|
|
|
|
def test_large_function(self):
|
|
d = {}
|
|
exec("""def f(output): # line 0
|
|
x = 0 # line 1
|
|
y = 1 # line 2
|
|
''' # line 3
|
|
%s # lines 4-1004
|
|
''' # line 1005
|
|
x += 1 # line 1006
|
|
output.append(x) # line 1007
|
|
return""" % ('\n' * 1000,), d)
|
|
f = d['f']
|
|
self.run_test(f, 2, 1007, [0])
|
|
|
|
def test_jump_to_firstlineno(self):
|
|
# This tests that PDB can jump back to the first line in a
|
|
# file. See issue #1689458. It can only be triggered in a
|
|
# function call if the function is defined on a single line.
|
|
code = compile("""
|
|
# Comments don't count.
|
|
output.append(2) # firstlineno is here.
|
|
output.append(3)
|
|
output.append(4)
|
|
""", "<fake module>", "exec")
|
|
class fake_function:
|
|
__code__ = code
|
|
tracer = JumpTracer(fake_function, 2, 0)
|
|
sys.settrace(tracer.trace)
|
|
namespace = {"output": []}
|
|
exec(code, namespace)
|
|
sys.settrace(None)
|
|
self.compare_jump_output([2, 3, 2, 3, 4], namespace["output"])
|
|
|
|
@jump_test(2, 3, [1], event='call', error=(ValueError, "can't jump from"
|
|
" the 'call' trace event of a new frame"))
|
|
def test_no_jump_from_call(output):
|
|
output.append(1)
|
|
def nested():
|
|
output.append(3)
|
|
nested()
|
|
output.append(5)
|
|
|
|
@jump_test(2, 1, [1], event='return', error=(ValueError,
|
|
"can only jump from a 'line' trace event"))
|
|
def test_no_jump_from_return_event(output):
|
|
output.append(1)
|
|
return
|
|
|
|
@jump_test(2, 1, [1], event='exception', error=(ValueError,
|
|
"can only jump from a 'line' trace event"))
|
|
def test_no_jump_from_exception_event(output):
|
|
output.append(1)
|
|
1 / 0
|
|
|
|
@jump_test(3, 2, [2], event='return', error=(ValueError,
|
|
"can't jump from a yield statement"))
|
|
def test_no_jump_from_yield(output):
|
|
def gen():
|
|
output.append(2)
|
|
yield 3
|
|
next(gen())
|
|
output.append(5)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|