mirror of
https://github.com/python/cpython.git
synced 2024-12-17 05:43:48 +08:00
8b62644831
The reset_peak function sets the peak memory size to the current size, representing a resetting of that metric. This allows for recording the peak of specific sections of code, ignoring other code that may have had a higher peak (since the most recent `tracemalloc.start()` or `tracemalloc.clear_traces()` call).
1076 lines
39 KiB
Python
1076 lines
39 KiB
Python
import contextlib
|
|
import os
|
|
import sys
|
|
import tracemalloc
|
|
import unittest
|
|
from unittest.mock import patch
|
|
from test.support.script_helper import (assert_python_ok, assert_python_failure,
|
|
interpreter_requires_environment)
|
|
from test import support
|
|
|
|
try:
|
|
import _testcapi
|
|
except ImportError:
|
|
_testcapi = None
|
|
|
|
|
|
# Size reported by sys.getsizeof() for an empty bytes object; subtracted from
# a requested size to get the payload length of a bytes object.
EMPTY_STRING_SIZE = sys.getsizeof(b'')
# Frame counts that the "invalid number of frames" tests feed to tracemalloc.
INVALID_NFRAME = (-1, 2**30)
|
|
|
|
|
|
def get_frames(nframe, lineno_delta):
    """Return up to *nframe* (filename, lineno) pairs for the caller's stack.

    The walk starts at the frame that called this function and follows
    ``f_back`` towards the outermost frame.  *lineno_delta* is added to the
    line number of the first frame only; every other frame is reported
    with its real line number.
    """
    collected = []
    current = sys._getframe(1)
    delta = lineno_delta
    remaining = nframe
    while remaining > 0 and current is not None:
        collected.append((current.f_code.co_filename,
                          current.f_lineno + delta))
        # Only the innermost frame gets the line offset.
        delta = 0
        current = current.f_back
        remaining -= 1
    return tuple(collected)
|
|
|
|
def allocate_bytes(size):
    """Allocate a bytes object and build the traceback expected for it.

    The payload length is *size* minus EMPTY_STRING_SIZE.  Returns a
    ``(data, traceback)`` pair where *traceback* is a
    tracemalloc.Traceback built from the frames of the current call stack.
    """
    limit = tracemalloc.get_traceback_limit()
    payload_len = size - EMPTY_STRING_SIZE
    # The line delta of 1 makes the first expected frame point at the
    # statement directly below, which performs the allocation.  Keep these
    # two statements on consecutive lines.
    expected_frames = get_frames(limit, 1)
    data = b'x' * payload_len
    total_nframe = min(len(expected_frames), limit)
    return data, tracemalloc.Traceback(expected_frames, total_nframe)
|
|
|
|
def create_snapshots():
    """Build the two hand-written snapshots shared by the snapshot tests.

    Returns a ``(snapshot, snapshot2)`` pair, both created with a traceback
    limit of 2.
    """
    limit = 2

    # Each raw trace has the shape (domain, size, traceback_frames,
    # total_nframe), where traceback_frames is a tuple of
    # (filename, line_number) tuples.
    a2_b4 = (('a.py', 2), ('b.py', 4))
    a5_b4 = (('a.py', 5), ('b.py', 4))
    common = [(0, 10, a2_b4, 3)] * 3

    first_traces = common + [
        (1, 2, a5_b4, 3),
        (2, 66, (('b.py', 1),), 1),
        (3, 7, (('<unknown>', 0),), 1),
    ]
    second_traces = common + [
        (2, 2, a5_b4, 3),
        (2, 5000, a5_b4, 3),
        (4, 400, (('c.py', 578),), 1),
    ]

    first = tracemalloc.Snapshot(first_traces, limit)
    second = tracemalloc.Snapshot(second_traces, limit)
    return (first, second)
|
|
|
|
def frame(filename, lineno):
|
|
return tracemalloc._Frame((filename, lineno))
|
|
|
|
def traceback(*frames):
    """Build a tracemalloc.Traceback from raw (filename, lineno) tuples."""
    raw_frames = frames
    return tracemalloc.Traceback(raw_frames)
|
|
|
|
def traceback_lineno(filename, lineno):
    """Return a single-frame traceback for *filename* at line *lineno*."""
    only_frame = (filename, lineno)
    return traceback(only_frame)
|
|
|
|
def traceback_filename(filename):
    """Return a single-frame traceback for *filename* with line number 0."""
    no_line = 0
    return traceback_lineno(filename, no_line)
|
|
|
|
|
|
class TestTracemallocEnabled(unittest.TestCase):
    """Tests of the tracemalloc API while tracing is switched on.

    setUp() starts tracing with a traceback limit of 1 frame and tearDown()
    stops it; individual tests restart tracing when they need another limit.
    """

    def setUp(self):
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        tracemalloc.start(1)

    def tearDown(self):
        tracemalloc.stop()

    def test_get_tracemalloc_memory(self):
        # keep a reference to the allocations while measuring
        data = [allocate_bytes(123) for count in range(1000)]
        size = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(size, 0)

        tracemalloc.clear_traces()
        size2 = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(size2, 0)
        self.assertLessEqual(size2, size)

    def test_get_object_traceback(self):
        tracemalloc.clear_traces()
        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertEqual(traceback, obj_traceback)

    def test_new_reference(self):
        # NOTE: the get_frames(nframe, -3) call below must stay exactly three
        # lines after the second "obj = []" statement.
        tracemalloc.clear_traces()
        # gc.collect() indirectly calls PyList_ClearFreeList()
        support.gc_collect()

        # Create a list and "destroy it": put it in the PyListObject free list
        obj = []
        obj = None

        # Create a list which should reuse the previously created empty list
        obj = []

        nframe = tracemalloc.get_traceback_limit()
        frames = get_frames(nframe, -3)
        obj_traceback = tracemalloc.Traceback(frames, min(len(frames), nframe))

        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)
        self.assertEqual(traceback, obj_traceback)

    def test_set_traceback_limit(self):
        obj_size = 10

        # a negative number of frames is rejected
        tracemalloc.stop()
        self.assertRaises(ValueError, tracemalloc.start, -1)

        tracemalloc.stop()
        tracemalloc.start(10)
        obj2, obj2_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj2)
        self.assertEqual(len(traceback), 10)
        self.assertEqual(traceback, obj2_traceback)

        tracemalloc.stop()
        tracemalloc.start(1)
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertEqual(len(traceback), 1)
        self.assertEqual(traceback, obj_traceback)

    def find_trace(self, traces, traceback):
        """Return the raw trace whose frames equal those of *traceback*.

        Fails the test when no such trace exists.
        """
        for trace in traces:
            if trace[2] == traceback._frames:
                return trace

        self.fail("trace not found")

    def test_get_traces(self):
        tracemalloc.clear_traces()
        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)

        traces = tracemalloc._get_traces()
        trace = self.find_trace(traces, obj_traceback)

        self.assertIsInstance(trace, tuple)
        domain, size, traceback, length = trace
        self.assertEqual(size, obj_size)
        self.assertEqual(traceback, obj_traceback._frames)

        tracemalloc.stop()
        self.assertEqual(tracemalloc._get_traces(), [])

    def test_get_traces_intern_traceback(self):
        # dummy wrappers to get more useful and identical frames in the traceback
        def allocate_bytes2(size):
            return allocate_bytes(size)
        def allocate_bytes3(size):
            return allocate_bytes2(size)
        def allocate_bytes4(size):
            return allocate_bytes3(size)

        # Ensure that two identical tracebacks are not duplicated
        tracemalloc.stop()
        tracemalloc.start(4)
        obj_size = 123
        obj1, obj1_traceback = allocate_bytes4(obj_size)
        obj2, obj2_traceback = allocate_bytes4(obj_size)

        traces = tracemalloc._get_traces()

        # put the expected frames back into the order used by the raw traces
        obj1_traceback._frames = tuple(reversed(obj1_traceback._frames))
        obj2_traceback._frames = tuple(reversed(obj2_traceback._frames))

        trace1 = self.find_trace(traces, obj1_traceback)
        trace2 = self.find_trace(traces, obj2_traceback)
        domain1, size1, traceback1, length1 = trace1
        domain2, size2, traceback2, length2 = trace2
        self.assertIs(traceback2, traceback1)

    def test_get_traced_memory(self):
        # Python allocates some internals objects, so the test must tolerate
        # a small difference between the expected size and the real usage
        max_error = 2048

        # allocate one object
        obj_size = 1024 * 1024
        tracemalloc.clear_traces()
        obj, obj_traceback = allocate_bytes(obj_size)
        size, peak_size = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(size, obj_size)
        self.assertGreaterEqual(peak_size, size)

        self.assertLessEqual(size - obj_size, max_error)
        self.assertLessEqual(peak_size - size, max_error)

        # destroy the object
        obj = None
        size2, peak_size2 = tracemalloc.get_traced_memory()
        self.assertLess(size2, size)
        self.assertGreaterEqual(size - size2, obj_size - max_error)
        self.assertGreaterEqual(peak_size2, peak_size)

        # clear_traces() must reset traced memory counters
        tracemalloc.clear_traces()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

        # allocate another object
        obj, obj_traceback = allocate_bytes(obj_size)
        size, peak_size = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(size, obj_size)

        # stop() also resets traced memory counters
        tracemalloc.stop()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

    def test_clear_traces(self):
        obj, obj_traceback = allocate_bytes(123)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)

        # the object is still alive, but its trace must be gone
        tracemalloc.clear_traces()
        traceback2 = tracemalloc.get_object_traceback(obj)
        self.assertIsNone(traceback2)

    def test_reset_peak(self):
        # Start from empty traces so that allocations made before this test
        # do not contribute to the measured peaks
        tracemalloc.clear_traces()

        # Example: allocate a large piece of memory, temporarily
        large_sum = sum(list(range(100000)))
        size1, peak1 = tracemalloc.get_traced_memory()

        # reset_peak() resets peak to traced memory: peak2 < peak1
        tracemalloc.reset_peak()
        size2, peak2 = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(peak2, size2)
        self.assertLess(peak2, peak1)

        # check that peak continue to be updated if new memory is allocated:
        # peak3 > peak2
        obj_size = 1024 * 1024
        obj, obj_traceback = allocate_bytes(obj_size)
        size3, peak3 = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(peak3, size3)
        self.assertGreater(peak3, peak2)
        self.assertGreaterEqual(peak3 - peak2, obj_size)

    def test_is_tracing(self):
        tracemalloc.stop()
        self.assertFalse(tracemalloc.is_tracing())

        tracemalloc.start()
        self.assertTrue(tracemalloc.is_tracing())

    def test_snapshot(self):
        obj, source = allocate_bytes(123)

        # take a snapshot
        snapshot = tracemalloc.take_snapshot()

        # This can vary
        self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10)

        # write on disk; the cleanup is registered first so the file is
        # removed even if dump() fails after creating it
        self.addCleanup(support.unlink, support.TESTFN)
        snapshot.dump(support.TESTFN)

        # load from disk
        snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
        self.assertEqual(snapshot2.traces, snapshot.traces)

        # tracemalloc must be tracing memory allocations to take a snapshot
        tracemalloc.stop()
        with self.assertRaises(RuntimeError) as cm:
            tracemalloc.take_snapshot()
        self.assertEqual(str(cm.exception),
                         "the tracemalloc module must be tracing memory "
                         "allocations to take a snapshot")

    def test_snapshot_save_attr(self):
        # take a snapshot with a new attribute
        snapshot = tracemalloc.take_snapshot()
        snapshot.test_attr = "new"
        # register the cleanup before the file can be created
        self.addCleanup(support.unlink, support.TESTFN)
        snapshot.dump(support.TESTFN)

        # load() should recreate the attribute
        snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
        self.assertEqual(snapshot2.test_attr, "new")

    def fork_child(self):
        """Run in the forked child; the return value is its exit code.

        0 means success, 2 that tracing is off, and 3 that a fresh
        allocation has no traceback.
        """
        if not tracemalloc.is_tracing():
            return 2

        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        if traceback is None:
            return 3

        # everything is fine
        return 0

    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
    def test_fork(self):
        # check that tracemalloc is still working after fork
        pid = os.fork()
        if not pid:
            # child: exit code 1 is reported if fork_child() raises
            exitcode = 1
            try:
                exitcode = self.fork_child()
            finally:
                os._exit(exitcode)
        else:
            support.wait_process(pid, exitcode=0)
|
|
|
|
|
|
class TestSnapshot(unittest.TestCase):
    """Tests of Snapshot filtering, statistics and formatting.

    Most tests work on the two hand-written snapshots returned by
    create_snapshots(), so they do not depend on real allocations.
    """

    maxDiff = 4000

    def test_create_snapshot(self):
        raw_traces = [(0, 5, (('a.py', 2),), 10)]

        # take_snapshot() is fed entirely from patched tracemalloc functions
        with contextlib.ExitStack() as stack:
            stack.enter_context(patch.object(tracemalloc, 'is_tracing',
                                             return_value=True))
            stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
                                             return_value=5))
            stack.enter_context(patch.object(tracemalloc, '_get_traces',
                                             return_value=raw_traces))

            snapshot = tracemalloc.take_snapshot()
            self.assertEqual(snapshot.traceback_limit, 5)
            self.assertEqual(len(snapshot.traces), 1)
            trace = snapshot.traces[0]
            self.assertEqual(trace.size, 5)
            self.assertEqual(trace.traceback.total_nframe, 10)
            self.assertEqual(len(trace.traceback), 1)
            self.assertEqual(trace.traceback[0].filename, 'a.py')
            self.assertEqual(trace.traceback[0].lineno, 2)

    def test_filter_traces(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.Filter(False, "b.py")
        filter2 = tracemalloc.Filter(True, "a.py", 2)
        filter3 = tracemalloc.Filter(True, "a.py", 5)

        original_traces = list(snapshot.traces._traces)

        # exclude b.py
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
            (3, 7, (('<unknown>', 0),), 1),
        ])

        # filter_traces() must not touch the original snapshot
        self.assertEqual(snapshot.traces._traces, original_traces)

        # only include two lines of a.py
        snapshot4 = snapshot3.filter_traces((filter2, filter3))
        self.assertEqual(snapshot4.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
        ])

        # No filter: just duplicate the snapshot
        snapshot5 = snapshot.filter_traces(())
        self.assertIsNot(snapshot5, snapshot)
        self.assertIsNot(snapshot5.traces, snapshot.traces)
        self.assertEqual(snapshot5.traces, snapshot.traces)

        # filters must be passed as a sequence, not as a single filter
        self.assertRaises(TypeError, snapshot.filter_traces, filter1)

    def test_filter_traces_domain(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.Filter(False, "a.py", domain=1)
        filter2 = tracemalloc.Filter(True, "a.py", domain=1)

        original_traces = list(snapshot.traces._traces)

        # exclude a.py of domain 1
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (2, 66, (('b.py', 1),), 1),
            (3, 7, (('<unknown>', 0),), 1),
        ])

        # include only a.py of domain 1 (the inclusive filter was
        # previously never exercised: filter1 was applied a second time)
        snapshot3 = snapshot.filter_traces((filter2,))
        self.assertEqual(snapshot3.traces._traces, [
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
        ])

        # filter_traces() must not touch the original snapshot
        self.assertEqual(snapshot.traces._traces, original_traces)

    def test_filter_traces_domain_filter(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.DomainFilter(False, domain=3)
        filter2 = tracemalloc.DomainFilter(True, domain=3)

        # exclude domain 3
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
            (2, 66, (('b.py', 1),), 1),
        ])

        # include domain 3
        snapshot3 = snapshot.filter_traces((filter2,))
        self.assertEqual(snapshot3.traces._traces, [
            (3, 7, (('<unknown>', 0),), 1),
        ])

    def test_snapshot_group_by_line(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_lineno('<unknown>', 0)
        tb_a_2 = traceback_lineno('a.py', 2)
        tb_a_5 = traceback_lineno('a.py', 5)
        tb_b_1 = traceback_lineno('b.py', 1)
        tb_c_578 = traceback_lineno('c.py', 578)

        # stats per file and line
        stats1 = snapshot.statistics('lineno')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb_b_1, 66, 1),
            tracemalloc.Statistic(tb_a_2, 30, 3),
            tracemalloc.Statistic(tb_0, 7, 1),
            tracemalloc.Statistic(tb_a_5, 2, 1),
        ])

        # stats per file and line (2)
        stats2 = snapshot2.statistics('lineno')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb_a_5, 5002, 2),
            tracemalloc.Statistic(tb_c_578, 400, 1),
            tracemalloc.Statistic(tb_a_2, 30, 3),
        ])

        # stats diff per file and line
        statistics = snapshot2.compare_to(snapshot, 'lineno')
        self.assertEqual(statistics, [
            tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
            tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
            tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
        ])

    def test_snapshot_group_by_file(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_filename('<unknown>')
        tb_a = traceback_filename('a.py')
        tb_b = traceback_filename('b.py')
        tb_c = traceback_filename('c.py')

        # stats per file
        stats1 = snapshot.statistics('filename')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb_b, 66, 1),
            tracemalloc.Statistic(tb_a, 32, 4),
            tracemalloc.Statistic(tb_0, 7, 1),
        ])

        # stats per file (2)
        stats2 = snapshot2.statistics('filename')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb_a, 5032, 5),
            tracemalloc.Statistic(tb_c, 400, 1),
        ])

        # stats diff per file
        diff = snapshot2.compare_to(snapshot, 'filename')
        self.assertEqual(diff, [
            tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
            tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
        ])

    def test_snapshot_group_by_traceback(self):
        snapshot, snapshot2 = create_snapshots()

        # stats per traceback
        tb1 = traceback(('a.py', 2), ('b.py', 4))
        tb2 = traceback(('a.py', 5), ('b.py', 4))
        tb3 = traceback(('b.py', 1))
        tb4 = traceback(('<unknown>', 0))
        stats1 = snapshot.statistics('traceback')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb3, 66, 1),
            tracemalloc.Statistic(tb1, 30, 3),
            tracemalloc.Statistic(tb4, 7, 1),
            tracemalloc.Statistic(tb2, 2, 1),
        ])

        # stats per traceback (2)
        tb5 = traceback(('c.py', 578))
        stats2 = snapshot2.statistics('traceback')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb2, 5002, 2),
            tracemalloc.Statistic(tb5, 400, 1),
            tracemalloc.Statistic(tb1, 30, 3),
        ])

        # stats diff per traceback
        diff = snapshot2.compare_to(snapshot, 'traceback')
        self.assertEqual(diff, [
            tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
            tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
            tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
        ])

        # cumulative statistics cannot be grouped by traceback
        self.assertRaises(ValueError,
                          snapshot.statistics, 'traceback', cumulative=True)

    def test_snapshot_group_by_cumulative(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_filename('<unknown>')
        tb_a = traceback_filename('a.py')
        tb_b = traceback_filename('b.py')
        tb_a_2 = traceback_lineno('a.py', 2)
        tb_a_5 = traceback_lineno('a.py', 5)
        tb_b_1 = traceback_lineno('b.py', 1)
        tb_b_4 = traceback_lineno('b.py', 4)

        # per file
        stats = snapshot.statistics('filename', True)
        self.assertEqual(stats, [
            tracemalloc.Statistic(tb_b, 98, 5),
            tracemalloc.Statistic(tb_a, 32, 4),
            tracemalloc.Statistic(tb_0, 7, 1),
        ])

        # per line
        stats = snapshot.statistics('lineno', True)
        self.assertEqual(stats, [
            tracemalloc.Statistic(tb_b_1, 66, 1),
            tracemalloc.Statistic(tb_b_4, 32, 4),
            tracemalloc.Statistic(tb_a_2, 30, 3),
            tracemalloc.Statistic(tb_0, 7, 1),
            tracemalloc.Statistic(tb_a_5, 2, 1),
        ])

    def test_trace_format(self):
        snapshot, snapshot2 = create_snapshots()
        trace = snapshot.traces[0]
        self.assertEqual(str(trace), 'b.py:4: 10 B')
        traceback = trace.traceback
        self.assertEqual(str(traceback), 'b.py:4')
        frame = traceback[0]
        self.assertEqual(str(frame), 'b.py:4')

    def test_statistic_format(self):
        snapshot, snapshot2 = create_snapshots()
        stats = snapshot.statistics('lineno')
        stat = stats[0]
        self.assertEqual(str(stat),
                         'b.py:1: size=66 B, count=1, average=66 B')

    def test_statistic_diff_format(self):
        snapshot, snapshot2 = create_snapshots()
        stats = snapshot2.compare_to(snapshot, 'lineno')
        stat = stats[0]
        self.assertEqual(str(stat),
                         'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')

    def test_slices(self):
        snapshot, snapshot2 = create_snapshots()
        self.assertEqual(snapshot.traces[:2],
                         (snapshot.traces[0], snapshot.traces[1]))

        traceback = snapshot.traces[0].traceback
        self.assertEqual(traceback[:2],
                         (traceback[0], traceback[1]))

    def test_format_traceback(self):
        snapshot, snapshot2 = create_snapshots()

        # fake source lines, so that the test does not need real files
        def getline(filename, lineno):
            return '  <%s, %s>' % (filename, lineno)

        with patch('tracemalloc.linecache.getline', side_effect=getline):
            tb = snapshot.traces[0].traceback
            # Traceback.format() indents "File" lines by two spaces and
            # source lines by four spaces.
            self.assertEqual(tb.format(),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>',
                              '  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=1),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=-1),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>'])

            self.assertEqual(tb.format(most_recent_first=True),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>',
                              '  File "b.py", line 4',
                              '    <b.py, 4>'])

            self.assertEqual(tb.format(limit=1, most_recent_first=True),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=-1, most_recent_first=True),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>'])
|
|
|
|
|
|
class TestFilters(unittest.TestCase):
    """Tests of tracemalloc.Filter attributes and matching rules."""

    maxDiff = 2048

    def _check_filter_attributes(self, flt, inclusive, pattern, lineno,
                                 all_frames):
        # Compare the four public attributes of a Filter in one go.
        self.assertEqual(flt.inclusive, inclusive)
        self.assertEqual(flt.filename_pattern, pattern)
        if lineno is None:
            self.assertIsNone(flt.lineno)
        else:
            self.assertEqual(flt.lineno, lineno)
        self.assertEqual(flt.all_frames, all_frames)

    def test_filter_attributes(self):
        # test default values
        defaults = tracemalloc.Filter(True, "abc")
        self._check_filter_attributes(defaults, True, "abc", None, False)

        # test custom values
        positional = tracemalloc.Filter(False, "test.py", 123, True)
        self._check_filter_attributes(positional, False, "test.py", 123, True)

        # parameters passed by keyword
        by_keyword = tracemalloc.Filter(inclusive=False,
                                        filename_pattern="test.py",
                                        lineno=123, all_frames=True)
        self._check_filter_attributes(by_keyword, False, "test.py", 123, True)

        # read-only attribute
        with self.assertRaises(AttributeError):
            setattr(by_keyword, "filename_pattern", "abc")

    def test_filter_match(self):
        linenos = (0, 5, 10)
        # (Filter arguments,
        #  expected match of "abc" at lines 0, 5, 10,
        #  expected match of "12356" at lines 0, 5, 10)
        cases = (
            # filter without line number
            ((True, "abc"), (True, True, True), (False, False, False)),
            ((False, "abc"), (False, False, False), (True, True, True)),
            # filter with line number > 0
            ((True, "abc", 5), (False, True, False), (False, False, False)),
            ((False, "abc", 5), (True, False, True), (True, True, True)),
            # filter with line number 0
            ((True, "abc", 0), (True, False, False), (False, False, False)),
            ((False, "abc", 0), (False, True, True), (True, True, True)),
        )
        for args, abc_expected, other_expected in cases:
            flt = tracemalloc.Filter(*args)
            for filename, expected in (("abc", abc_expected),
                                       ("12356", other_expected)):
                for lineno, wanted in zip(linenos, expected):
                    matched = bool(flt._match_frame(filename, lineno))
                    self.assertEqual(matched, wanted,
                                     (args, filename, lineno))

    def test_filter_match_filename(self):
        # (inclusive, filename, pattern, expected match at line 0)
        cases = (
            (True, "abc", "abc", True),
            (True, "12356", "abc", False),
            (True, "<unknown>", "abc", False),

            (False, "abc", "abc", False),
            (False, "12356", "abc", True),
            (False, "<unknown>", "abc", True),
        )
        for inclusive, filename, pattern, wanted in cases:
            flt = tracemalloc.Filter(inclusive, pattern)
            matched = bool(flt._match_frame(filename, 0))
            self.assertEqual(matched, wanted, (inclusive, filename, pattern))

    def test_filter_match_filename_joker(self):
        on_windows = (os.name == 'nt')

        # (filename, pattern, expected match) for an inclusive filter
        cases = [
            # empty string
            ('abc', '', False),
            ('', 'abc', False),
            ('', '', True),
            ('', '*', True),

            # no *
            ('abc', 'abc', True),
            ('abc', 'abcd', False),
            ('abc', 'def', False),

            # a*
            ('abc', 'a*', True),
            ('abc', 'abc*', True),
            ('abc', 'b*', False),
            ('abc', 'abcd*', False),

            # a*b
            ('abc', 'a*c', True),
            ('abcdcx', 'a*cx', True),
            ('abb', 'a*c', False),
            ('abcdce', 'a*cx', False),

            # a*b*c
            ('abcde', 'a*c*e', True),
            ('abcbdefeg', 'a*bd*eg', True),
            ('abcdd', 'a*c*e', False),
            ('abcbdefef', 'a*bd*eg', False),

            # replace .pyc suffix with .py
            ('a.pyc', 'a.py', True),
            ('a.py', 'a.pyc', True),

            # case insensitive on Windows, case sensitive elsewhere
            ('aBC', 'ABc', on_windows),
            ('aBcDe', 'Ab*dE', on_windows),

            ('a.pyc', 'a.PY', on_windows),
            ('a.py', 'a.PYC', on_windows),

            # Windows normalizes the alternate separator "/" to the standard
            # separator "\"; other platforms have no alternate separator
            (r'a/b', r'a\b', on_windows),
            (r'a\b', r'a/b', on_windows),
            (r'a/b\c', r'a\b/c', on_windows),
            (r'a/b/c', r'a\b\c', on_windows),

            # as of 3.5, .pyo is no longer munged to .py
            ('a.pyo', 'a.py', False),
        ]
        for filename, pattern, wanted in cases:
            flt = tracemalloc.Filter(True, pattern)
            matched = bool(flt._match_frame(filename, 0))
            self.assertEqual(matched, wanted, (filename, pattern))

    def test_filter_match_trace(self):
        t1 = (("a.py", 2), ("b.py", 3))
        t2 = (("b.py", 4), ("b.py", 5))
        t3 = (("c.py", 5), ('<unknown>', 0))
        unknown = (('<unknown>', 0),)
        tracebacks = (t1, t2, t3, unknown)

        # (inclusive, pattern, all_frames,
        #  expected match of t1, t2, t3 and unknown)
        cases = (
            (True, "b.py", True, (True, True, False, False)),
            (True, "b.py", False, (False, True, False, False)),
            (False, "b.py", True, (False, False, True, True)),
            (False, "b.py", False, (True, False, True, True)),
            (False, "<unknown>", False, (True, True, True, False)),
            (True, "<unknown>", True, (False, False, True, True)),
            (False, "<unknown>", True, (True, True, False, False)),
        )
        for inclusive, pattern, all_frames, expected in cases:
            flt = tracemalloc.Filter(inclusive, pattern,
                                     all_frames=all_frames)
            for frames, wanted in zip(tracebacks, expected):
                matched = bool(flt._match_traceback(frames))
                self.assertEqual(matched, wanted,
                                 (inclusive, pattern, all_frames, frames))
|
|
|
|
|
|
class TestCommandLine(unittest.TestCase):
|
|
def test_env_var_disabled_by_default(self):
|
|
# not tracing by default
|
|
code = 'import tracemalloc; print(tracemalloc.is_tracing())'
|
|
ok, stdout, stderr = assert_python_ok('-c', code)
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, b'False')
|
|
|
|
@unittest.skipIf(interpreter_requires_environment(),
|
|
'Cannot run -E tests when PYTHON env vars are required.')
|
|
def test_env_var_ignored_with_E(self):
|
|
"""PYTHON* environment variables must be ignored when -E is present."""
|
|
code = 'import tracemalloc; print(tracemalloc.is_tracing())'
|
|
ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1')
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, b'False')
|
|
|
|
def test_env_var_disabled(self):
|
|
# tracing at startup
|
|
code = 'import tracemalloc; print(tracemalloc.is_tracing())'
|
|
ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='0')
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, b'False')
|
|
|
|
def test_env_var_enabled_at_startup(self):
|
|
# tracing at startup
|
|
code = 'import tracemalloc; print(tracemalloc.is_tracing())'
|
|
ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, b'True')
|
|
|
|
def test_env_limit(self):
|
|
# start and set the number of frames
|
|
code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
|
|
ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, b'10')
|
|
|
|
def check_env_var_invalid(self, nframe):
|
|
with support.SuppressCrashReport():
|
|
ok, stdout, stderr = assert_python_failure(
|
|
'-c', 'pass',
|
|
PYTHONTRACEMALLOC=str(nframe))
|
|
|
|
if b'ValueError: the number of frames must be in range' in stderr:
|
|
return
|
|
if b'PYTHONTRACEMALLOC: invalid number of frames' in stderr:
|
|
return
|
|
self.fail(f"unexpected output: {stderr!a}")
|
|
|
|
|
|
def test_env_var_invalid(self):
|
|
for nframe in INVALID_NFRAME:
|
|
with self.subTest(nframe=nframe):
|
|
self.check_env_var_invalid(nframe)
|
|
|
|
def test_sys_xoptions(self):
|
|
for xoptions, nframe in (
|
|
('tracemalloc', 1),
|
|
('tracemalloc=1', 1),
|
|
('tracemalloc=15', 15),
|
|
):
|
|
with self.subTest(xoptions=xoptions, nframe=nframe):
|
|
code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
|
|
ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
|
|
stdout = stdout.rstrip()
|
|
self.assertEqual(stdout, str(nframe).encode('ascii'))
|
|
|
|
def check_sys_xoptions_invalid(self, nframe):
    # Run a child interpreter with -X tracemalloc=<nframe>: it must fail,
    # and stderr must contain one of the accepted messages.
    xoption = 'tracemalloc=%s' % nframe
    with support.SuppressCrashReport():
        _, _, err = assert_python_failure('-X', xoption, '-c', 'pass')

    accepted_messages = (
        b'ValueError: the number of frames must be in range',
        b'-X tracemalloc=NFRAME: invalid number of frames',
    )
    if not any(message in err for message in accepted_messages):
        self.fail(f"unexpected output: {err!a}")
|
|
|
|
def test_sys_xoptions_invalid(self):
    # Every value listed in INVALID_NFRAME must be rejected when passed
    # through -X tracemalloc=...; each value is reported as its own subtest
    for invalid_value in INVALID_NFRAME:
        with self.subTest(nframe=invalid_value):
            self.check_sys_xoptions_invalid(invalid_value)
|
|
|
|
@unittest.skipIf(_testcapi is None, 'need _testcapi')
def test_pymem_alloc0(self):
    # Issue #21639: PyMem_Malloc(0) must not crash while tracemalloc is
    # enabled. The check runs in a child interpreter started with
    # -X tracemalloc.
    script = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
    assert_python_ok('-X', 'tracemalloc', '-c', script)
|
|
|
|
|
|
@unittest.skipIf(_testcapi is None, 'need _testcapi')
class TestCAPI(unittest.TestCase):
    # Tests of the tracemalloc track/untrack functions exposed through
    # _testcapi: a memory block is registered by hand in a custom domain
    # and the resulting trace is inspected.

    maxDiff = 80 * 20

    def setUp(self):
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        # Domain and size used for the hand-made trace
        self.domain = 5
        self.size = 123
        self.obj = allocate_bytes(self.size)[0]

        # id(obj) is used as the address of the object's memory block; it
        # is the pointer handed to the track/untrack functions. The
        # object comes from allocate_bytes() and is kept alive on self so
        # the address stays valid during the test.
        self.ptr = id(self.obj)

    def tearDown(self):
        tracemalloc.stop()

    def get_traceback(self):
        # Return the traceback recorded for (domain, ptr) as a
        # tracemalloc.Traceback, or None when there is no trace.
        frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
        if frames is None:
            return None
        return tracemalloc.Traceback(frames)

    def track(self, release_gil=False, nframe=1):
        # Track self.ptr and return the frames expected in its traceback.
        #
        # get_frames() is called with lineno_delta=1, so the first
        # expected frame points at the line right below the call: the
        # tracemalloc_track() call must start on the very next line.
        frames = get_frames(nframe, 1)
        _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
                                    release_gil)
        return frames

    def untrack(self):
        _testcapi.tracemalloc_untrack(self.domain, self.ptr)

    def get_traced_memory(self):
        # Get the traced size in the domain: sum of the sizes of all
        # traces of a snapshot restricted to self.domain
        snapshot = tracemalloc.take_snapshot()
        domain_filter = tracemalloc.DomainFilter(True, self.domain)
        snapshot = snapshot.filter_traces([domain_filter])
        return sum(trace.size for trace in snapshot.traces)

    def check_track(self, release_gil):
        # Track the block and check both the recorded traceback and the
        # size accounted in the domain
        nframe = 5
        tracemalloc.start(nframe)

        frames = self.track(release_gil, nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

        self.assertEqual(self.get_traced_memory(), self.size)

    def test_track(self):
        self.check_track(False)

    def test_track_without_gil(self):
        # check that calling _PyTraceMalloc_Track() without holding the GIL
        # works too
        self.check_track(True)

    def test_track_already_tracked(self):
        nframe = 5
        tracemalloc.start(nframe)

        # track a first time
        self.track()

        # calling _PyTraceMalloc_Track() must remove the old trace and add
        # a new trace with the new traceback
        frames = self.track(nframe=nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

    def test_untrack(self):
        tracemalloc.start()

        self.track()
        self.assertIsNotNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), self.size)

        # untrack must remove the trace
        self.untrack()
        self.assertIsNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), 0)

        # calling _PyTraceMalloc_Untrack() multiple times must not crash
        self.untrack()
        self.untrack()

    def test_stop_track(self):
        # tracking after tracemalloc has been stopped must raise
        # RuntimeError and must not record any trace
        tracemalloc.start()
        tracemalloc.stop()

        with self.assertRaises(RuntimeError):
            self.track()
        self.assertIsNone(self.get_traceback())

    def test_stop_untrack(self):
        # untracking after tracemalloc has been stopped must raise
        # RuntimeError
        tracemalloc.start()
        self.track()

        tracemalloc.stop()
        with self.assertRaises(RuntimeError):
            self.untrack()
|
|
|
|
|
|
def test_main():
    # Run all the test case classes listed below through
    # support.run_unittest()
    test_classes = (
        TestTracemallocEnabled,
        TestSnapshot,
        TestFilters,
        TestCommandLine,
        TestCAPI,
    )
    support.run_unittest(*test_classes)
|
|
|
|
# Run the whole test suite when this file is executed as a script
if __name__ == "__main__":
    test_main()
|