mirror of
https://github.com/python/cpython.git
synced 2024-12-11 18:53:56 +08:00
106 lines
3.2 KiB
Python
106 lines
3.2 KiB
Python
import atexit
|
|
import os
|
|
import textwrap
|
|
import unittest
|
|
from test import support
|
|
from test.support import script_helper
|
|
|
|
|
|
class GeneralTest(unittest.TestCase):
|
|
def test_general(self):
|
|
# Run _test_atexit.py in a subprocess since it calls atexit._clear()
|
|
script = support.findfile("_test_atexit.py")
|
|
script_helper.run_test_script(script)
|
|
|
|
class FunctionalTest(unittest.TestCase):
|
|
def test_shutdown(self):
|
|
# Actually test the shutdown mechanism in a subprocess
|
|
code = textwrap.dedent("""
|
|
import atexit
|
|
|
|
def f(msg):
|
|
print(msg)
|
|
|
|
atexit.register(f, "one")
|
|
atexit.register(f, "two")
|
|
""")
|
|
res = script_helper.assert_python_ok("-c", code)
|
|
self.assertEqual(res.out.decode().splitlines(), ["two", "one"])
|
|
self.assertFalse(res.err)
|
|
|
|
def test_atexit_instances(self):
|
|
# bpo-42639: It is safe to have more than one atexit instance.
|
|
code = textwrap.dedent("""
|
|
import sys
|
|
import atexit as atexit1
|
|
del sys.modules['atexit']
|
|
import atexit as atexit2
|
|
del sys.modules['atexit']
|
|
|
|
assert atexit2 is not atexit1
|
|
|
|
atexit1.register(print, "atexit1")
|
|
atexit2.register(print, "atexit2")
|
|
""")
|
|
res = script_helper.assert_python_ok("-c", code)
|
|
self.assertEqual(res.out.decode().splitlines(), ["atexit2", "atexit1"])
|
|
self.assertFalse(res.err)
|
|
|
|
|
|
@support.cpython_only
|
|
class SubinterpreterTest(unittest.TestCase):
|
|
|
|
def test_callbacks_leak(self):
|
|
# This test shows a leak in refleak mode if atexit doesn't
|
|
# take care to free callbacks in its per-subinterpreter module
|
|
# state.
|
|
n = atexit._ncallbacks()
|
|
code = textwrap.dedent(r"""
|
|
import atexit
|
|
def f():
|
|
pass
|
|
atexit.register(f)
|
|
del atexit
|
|
""")
|
|
ret = support.run_in_subinterp(code)
|
|
self.assertEqual(ret, 0)
|
|
self.assertEqual(atexit._ncallbacks(), n)
|
|
|
|
def test_callbacks_leak_refcycle(self):
|
|
# Similar to the above, but with a refcycle through the atexit
|
|
# module.
|
|
n = atexit._ncallbacks()
|
|
code = textwrap.dedent(r"""
|
|
import atexit
|
|
def f():
|
|
pass
|
|
atexit.register(f)
|
|
atexit.__atexit = atexit
|
|
""")
|
|
ret = support.run_in_subinterp(code)
|
|
self.assertEqual(ret, 0)
|
|
self.assertEqual(atexit._ncallbacks(), n)
|
|
|
|
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
|
|
def test_callback_on_subinterpreter_teardown(self):
|
|
# This tests if a callback is called on
|
|
# subinterpreter teardown.
|
|
expected = b"The test has passed!"
|
|
r, w = os.pipe()
|
|
|
|
code = textwrap.dedent(r"""
|
|
import os
|
|
import atexit
|
|
def callback():
|
|
os.write({:d}, b"The test has passed!")
|
|
atexit.register(callback)
|
|
""".format(w))
|
|
ret = support.run_in_subinterp(code)
|
|
os.close(w)
|
|
self.assertEqual(os.read(r, len(expected)), expected)
|
|
os.close(r)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|