mirror of
https://github.com/python/cpython.git
synced 2024-11-26 19:34:19 +08:00
27486c3365
Add free-threaded specialization for `UNPACK_SEQUENCE` opcode. `UNPACK_SEQUENCE_TUPLE`/`UNPACK_SEQUENCE_TWO_TUPLE` are already thread-safe since tuples are immutable. `UNPACK_SEQUENCE_LIST` is not thread-safe because of the nature of lists (nothing prevents another thread from adding items to or removing them from the list while the instruction is executing). To achieve thread safety we add a critical section to the implementation of `UNPACK_SEQUENCE_LIST`, specifically around the parts where we check the size of the list and push items onto the stack. --------- Co-authored-by: Matt Page <mpage@meta.com> Co-authored-by: mpage <mpage@cs.stanford.edu>
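The sketch below is a rough Python analogue of the pattern that commit message describes, not the actual C change: the length check and the item reads are wrapped in one lock so a concurrent clear()/append() cannot slip in between them. The `unpack_list` helper and its module-level lock are hypothetical names used only for illustration; in CPython the fix is a per-object critical section inside the C implementation of `UNPACK_SEQUENCE_LIST`.

import threading

_lock = threading.Lock()   # stands in for the interpreter's per-list critical section

def unpack_list(seq, count):
    # Hypothetical analogue: check the size and snapshot the items atomically.
    with _lock:                              # "critical section"
        if len(seq) != count:                # size check under the lock
            raise ValueError(f"expected {count} values, got {len(seq)}")
        return tuple(seq)                    # read the items while still holding the lock

# Usage: the atomic check-and-read mirrors what `[_] = item` relies on in
# test_unpack_sequence_list further down in this file.
items = [None]
(_,) = unpack_list(items, 1)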
1376 lines
35 KiB
Python
import copy
import pickle
import dis
import threading
import types
import unittest
from test.support import (threading_helper, check_impl_detail,
                          requires_specialization, requires_specialization_ft,
                          cpython_only)
from test.support.import_helper import import_module

# Skip this module on other interpreters, it is cpython specific:
if check_impl_detail(cpython=False):
    raise unittest.SkipTest('implementation detail specific to cpython')

_testinternalcapi = import_module("_testinternalcapi")


def disabling_optimizer(func):
    def wrapper(*args, **kwargs):
        if not hasattr(_testinternalcapi, "get_optimizer"):
            return func(*args, **kwargs)
        old_opt = _testinternalcapi.get_optimizer()
        _testinternalcapi.set_optimizer(None)
        try:
            return func(*args, **kwargs)
        finally:
            _testinternalcapi.set_optimizer(old_opt)

    return wrapper


class TestBase(unittest.TestCase):
    def assert_specialized(self, f, opname):
        instructions = dis.get_instructions(f, adaptive=True)
        opnames = {instruction.opname for instruction in instructions}
        self.assertIn(opname, opnames)

    def assert_no_opcode(self, f, opname):
        instructions = dis.get_instructions(f, adaptive=True)
        opnames = {instruction.opname for instruction in instructions}
        self.assertNotIn(opname, opnames)


class TestLoadSuperAttrCache(unittest.TestCase):
    def test_descriptor_not_double_executed_on_spec_fail(self):
        calls = []
        class Descriptor:
            def __get__(self, instance, owner):
                calls.append((instance, owner))
                return lambda: 1

        class C:
            d = Descriptor()

        class D(C):
            def f(self):
                return super().d()

        d = D()

        self.assertEqual(d.f(), 1)  # warmup
        calls.clear()
        self.assertEqual(d.f(), 1)  # try to specialize
        self.assertEqual(calls, [(d, D)])


class TestLoadAttrCache(unittest.TestCase):
    def test_descriptor_added_after_optimization(self):
        class Descriptor:
            pass

        class C:
            def __init__(self):
                self.x = 1
            x = Descriptor()

        def f(o):
            return o.x

        o = C()
        for i in range(1025):
            assert f(o) == 1

        Descriptor.__get__ = lambda self, instance, value: 2
        Descriptor.__set__ = lambda *args: None

        self.assertEqual(f(o), 2)

    def test_metaclass_descriptor_added_after_optimization(self):
        class Descriptor:
            pass

        class Metaclass(type):
            attribute = Descriptor()

        class Class(metaclass=Metaclass):
            attribute = True

        def __get__(self, instance, owner):
            return False

        def __set__(self, instance, value):
            return None

        def f():
            return Class.attribute

        for _ in range(1025):
            self.assertTrue(f())

        Descriptor.__get__ = __get__
        Descriptor.__set__ = __set__

        for _ in range(1025):
            self.assertFalse(f())

    def test_metaclass_descriptor_shadows_class_attribute(self):
        class Metaclass(type):
            @property
            def attribute(self):
                return True

        class Class(metaclass=Metaclass):
            attribute = False

        def f():
            return Class.attribute

        for _ in range(1025):
            self.assertTrue(f())

    def test_metaclass_set_descriptor_after_optimization(self):
        class Metaclass(type):
            pass

        class Class(metaclass=Metaclass):
            attribute = True

        @property
        def attribute(self):
            return False

        def f():
            return Class.attribute

        for _ in range(1025):
            self.assertTrue(f())

        Metaclass.attribute = attribute

        for _ in range(1025):
            self.assertFalse(f())

    def test_metaclass_del_descriptor_after_optimization(self):
        class Metaclass(type):
            @property
            def attribute(self):
                return True

        class Class(metaclass=Metaclass):
            attribute = False

        def f():
            return Class.attribute

        for _ in range(1025):
            self.assertTrue(f())

        del Metaclass.attribute

        for _ in range(1025):
            self.assertFalse(f())

    def test_type_descriptor_shadows_attribute_method(self):
        class Class:
            mro = None

        def f():
            return Class.mro

        for _ in range(1025):
            self.assertIsNone(f())

    def test_type_descriptor_shadows_attribute_member(self):
        class Class:
            __base__ = None

        def f():
            return Class.__base__

        for _ in range(1025):
            self.assertIs(f(), object)

    def test_type_descriptor_shadows_attribute_getset(self):
        class Class:
            __name__ = "Spam"

        def f():
            return Class.__name__

        for _ in range(1025):
            self.assertEqual(f(), "Class")

    def test_metaclass_getattribute(self):
        class Metaclass(type):
            def __getattribute__(self, name):
                return True

        class Class(metaclass=Metaclass):
            attribute = False

        def f():
            return Class.attribute

        for _ in range(1025):
            self.assertTrue(f())

    def test_metaclass_swap(self):
        class OldMetaclass(type):
            @property
            def attribute(self):
                return True

        class NewMetaclass(type):
            @property
            def attribute(self):
                return False

        class Class(metaclass=OldMetaclass):
            pass

        def f():
            return Class.attribute

        for _ in range(1025):
            self.assertTrue(f())

        Class.__class__ = NewMetaclass

        for _ in range(1025):
            self.assertFalse(f())

    def test_load_shadowing_slot_should_raise_type_error(self):
        class Class:
            __slots__ = ("slot",)

        class Sneaky:
            __slots__ = ("shadowed",)
            shadowing = Class.slot

        def f(o):
            o.shadowing

        o = Sneaky()
        o.shadowed = 42

        for _ in range(1025):
            with self.assertRaises(TypeError):
                f(o)

    def test_store_shadowing_slot_should_raise_type_error(self):
        class Class:
            __slots__ = ("slot",)

        class Sneaky:
            __slots__ = ("shadowed",)
            shadowing = Class.slot

        def f(o):
            o.shadowing = 42

        o = Sneaky()

        for _ in range(1025):
            with self.assertRaises(TypeError):
                f(o)

    def test_load_borrowed_slot_should_not_crash(self):
        class Class:
            __slots__ = ("slot",)

        class Sneaky:
            borrowed = Class.slot

        def f(o):
            o.borrowed

        o = Sneaky()

        for _ in range(1025):
            with self.assertRaises(TypeError):
                f(o)

    def test_store_borrowed_slot_should_not_crash(self):
        class Class:
            __slots__ = ("slot",)

        class Sneaky:
            borrowed = Class.slot

        def f(o):
            o.borrowed = 42

        o = Sneaky()

        for _ in range(1025):
            with self.assertRaises(TypeError):
                f(o)


class TestLoadMethodCache(unittest.TestCase):
    def test_descriptor_added_after_optimization(self):
        class Descriptor:
            pass

        class Class:
            attribute = Descriptor()

        def __get__(self, instance, owner):
            return lambda: False

        def __set__(self, instance, value):
            return None

        def attribute():
            return True

        instance = Class()
        instance.attribute = attribute

        def f():
            return instance.attribute()

        for _ in range(1025):
            self.assertTrue(f())

        Descriptor.__get__ = __get__
        Descriptor.__set__ = __set__

        for _ in range(1025):
            self.assertFalse(f())

    def test_metaclass_descriptor_added_after_optimization(self):
        class Descriptor:
            pass

        class Metaclass(type):
            attribute = Descriptor()

        class Class(metaclass=Metaclass):
            def attribute():
                return True

        def __get__(self, instance, owner):
            return lambda: False

        def __set__(self, instance, value):
            return None

        def f():
            return Class.attribute()

        for _ in range(1025):
            self.assertTrue(f())

        Descriptor.__get__ = __get__
        Descriptor.__set__ = __set__

        for _ in range(1025):
            self.assertFalse(f())

    def test_metaclass_descriptor_shadows_class_attribute(self):
        class Metaclass(type):
            @property
            def attribute(self):
                return lambda: True

        class Class(metaclass=Metaclass):
            def attribute():
                return False

        def f():
            return Class.attribute()

        for _ in range(1025):
            self.assertTrue(f())

    def test_metaclass_set_descriptor_after_optimization(self):
        class Metaclass(type):
            pass

        class Class(metaclass=Metaclass):
            def attribute():
                return True

        @property
        def attribute(self):
            return lambda: False

        def f():
            return Class.attribute()

        for _ in range(1025):
            self.assertTrue(f())

        Metaclass.attribute = attribute

        for _ in range(1025):
            self.assertFalse(f())

    def test_metaclass_del_descriptor_after_optimization(self):
        class Metaclass(type):
            @property
            def attribute(self):
                return lambda: True

        class Class(metaclass=Metaclass):
            def attribute():
                return False

        def f():
            return Class.attribute()

        for _ in range(1025):
            self.assertTrue(f())

        del Metaclass.attribute

        for _ in range(1025):
            self.assertFalse(f())

    def test_type_descriptor_shadows_attribute_method(self):
        class Class:
            def mro():
                return ["Spam", "eggs"]

        def f():
            return Class.mro()

        for _ in range(1025):
            self.assertEqual(f(), ["Spam", "eggs"])

    def test_type_descriptor_shadows_attribute_member(self):
        class Class:
            def __base__():
                return "Spam"

        def f():
            return Class.__base__()

        for _ in range(1025):
            self.assertNotEqual(f(), "Spam")

    def test_metaclass_getattribute(self):
        class Metaclass(type):
            def __getattribute__(self, name):
                return lambda: True

        class Class(metaclass=Metaclass):
            def attribute():
                return False

        def f():
            return Class.attribute()

        for _ in range(1025):
            self.assertTrue(f())

    def test_metaclass_swap(self):
        class OldMetaclass(type):
            @property
            def attribute(self):
                return lambda: True

        class NewMetaclass(type):
            @property
            def attribute(self):
                return lambda: False

        class Class(metaclass=OldMetaclass):
            pass

        def f():
            return Class.attribute()

        for _ in range(1025):
            self.assertTrue(f())

        Class.__class__ = NewMetaclass

        for _ in range(1025):
            self.assertFalse(f())


class TestCallCache(TestBase):
    def test_too_many_defaults_0(self):
        def f():
            pass

        f.__defaults__ = (None,)
        for _ in range(1025):
            f()

    def test_too_many_defaults_1(self):
        def f(x):
            pass

        f.__defaults__ = (None, None)
        for _ in range(1025):
            f(None)
            f()

    def test_too_many_defaults_2(self):
        def f(x, y):
            pass

        f.__defaults__ = (None, None, None)
        for _ in range(1025):
            f(None, None)
            f(None)
            f()

    @disabling_optimizer
    @requires_specialization
    def test_assign_init_code(self):
        class MyClass:
            def __init__(self):
                pass

        def instantiate():
            return MyClass()

        # Trigger specialization
        for _ in range(1025):
            instantiate()
        self.assert_specialized(instantiate, "CALL_ALLOC_AND_ENTER_INIT")

        def count_args(self, *args):
            self.num_args = len(args)

        # Set MyClass.__init__.__code__ to a code object that uses different
        # args
        MyClass.__init__.__code__ = count_args.__code__
        instantiate()


@threading_helper.requires_working_threading()
class TestRacesDoNotCrash(TestBase):
    # Careful with these. Bigger numbers have a higher chance of catching bugs,
    # but you can also burn through a *ton* of type/dict/function versions:
    ITEMS = 1000
    LOOPS = 4
    WARMUPS = 2
    WRITERS = 2

    @disabling_optimizer
    def assert_races_do_not_crash(
        self, opname, get_items, read, write, *, check_items=False
    ):
        # This might need a few dozen loops in some cases:
        for _ in range(self.LOOPS):
            items = get_items()
            # Reset:
            if check_items:
                for item in items:
                    item.__code__ = item.__code__.replace()
            else:
                read.__code__ = read.__code__.replace()
            # Specialize:
            for _ in range(self.WARMUPS):
                read(items)
            if check_items:
                for item in items:
                    self.assert_specialized(item, opname)
            else:
                self.assert_specialized(read, opname)
            # Create writers:
            writers = []
            for _ in range(self.WRITERS):
                writer = threading.Thread(target=write, args=[items])
                writers.append(writer)
            # Run:
            for writer in writers:
                writer.start()
            read(items)  # BOOM!
            for writer in writers:
                writer.join()

    @requires_specialization
    def test_binary_subscr_getitem(self):
        def get_items():
            class C:
                __getitem__ = lambda self, item: None

            items = []
            for _ in range(self.ITEMS):
                item = C()
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    item[None]
                except TypeError:
                    pass

        def write(items):
            for item in items:
                try:
                    del item.__getitem__
                except AttributeError:
                    pass
                type(item).__getitem__ = lambda self, item: None

        opname = "BINARY_SUBSCR_GETITEM"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_binary_subscr_list_int(self):
        def get_items():
            items = []
            for _ in range(self.ITEMS):
                item = [None]
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    item[0]
                except IndexError:
                    pass

        def write(items):
            for item in items:
                item.clear()
                item.append(None)

        opname = "BINARY_SUBSCR_LIST_INT"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_for_iter_gen(self):
        def get_items():
            def g():
                yield
                yield

            items = []
            for _ in range(self.ITEMS):
                item = g()
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    for _ in item:
                        break
                except ValueError:
                    pass

        def write(items):
            for item in items:
                try:
                    for _ in item:
                        break
                except ValueError:
                    pass

        opname = "FOR_ITER_GEN"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_for_iter_list(self):
        def get_items():
            items = []
            for _ in range(self.ITEMS):
                item = [None]
                items.append(item)
            return items

        def read(items):
            for item in items:
                for item in item:
                    break

        def write(items):
            for item in items:
                item.clear()
                item.append(None)

        opname = "FOR_ITER_LIST"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_load_attr_class(self):
        def get_items():
            class C:
                a = object()

            items = []
            for _ in range(self.ITEMS):
                item = C
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    item.a
                except AttributeError:
                    pass

        def write(items):
            for item in items:
                try:
                    del item.a
                except AttributeError:
                    pass
                item.a = object()

        opname = "LOAD_ATTR_CLASS"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_load_attr_getattribute_overridden(self):
        def get_items():
            class C:
                __getattribute__ = lambda self, name: None

            items = []
            for _ in range(self.ITEMS):
                item = C()
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    item.a
                except AttributeError:
                    pass

        def write(items):
            for item in items:
                try:
                    del item.__getattribute__
                except AttributeError:
                    pass
                type(item).__getattribute__ = lambda self, name: None

        opname = "LOAD_ATTR_GETATTRIBUTE_OVERRIDDEN"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_load_attr_instance_value(self):
        def get_items():
            class C:
                pass

            items = []
            for _ in range(self.ITEMS):
                item = C()
                item.a = None
                items.append(item)
            return items

        def read(items):
            for item in items:
                item.a

        def write(items):
            for item in items:
                item.__dict__[None] = None

        opname = "LOAD_ATTR_INSTANCE_VALUE"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_load_attr_method_lazy_dict(self):
        def get_items():
            class C(Exception):
                m = lambda self: None

            items = []
            for _ in range(self.ITEMS):
                item = C()
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    item.m()
                except AttributeError:
                    pass

        def write(items):
            for item in items:
                try:
                    del item.m
                except AttributeError:
                    pass
                type(item).m = lambda self: None

        opname = "LOAD_ATTR_METHOD_LAZY_DICT"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_load_attr_method_no_dict(self):
        def get_items():
            class C:
                __slots__ = ()
                m = lambda self: None

            items = []
            for _ in range(self.ITEMS):
                item = C()
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    item.m()
                except AttributeError:
                    pass

        def write(items):
            for item in items:
                try:
                    del item.m
                except AttributeError:
                    pass
                type(item).m = lambda self: None

        opname = "LOAD_ATTR_METHOD_NO_DICT"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_load_attr_method_with_values(self):
        def get_items():
            class C:
                m = lambda self: None

            items = []
            for _ in range(self.ITEMS):
                item = C()
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    item.m()
                except AttributeError:
                    pass

        def write(items):
            for item in items:
                try:
                    del item.m
                except AttributeError:
                    pass
                type(item).m = lambda self: None

        opname = "LOAD_ATTR_METHOD_WITH_VALUES"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_load_attr_module(self):
        def get_items():
            items = []
            for _ in range(self.ITEMS):
                item = types.ModuleType("<item>")
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    item.__name__
                except AttributeError:
                    pass

        def write(items):
            for item in items:
                d = item.__dict__.copy()
                item.__dict__.clear()
                item.__dict__.update(d)

        opname = "LOAD_ATTR_MODULE"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_load_attr_property(self):
        def get_items():
            class C:
                a = property(lambda self: None)

            items = []
            for _ in range(self.ITEMS):
                item = C()
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    item.a
                except AttributeError:
                    pass

        def write(items):
            for item in items:
                try:
                    del type(item).a
                except AttributeError:
                    pass
                type(item).a = property(lambda self: None)

        opname = "LOAD_ATTR_PROPERTY"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_load_attr_with_hint(self):
        def get_items():
            class C:
                pass

            items = []
            for _ in range(self.ITEMS):
                item = C()
                item.a = None
                # Resize into a combined unicode dict:
                for i in range(29):
                    setattr(item, f"_{i}", None)
                items.append(item)
            return items

        def read(items):
            for item in items:
                item.a

        def write(items):
            for item in items:
                item.__dict__[None] = None

        opname = "LOAD_ATTR_WITH_HINT"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization_ft
    def test_load_global_module(self):
        def get_items():
            items = []
            for _ in range(self.ITEMS):
                item = eval("lambda: x", {"x": None})
                items.append(item)
            return items

        def read(items):
            for item in items:
                item()

        def write(items):
            for item in items:
                item.__globals__[None] = None

        opname = "LOAD_GLOBAL_MODULE"
        self.assert_races_do_not_crash(
            opname, get_items, read, write, check_items=True
        )

    @requires_specialization
    def test_store_attr_instance_value(self):
        def get_items():
            class C:
                pass

            items = []
            for _ in range(self.ITEMS):
                item = C()
                items.append(item)
            return items

        def read(items):
            for item in items:
                item.a = None

        def write(items):
            for item in items:
                item.__dict__[None] = None

        opname = "STORE_ATTR_INSTANCE_VALUE"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_store_attr_with_hint(self):
        def get_items():
            class C:
                pass

            items = []
            for _ in range(self.ITEMS):
                item = C()
                # Resize into a combined unicode dict:
                for i in range(29):
                    setattr(item, f"_{i}", None)
                items.append(item)
            return items

        def read(items):
            for item in items:
                item.a = None

        def write(items):
            for item in items:
                item.__dict__[None] = None

        opname = "STORE_ATTR_WITH_HINT"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_store_subscr_list_int(self):
        def get_items():
            items = []
            for _ in range(self.ITEMS):
                item = [None]
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    item[0] = None
                except IndexError:
                    pass

        def write(items):
            for item in items:
                item.clear()
                item.append(None)

        opname = "STORE_SUBSCR_LIST_INT"
        self.assert_races_do_not_crash(opname, get_items, read, write)

    @requires_specialization
    def test_unpack_sequence_list(self):
        def get_items():
            items = []
            for _ in range(self.ITEMS):
                item = [None]
                items.append(item)
            return items

        def read(items):
            for item in items:
                try:
                    [_] = item
                except ValueError:
                    pass

        def write(items):
            for item in items:
                item.clear()
                item.append(None)

        opname = "UNPACK_SEQUENCE_LIST"
        self.assert_races_do_not_crash(opname, get_items, read, write)

class C:
    pass

@requires_specialization
class TestInstanceDict(unittest.TestCase):

    def setUp(self):
        c = C()
        c.a, c.b, c.c = 0, 0, 0

    def test_values_on_instance(self):
        c = C()
        c.a = 1
        C().b = 2
        c.c = 3
        self.assertEqual(
            _testinternalcapi.get_object_dict_values(c),
            (1, '<NULL>', 3)
        )

    def test_dict_materialization(self):
        c = C()
        c.a = 1
        c.b = 2
        c.__dict__
        self.assertEqual(c.__dict__, {"a": 1, "b": 2})

    def test_dict_dematerialization(self):
        c = C()
        c.a = 1
        c.b = 2
        c.__dict__
        for _ in range(100):
            c.a
        self.assertEqual(
            _testinternalcapi.get_object_dict_values(c),
            (1, 2, '<NULL>')
        )

    def test_dict_dematerialization_multiple_refs(self):
        c = C()
        c.a = 1
        c.b = 2
        d = c.__dict__
        for _ in range(100):
            c.a
        self.assertIs(c.__dict__, d)

    def test_dict_dematerialization_copy(self):
        c = C()
        c.a = 1
        c.b = 2
        c2 = copy.copy(c)
        for _ in range(100):
            c.a
            c2.a
        self.assertEqual(
            _testinternalcapi.get_object_dict_values(c),
            (1, 2, '<NULL>')
        )
        self.assertEqual(
            _testinternalcapi.get_object_dict_values(c2),
            (1, 2, '<NULL>')
        )
        c3 = copy.deepcopy(c)
        for _ in range(100):
            c.a
            c3.a
        self.assertEqual(
            _testinternalcapi.get_object_dict_values(c),
            (1, 2, '<NULL>')
        )
        # NOTE -- c3.__dict__ does not de-materialize

    def test_dict_dematerialization_pickle(self):
        c = C()
        c.a = 1
        c.b = 2
        c2 = pickle.loads(pickle.dumps(c))
        for _ in range(100):
            c.a
            c2.a
        self.assertEqual(
            _testinternalcapi.get_object_dict_values(c),
            (1, 2, '<NULL>')
        )
        self.assertEqual(
            _testinternalcapi.get_object_dict_values(c2),
            (1, 2, '<NULL>')
        )

    def test_dict_dematerialization_subclass(self):
        class D(dict): pass
        c = C()
        c.a = 1
        c.b = 2
        c.__dict__ = D(c.__dict__)
        for _ in range(100):
            c.a
        self.assertIs(
            _testinternalcapi.get_object_dict_values(c),
            None
        )
        self.assertEqual(
            c.__dict__,
            {'a': 1, 'b': 2}
        )

    def test_125868(self):

        def make_special_dict():
            """Create a dictionary for an object with this table:
            index | key | value
            ----- | --- | -----
            0     | 'b' | 'value'
            1     | 'b' | NULL
            """
            class A:
                pass
            a = A()
            a.a = 1
            a.b = 2
            d = a.__dict__.copy()
            del d['a']
            del d['b']
            d['b'] = "value"
            return d

        class NoInlineAorB:
            pass
        for i in range(ord('c'), ord('z')):
            setattr(NoInlineAorB(), chr(i), i)

        c = NoInlineAorB()
        c.a = 0
        c.b = 1
        self.assertFalse(_testinternalcapi.has_inline_values(c))

        def f(o, n):
            for i in range(n):
                o.b = i
        # Prime f to store to dict slot 1
        f(c, 100)

        test_obj = NoInlineAorB()
        test_obj.__dict__ = make_special_dict()
        self.assertEqual(test_obj.b, "value")

        # This should set test_obj.b = 0
        f(test_obj, 1)
        self.assertEqual(test_obj.b, 0)


class TestSpecializer(TestBase):

    @cpython_only
    @requires_specialization_ft
    def test_binary_op(self):
        def f():
            for _ in range(100):
                a, b = 1, 2
                c = a + b
                self.assertEqual(c, 3)

        f()
        self.assert_specialized(f, "BINARY_OP_ADD_INT")
        self.assert_no_opcode(f, "BINARY_OP")

        def g():
            for _ in range(100):
                a, b = "foo", "bar"
                c = a + b
                self.assertEqual(c, "foobar")

        g()
        self.assert_specialized(g, "BINARY_OP_ADD_UNICODE")
        self.assert_no_opcode(g, "BINARY_OP")

    @cpython_only
    @requires_specialization_ft
    def test_contain_op(self):
        def f():
            for _ in range(100):
                a, b = 1, {1: 2, 2: 5}
                self.assertTrue(a in b)
                self.assertFalse(3 in b)

        f()
        self.assert_specialized(f, "CONTAINS_OP_DICT")
        self.assert_no_opcode(f, "CONTAINS_OP")

        def g():
            for _ in range(100):
                a, b = 1, {1, 2}
                self.assertTrue(a in b)
                self.assertFalse(3 in b)

        g()
        self.assert_specialized(g, "CONTAINS_OP_SET")
        self.assert_no_opcode(g, "CONTAINS_OP")

    @cpython_only
    @requires_specialization_ft
    def test_to_bool(self):
        def to_bool_bool():
            true_cnt, false_cnt = 0, 0
            elems = [e % 2 == 0 for e in range(100)]
            for e in elems:
                if e:
                    true_cnt += 1
                else:
                    false_cnt += 1
            self.assertEqual(true_cnt, 50)
            self.assertEqual(false_cnt, 50)

        to_bool_bool()
        self.assert_specialized(to_bool_bool, "TO_BOOL_BOOL")
        self.assert_no_opcode(to_bool_bool, "TO_BOOL")

        def to_bool_int():
            count = 0
            for i in range(100):
                if i:
                    count += 1
                else:
                    count -= 1
            self.assertEqual(count, 98)

        to_bool_int()
        self.assert_specialized(to_bool_int, "TO_BOOL_INT")
        self.assert_no_opcode(to_bool_int, "TO_BOOL")

        def to_bool_list():
            count = 0
            elems = [1, 2, 3]
            while elems:
                count += elems.pop()
            self.assertEqual(elems, [])
            self.assertEqual(count, 6)

        to_bool_list()
        self.assert_specialized(to_bool_list, "TO_BOOL_LIST")
        self.assert_no_opcode(to_bool_list, "TO_BOOL")

        def to_bool_none():
            count = 0
            elems = [None, None, None, None]
            for e in elems:
                if not e:
                    count += 1
            self.assertEqual(count, len(elems))

        to_bool_none()
        self.assert_specialized(to_bool_none, "TO_BOOL_NONE")
        self.assert_no_opcode(to_bool_none, "TO_BOOL")

        def to_bool_str():
            count = 0
            elems = ["", "foo", ""]
            for e in elems:
                if e:
                    count += 1
            self.assertEqual(count, 1)

        to_bool_str()
        self.assert_specialized(to_bool_str, "TO_BOOL_STR")
        self.assert_no_opcode(to_bool_str, "TO_BOOL")

    @cpython_only
    @requires_specialization_ft
    def test_unpack_sequence(self):
        def f():
            for _ in range(100):
                a, b = 1, 2
                self.assertEqual(a, 1)
                self.assertEqual(b, 2)

        f()
        self.assert_specialized(f, "UNPACK_SEQUENCE_TWO_TUPLE")
        self.assert_no_opcode(f, "UNPACK_SEQUENCE")

        def g():
            for _ in range(100):
                a, = 1,
                self.assertEqual(a, 1)

        g()
        self.assert_specialized(g, "UNPACK_SEQUENCE_TUPLE")
        self.assert_no_opcode(g, "UNPACK_SEQUENCE")

        def x():
            for _ in range(100):
                a, b = [1, 2]
                self.assertEqual(a, 1)
                self.assertEqual(b, 2)

        x()
        self.assert_specialized(x, "UNPACK_SEQUENCE_LIST")
        self.assert_no_opcode(x, "UNPACK_SEQUENCE")

if __name__ == "__main__":
    unittest.main()