mirror of
https://github.com/python/cpython.git
synced 2024-12-19 06:44:47 +08:00
a3ff101e6a
dump_traceback_later() and cancel_dump_tracebacks_later() to cancel_dump_traceback_later().
594 lines
18 KiB
Python
594 lines
18 KiB
Python
from contextlib import contextmanager
|
|
import datetime
|
|
import faulthandler
|
|
import os
|
|
import re
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
from test import support, script_helper
|
|
from test.script_helper import assert_python_ok
|
|
import tempfile
|
|
import unittest
|
|
|
|
try:
|
|
import threading
|
|
HAVE_THREADS = True
|
|
except ImportError:
|
|
HAVE_THREADS = False
|
|
|
|
TIMEOUT = 0.5
|
|
|
|
try:
|
|
from resource import setrlimit, RLIMIT_CORE, error as resource_error
|
|
except ImportError:
|
|
prepare_subprocess = None
|
|
else:
|
|
def prepare_subprocess():
|
|
# don't create core file
|
|
try:
|
|
setrlimit(RLIMIT_CORE, (0, 0))
|
|
except (ValueError, resource_error):
|
|
pass
|
|
|
|
def expected_traceback(lineno1, lineno2, header, min_count=1):
|
|
regex = header
|
|
regex += ' File "<string>", line %s in func\n' % lineno1
|
|
regex += ' File "<string>", line %s in <module>' % lineno2
|
|
if 1 < min_count:
|
|
return '^' + (regex + '\n') * (min_count - 1) + regex
|
|
else:
|
|
return '^' + regex + '$'
|
|
|
|
@contextmanager
|
|
def temporary_filename():
|
|
filename = tempfile.mktemp()
|
|
try:
|
|
yield filename
|
|
finally:
|
|
support.unlink(filename)
|
|
|
|
class FaultHandlerTests(unittest.TestCase):
|
|
def get_output(self, code, filename=None):
|
|
"""
|
|
Run the specified code in Python (in a new child process) and read the
|
|
output from the standard error or from a file (if filename is set).
|
|
Return the output lines as a list.
|
|
|
|
Strip the reference count from the standard error for Python debug
|
|
build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
|
|
thread XXX".
|
|
"""
|
|
options = {}
|
|
if prepare_subprocess:
|
|
options['preexec_fn'] = prepare_subprocess
|
|
process = script_helper.spawn_python('-c', code, **options)
|
|
stdout, stderr = process.communicate()
|
|
exitcode = process.wait()
|
|
output = support.strip_python_stderr(stdout)
|
|
output = output.decode('ascii', 'backslashreplace')
|
|
if filename:
|
|
self.assertEqual(output, '')
|
|
with open(filename, "rb") as fp:
|
|
output = fp.read()
|
|
output = output.decode('ascii', 'backslashreplace')
|
|
output = re.sub('Current thread 0x[0-9a-f]+',
|
|
'Current thread XXX',
|
|
output)
|
|
return output.splitlines(), exitcode
|
|
|
|
def check_fatal_error(self, code, line_number, name_regex,
|
|
filename=None, all_threads=True, other_regex=None):
|
|
"""
|
|
Check that the fault handler for fatal errors is enabled and check the
|
|
traceback from the child process output.
|
|
|
|
Raise an error if the output doesn't match the expected format.
|
|
"""
|
|
if all_threads:
|
|
header = 'Current thread XXX'
|
|
else:
|
|
header = 'Traceback (most recent call first)'
|
|
regex = """
|
|
^Fatal Python error: {name}
|
|
|
|
{header}:
|
|
File "<string>", line {lineno} in <module>
|
|
""".strip()
|
|
regex = regex.format(
|
|
lineno=line_number,
|
|
name=name_regex,
|
|
header=re.escape(header))
|
|
if other_regex:
|
|
regex += '|' + other_regex
|
|
output, exitcode = self.get_output(code, filename)
|
|
output = '\n'.join(output)
|
|
self.assertRegex(output, regex)
|
|
self.assertNotEqual(exitcode, 0)
|
|
|
|
def test_read_null(self):
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
faulthandler.enable()
|
|
faulthandler._read_null()
|
|
""".strip(),
|
|
3,
|
|
# Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
|
|
'(?:Segmentation fault|Bus error|Illegal instruction)')
|
|
|
|
def test_sigsegv(self):
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
faulthandler.enable()
|
|
faulthandler._sigsegv()
|
|
""".strip(),
|
|
3,
|
|
'Segmentation fault')
|
|
|
|
def test_sigabrt(self):
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
faulthandler.enable()
|
|
faulthandler._sigabrt()
|
|
""".strip(),
|
|
3,
|
|
'Aborted')
|
|
|
|
@unittest.skipIf(sys.platform == 'win32',
|
|
"SIGFPE cannot be caught on Windows")
|
|
def test_sigfpe(self):
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
faulthandler.enable()
|
|
faulthandler._sigfpe()
|
|
""".strip(),
|
|
3,
|
|
'Floating point exception')
|
|
|
|
@unittest.skipIf(not hasattr(faulthandler, '_sigbus'),
|
|
"need faulthandler._sigbus()")
|
|
def test_sigbus(self):
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
faulthandler.enable()
|
|
faulthandler._sigbus()
|
|
""".strip(),
|
|
3,
|
|
'Bus error')
|
|
|
|
@unittest.skipIf(not hasattr(faulthandler, '_sigill'),
|
|
"need faulthandler._sigill()")
|
|
def test_sigill(self):
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
faulthandler.enable()
|
|
faulthandler._sigill()
|
|
""".strip(),
|
|
3,
|
|
'Illegal instruction')
|
|
|
|
def test_fatal_error(self):
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
faulthandler._fatal_error(b'xyz')
|
|
""".strip(),
|
|
2,
|
|
'xyz')
|
|
|
|
@unittest.skipIf(sys.platform.startswith('openbsd') and HAVE_THREADS,
|
|
"Issue #12868: sigaltstack() doesn't work on "
|
|
"OpenBSD if Python is compiled with pthread")
|
|
@unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
|
|
'need faulthandler._stack_overflow()')
|
|
def test_stack_overflow(self):
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
faulthandler.enable()
|
|
faulthandler._stack_overflow()
|
|
""".strip(),
|
|
3,
|
|
'(?:Segmentation fault|Bus error)',
|
|
other_regex='unable to raise a stack overflow')
|
|
|
|
def test_gil_released(self):
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
faulthandler.enable()
|
|
faulthandler._read_null(True)
|
|
""".strip(),
|
|
3,
|
|
'(?:Segmentation fault|Bus error|Illegal instruction)')
|
|
|
|
def test_enable_file(self):
|
|
with temporary_filename() as filename:
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
output = open({filename}, 'wb')
|
|
faulthandler.enable(output)
|
|
faulthandler._read_null()
|
|
""".strip().format(filename=repr(filename)),
|
|
4,
|
|
'(?:Segmentation fault|Bus error|Illegal instruction)',
|
|
filename=filename)
|
|
|
|
def test_enable_single_thread(self):
|
|
self.check_fatal_error("""
|
|
import faulthandler
|
|
faulthandler.enable(all_threads=False)
|
|
faulthandler._read_null()
|
|
""".strip(),
|
|
3,
|
|
'(?:Segmentation fault|Bus error|Illegal instruction)',
|
|
all_threads=False)
|
|
|
|
def test_disable(self):
|
|
code = """
|
|
import faulthandler
|
|
faulthandler.enable()
|
|
faulthandler.disable()
|
|
faulthandler._read_null()
|
|
""".strip()
|
|
not_expected = 'Fatal Python error'
|
|
stderr, exitcode = self.get_output(code)
|
|
stder = '\n'.join(stderr)
|
|
self.assertTrue(not_expected not in stderr,
|
|
"%r is present in %r" % (not_expected, stderr))
|
|
self.assertNotEqual(exitcode, 0)
|
|
|
|
def test_is_enabled(self):
|
|
orig_stderr = sys.stderr
|
|
try:
|
|
# regrtest may replace sys.stderr by io.StringIO object, but
|
|
# faulthandler.enable() requires that sys.stderr has a fileno()
|
|
# method
|
|
sys.stderr = sys.__stderr__
|
|
|
|
was_enabled = faulthandler.is_enabled()
|
|
try:
|
|
faulthandler.enable()
|
|
self.assertTrue(faulthandler.is_enabled())
|
|
faulthandler.disable()
|
|
self.assertFalse(faulthandler.is_enabled())
|
|
finally:
|
|
if was_enabled:
|
|
faulthandler.enable()
|
|
else:
|
|
faulthandler.disable()
|
|
finally:
|
|
sys.stderr = orig_stderr
|
|
|
|
def test_disabled_by_default(self):
|
|
# By default, the module should be disabled
|
|
code = "import faulthandler; print(faulthandler.is_enabled())"
|
|
rc, stdout, stderr = assert_python_ok("-c", code)
|
|
stdout = (stdout + stderr).strip()
|
|
self.assertEqual(stdout, b"False")
|
|
|
|
def test_sys_xoptions(self):
|
|
# Test python -X faulthandler
|
|
code = "import faulthandler; print(faulthandler.is_enabled())"
|
|
rc, stdout, stderr = assert_python_ok("-X", "faulthandler", "-c", code)
|
|
stdout = (stdout + stderr).strip()
|
|
self.assertEqual(stdout, b"True")
|
|
|
|
def check_dump_traceback(self, filename):
|
|
"""
|
|
Explicitly call dump_traceback() function and check its output.
|
|
Raise an error if the output doesn't match the expected format.
|
|
"""
|
|
code = """
|
|
import faulthandler
|
|
|
|
def funcB():
|
|
if {has_filename}:
|
|
with open({filename}, "wb") as fp:
|
|
faulthandler.dump_traceback(fp, all_threads=False)
|
|
else:
|
|
faulthandler.dump_traceback(all_threads=False)
|
|
|
|
def funcA():
|
|
funcB()
|
|
|
|
funcA()
|
|
""".strip()
|
|
code = code.format(
|
|
filename=repr(filename),
|
|
has_filename=bool(filename),
|
|
)
|
|
if filename:
|
|
lineno = 6
|
|
else:
|
|
lineno = 8
|
|
expected = [
|
|
'Traceback (most recent call first):',
|
|
' File "<string>", line %s in funcB' % lineno,
|
|
' File "<string>", line 11 in funcA',
|
|
' File "<string>", line 13 in <module>'
|
|
]
|
|
trace, exitcode = self.get_output(code, filename)
|
|
self.assertEqual(trace, expected)
|
|
self.assertEqual(exitcode, 0)
|
|
|
|
def test_dump_traceback(self):
|
|
self.check_dump_traceback(None)
|
|
|
|
def test_dump_traceback_file(self):
|
|
with temporary_filename() as filename:
|
|
self.check_dump_traceback(filename)
|
|
|
|
def test_truncate(self):
|
|
maxlen = 500
|
|
func_name = 'x' * (maxlen + 50)
|
|
truncated = 'x' * maxlen + '...'
|
|
code = """
|
|
import faulthandler
|
|
|
|
def {func_name}():
|
|
faulthandler.dump_traceback(all_threads=False)
|
|
|
|
{func_name}()
|
|
""".strip()
|
|
code = code.format(
|
|
func_name=func_name,
|
|
)
|
|
expected = [
|
|
'Traceback (most recent call first):',
|
|
' File "<string>", line 4 in %s' % truncated,
|
|
' File "<string>", line 6 in <module>'
|
|
]
|
|
trace, exitcode = self.get_output(code)
|
|
self.assertEqual(trace, expected)
|
|
self.assertEqual(exitcode, 0)
|
|
|
|
@unittest.skipIf(not HAVE_THREADS, 'need threads')
|
|
def check_dump_traceback_threads(self, filename):
|
|
"""
|
|
Call explicitly dump_traceback(all_threads=True) and check the output.
|
|
Raise an error if the output doesn't match the expected format.
|
|
"""
|
|
code = """
|
|
import faulthandler
|
|
from threading import Thread, Event
|
|
import time
|
|
|
|
def dump():
|
|
if {filename}:
|
|
with open({filename}, "wb") as fp:
|
|
faulthandler.dump_traceback(fp, all_threads=True)
|
|
else:
|
|
faulthandler.dump_traceback(all_threads=True)
|
|
|
|
class Waiter(Thread):
|
|
# avoid blocking if the main thread raises an exception.
|
|
daemon = True
|
|
|
|
def __init__(self):
|
|
Thread.__init__(self)
|
|
self.running = Event()
|
|
self.stop = Event()
|
|
|
|
def run(self):
|
|
self.running.set()
|
|
self.stop.wait()
|
|
|
|
waiter = Waiter()
|
|
waiter.start()
|
|
waiter.running.wait()
|
|
dump()
|
|
waiter.stop.set()
|
|
waiter.join()
|
|
""".strip()
|
|
code = code.format(filename=repr(filename))
|
|
output, exitcode = self.get_output(code, filename)
|
|
output = '\n'.join(output)
|
|
if filename:
|
|
lineno = 8
|
|
else:
|
|
lineno = 10
|
|
regex = """
|
|
^Thread 0x[0-9a-f]+:
|
|
(?: File ".*threading.py", line [0-9]+ in [_a-z]+
|
|
){{1,3}} File "<string>", line 23 in run
|
|
File ".*threading.py", line [0-9]+ in _bootstrap_inner
|
|
File ".*threading.py", line [0-9]+ in _bootstrap
|
|
|
|
Current thread XXX:
|
|
File "<string>", line {lineno} in dump
|
|
File "<string>", line 28 in <module>$
|
|
""".strip()
|
|
regex = regex.format(lineno=lineno)
|
|
self.assertRegex(output, regex)
|
|
self.assertEqual(exitcode, 0)
|
|
|
|
def test_dump_traceback_threads(self):
|
|
self.check_dump_traceback_threads(None)
|
|
|
|
def test_dump_traceback_threads_file(self):
|
|
with temporary_filename() as filename:
|
|
self.check_dump_traceback_threads(filename)
|
|
|
|
def _check_dump_traceback_later(self, repeat, cancel, filename, loops):
|
|
"""
|
|
Check how many times the traceback is written in timeout x 2.5 seconds,
|
|
or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
|
|
on repeat and cancel options.
|
|
|
|
Raise an error if the output doesn't match the expect format.
|
|
"""
|
|
timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
|
|
code = """
|
|
import faulthandler
|
|
import time
|
|
|
|
def func(timeout, repeat, cancel, file, loops):
|
|
for loop in range(loops):
|
|
faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
|
|
if cancel:
|
|
faulthandler.cancel_dump_traceback_later()
|
|
time.sleep(timeout * 5)
|
|
faulthandler.cancel_dump_traceback_later()
|
|
|
|
timeout = {timeout}
|
|
repeat = {repeat}
|
|
cancel = {cancel}
|
|
loops = {loops}
|
|
if {has_filename}:
|
|
file = open({filename}, "wb")
|
|
else:
|
|
file = None
|
|
func(timeout, repeat, cancel, file, loops)
|
|
if file is not None:
|
|
file.close()
|
|
""".strip()
|
|
code = code.format(
|
|
timeout=TIMEOUT,
|
|
repeat=repeat,
|
|
cancel=cancel,
|
|
loops=loops,
|
|
has_filename=bool(filename),
|
|
filename=repr(filename),
|
|
)
|
|
trace, exitcode = self.get_output(code, filename)
|
|
trace = '\n'.join(trace)
|
|
|
|
if not cancel:
|
|
count = loops
|
|
if repeat:
|
|
count *= 2
|
|
header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+:\n' % timeout_str
|
|
regex = expected_traceback(9, 20, header, min_count=count)
|
|
self.assertRegex(trace, regex)
|
|
else:
|
|
self.assertEqual(trace, '')
|
|
self.assertEqual(exitcode, 0)
|
|
|
|
@unittest.skipIf(not hasattr(faulthandler, 'dump_traceback_later'),
|
|
'need faulthandler.dump_traceback_later()')
|
|
def check_dump_traceback_later(self, repeat=False, cancel=False,
|
|
file=False, twice=False):
|
|
if twice:
|
|
loops = 2
|
|
else:
|
|
loops = 1
|
|
if file:
|
|
with temporary_filename() as filename:
|
|
self._check_dump_traceback_later(repeat, cancel,
|
|
filename, loops)
|
|
else:
|
|
self._check_dump_traceback_later(repeat, cancel, None, loops)
|
|
|
|
def test_dump_traceback_later(self):
|
|
self.check_dump_traceback_later()
|
|
|
|
def test_dump_traceback_later_repeat(self):
|
|
self.check_dump_traceback_later(repeat=True)
|
|
|
|
def test_dump_traceback_later_cancel(self):
|
|
self.check_dump_traceback_later(cancel=True)
|
|
|
|
def test_dump_traceback_later_file(self):
|
|
self.check_dump_traceback_later(file=True)
|
|
|
|
def test_dump_traceback_later_twice(self):
|
|
self.check_dump_traceback_later(twice=True)
|
|
|
|
@unittest.skipIf(not hasattr(faulthandler, "register"),
|
|
"need faulthandler.register")
|
|
def check_register(self, filename=False, all_threads=False,
|
|
unregister=False, chain=False):
|
|
"""
|
|
Register a handler displaying the traceback on a user signal. Raise the
|
|
signal and check the written traceback.
|
|
|
|
If chain is True, check that the previous signal handler is called.
|
|
|
|
Raise an error if the output doesn't match the expected format.
|
|
"""
|
|
signum = signal.SIGUSR1
|
|
code = """
|
|
import faulthandler
|
|
import os
|
|
import signal
|
|
import sys
|
|
|
|
def func(signum):
|
|
os.kill(os.getpid(), signum)
|
|
|
|
def handler(signum, frame):
|
|
handler.called = True
|
|
handler.called = False
|
|
|
|
exitcode = 0
|
|
signum = {signum}
|
|
unregister = {unregister}
|
|
chain = {chain}
|
|
|
|
if {has_filename}:
|
|
file = open({filename}, "wb")
|
|
else:
|
|
file = None
|
|
if chain:
|
|
signal.signal(signum, handler)
|
|
faulthandler.register(signum, file=file,
|
|
all_threads={all_threads}, chain={chain})
|
|
if unregister:
|
|
faulthandler.unregister(signum)
|
|
func(signum)
|
|
if chain and not handler.called:
|
|
if file is not None:
|
|
output = file
|
|
else:
|
|
output = sys.stderr
|
|
print("Error: signal handler not called!", file=output)
|
|
exitcode = 1
|
|
if file is not None:
|
|
file.close()
|
|
sys.exit(exitcode)
|
|
""".strip()
|
|
code = code.format(
|
|
filename=repr(filename),
|
|
has_filename=bool(filename),
|
|
all_threads=all_threads,
|
|
signum=signum,
|
|
unregister=unregister,
|
|
chain=chain,
|
|
)
|
|
trace, exitcode = self.get_output(code, filename)
|
|
trace = '\n'.join(trace)
|
|
if not unregister:
|
|
if all_threads:
|
|
regex = 'Current thread XXX:\n'
|
|
else:
|
|
regex = 'Traceback \(most recent call first\):\n'
|
|
regex = expected_traceback(7, 28, regex)
|
|
self.assertRegex(trace, regex)
|
|
else:
|
|
self.assertEqual(trace, '')
|
|
if unregister:
|
|
self.assertNotEqual(exitcode, 0)
|
|
else:
|
|
self.assertEqual(exitcode, 0)
|
|
|
|
def test_register(self):
|
|
self.check_register()
|
|
|
|
def test_unregister(self):
|
|
self.check_register(unregister=True)
|
|
|
|
def test_register_file(self):
|
|
with temporary_filename() as filename:
|
|
self.check_register(filename=filename)
|
|
|
|
def test_register_threads(self):
|
|
self.check_register(all_threads=True)
|
|
|
|
def test_register_chain(self):
|
|
self.check_register(chain=True)
|
|
|
|
|
|
def test_main():
|
|
support.run_unittest(FaultHandlerTests)
|
|
|
|
if __name__ == "__main__":
|
|
test_main()
|