Merged revisions 76138,76173 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/branches/py3k

................
  r76138 | antoine.pitrou | 2009-11-06 23:41:14 +0100 (ven., 06 nov. 2009) | 10 lines

  Merged revisions 76137 via svnmerge from
  svn+ssh://pythondev@svn.python.org/python/trunk

  ........
    r76137 | antoine.pitrou | 2009-11-06 23:34:35 +0100 (ven., 06 nov. 2009) | 4 lines

    Issue #7270: Add some dedicated unit tests for multi-thread synchronization
    primitives such as Lock, RLock, Condition, Event and Semaphore.
  ........
................
  r76173 | antoine.pitrou | 2009-11-09 17:08:16 +0100 (lun., 09 nov. 2009) | 11 lines

  Merged revisions 76172 via svnmerge from
  svn+ssh://pythondev@svn.python.org/python/trunk

  ........
    r76172 | antoine.pitrou | 2009-11-09 17:00:11 +0100 (lun., 09 nov. 2009) | 5 lines

    Issue #7282: Fix a memory leak when an RLock was used in a thread other
    than those started through `threading.Thread` (for example, using
    `thread.start_new_thread()`.
  ........
................
This commit is contained in:
Antoine Pitrou 2009-11-09 16:52:46 +00:00
parent 536d299ca0
commit 959f3e5032
5 changed files with 603 additions and 33 deletions

546
Lib/test/lock_tests.py Normal file
View File

@ -0,0 +1,546 @@
"""
Various tests for synchronization primitives.
"""
import sys
import time
from _thread import start_new_thread, get_ident
import threading
import unittest
from test import support
def _wait():
# A crude wait/yield function not relying on synchronization primitives.
time.sleep(0.01)
class Bunch(object):
"""
A bunch of threads.
"""
def __init__(self, f, n, wait_before_exit=False):
"""
Construct a bunch of `n` threads running the same function `f`.
If `wait_before_exit` is True, the threads won't terminate until
do_finish() is called.
"""
self.f = f
self.n = n
self.started = []
self.finished = []
self._can_exit = not wait_before_exit
def task():
tid = get_ident()
self.started.append(tid)
try:
f()
finally:
self.finished.append(tid)
while not self._can_exit:
_wait()
for i in range(n):
start_new_thread(task, ())
def wait_for_started(self):
while len(self.started) < self.n:
_wait()
def wait_for_finished(self):
while len(self.finished) < self.n:
_wait()
def do_finish(self):
self._can_exit = True
class BaseTestCase(unittest.TestCase):
def setUp(self):
self._threads = support.threading_setup()
def tearDown(self):
support.threading_cleanup(*self._threads)
support.reap_children()
class BaseLockTests(BaseTestCase):
"""
Tests for both recursive and non-recursive locks.
"""
def test_constructor(self):
lock = self.locktype()
del lock
def test_acquire_destroy(self):
lock = self.locktype()
lock.acquire()
del lock
def test_acquire_release(self):
lock = self.locktype()
lock.acquire()
lock.release()
del lock
def test_try_acquire(self):
lock = self.locktype()
self.assertTrue(lock.acquire(False))
lock.release()
def test_try_acquire_contended(self):
lock = self.locktype()
lock.acquire()
result = []
def f():
result.append(lock.acquire(False))
Bunch(f, 1).wait_for_finished()
self.assertFalse(result[0])
lock.release()
def test_acquire_contended(self):
lock = self.locktype()
lock.acquire()
N = 5
def f():
lock.acquire()
lock.release()
b = Bunch(f, N)
b.wait_for_started()
_wait()
self.assertEqual(len(b.finished), 0)
lock.release()
b.wait_for_finished()
self.assertEqual(len(b.finished), N)
def test_with(self):
lock = self.locktype()
def f():
lock.acquire()
lock.release()
def _with(err=None):
with lock:
if err is not None:
raise err
_with()
# Check the lock is unacquired
Bunch(f, 1).wait_for_finished()
self.assertRaises(TypeError, _with, TypeError)
# Check the lock is unacquired
Bunch(f, 1).wait_for_finished()
def test_thread_leak(self):
# The lock shouldn't leak a Thread instance when used from a foreign
# (non-threading) thread.
lock = self.locktype()
def f():
lock.acquire()
lock.release()
n = len(threading.enumerate())
# We run many threads in the hope that existing threads ids won't
# be recycled.
Bunch(f, 15).wait_for_finished()
self.assertEqual(n, len(threading.enumerate()))
class LockTests(BaseLockTests):
"""
Tests for non-recursive, weak locks
(which can be acquired and released from different threads).
"""
def test_reacquire(self):
# Lock needs to be released before re-acquiring.
lock = self.locktype()
phase = []
def f():
lock.acquire()
phase.append(None)
lock.acquire()
phase.append(None)
start_new_thread(f, ())
while len(phase) == 0:
_wait()
_wait()
self.assertEqual(len(phase), 1)
lock.release()
while len(phase) == 1:
_wait()
self.assertEqual(len(phase), 2)
def test_different_thread(self):
# Lock can be released from a different thread.
lock = self.locktype()
lock.acquire()
def f():
lock.release()
b = Bunch(f, 1)
b.wait_for_finished()
lock.acquire()
lock.release()
class RLockTests(BaseLockTests):
"""
Tests for recursive locks.
"""
def test_reacquire(self):
lock = self.locktype()
lock.acquire()
lock.acquire()
lock.release()
lock.acquire()
lock.release()
lock.release()
def test_release_unacquired(self):
# Cannot release an unacquired lock
lock = self.locktype()
self.assertRaises(RuntimeError, lock.release)
lock.acquire()
lock.acquire()
lock.release()
lock.acquire()
lock.release()
lock.release()
self.assertRaises(RuntimeError, lock.release)
def test_different_thread(self):
# Cannot release from a different thread
lock = self.locktype()
def f():
lock.acquire()
b = Bunch(f, 1, True)
try:
self.assertRaises(RuntimeError, lock.release)
finally:
b.do_finish()
def test__is_owned(self):
lock = self.locktype()
self.assertFalse(lock._is_owned())
lock.acquire()
self.assertTrue(lock._is_owned())
lock.acquire()
self.assertTrue(lock._is_owned())
result = []
def f():
result.append(lock._is_owned())
Bunch(f, 1).wait_for_finished()
self.assertFalse(result[0])
lock.release()
self.assertTrue(lock._is_owned())
lock.release()
self.assertFalse(lock._is_owned())
class EventTests(BaseTestCase):
"""
Tests for Event objects.
"""
def test_is_set(self):
evt = self.eventtype()
self.assertFalse(evt.is_set())
evt.set()
self.assertTrue(evt.is_set())
evt.set()
self.assertTrue(evt.is_set())
evt.clear()
self.assertFalse(evt.is_set())
evt.clear()
self.assertFalse(evt.is_set())
def _check_notify(self, evt):
# All threads get notified
N = 5
results1 = []
results2 = []
def f():
results1.append(evt.wait())
results2.append(evt.wait())
b = Bunch(f, N)
b.wait_for_started()
_wait()
self.assertEqual(len(results1), 0)
evt.set()
b.wait_for_finished()
self.assertEqual(results1, [True] * N)
self.assertEqual(results2, [True] * N)
def test_notify(self):
evt = self.eventtype()
self._check_notify(evt)
# Another time, after an explicit clear()
evt.set()
evt.clear()
self._check_notify(evt)
def test_timeout(self):
evt = self.eventtype()
results1 = []
results2 = []
N = 5
def f():
results1.append(evt.wait(0.0))
t1 = time.time()
r = evt.wait(0.2)
t2 = time.time()
results2.append((r, t2 - t1))
Bunch(f, N).wait_for_finished()
self.assertEqual(results1, [False] * N)
for r, dt in results2:
self.assertFalse(r)
self.assertTrue(dt >= 0.2, dt)
# The event is set
results1 = []
results2 = []
evt.set()
Bunch(f, N).wait_for_finished()
self.assertEqual(results1, [True] * N)
for r, dt in results2:
self.assertTrue(r)
class ConditionTests(BaseTestCase):
"""
Tests for condition variables.
"""
def test_acquire(self):
cond = self.condtype()
# Be default we have an RLock: the condition can be acquired multiple
# times.
cond.acquire()
cond.acquire()
cond.release()
cond.release()
lock = threading.Lock()
cond = self.condtype(lock)
cond.acquire()
self.assertFalse(lock.acquire(False))
cond.release()
self.assertTrue(lock.acquire(False))
self.assertFalse(cond.acquire(False))
lock.release()
with cond:
self.assertFalse(lock.acquire(False))
def test_unacquired_wait(self):
cond = self.condtype()
self.assertRaises(RuntimeError, cond.wait)
def test_unacquired_notify(self):
cond = self.condtype()
self.assertRaises(RuntimeError, cond.notify)
def _check_notify(self, cond):
N = 5
results1 = []
results2 = []
phase_num = 0
def f():
cond.acquire()
cond.wait()
cond.release()
results1.append(phase_num)
cond.acquire()
cond.wait()
cond.release()
results2.append(phase_num)
b = Bunch(f, N)
b.wait_for_started()
_wait()
self.assertEqual(results1, [])
# Notify 3 threads at first
cond.acquire()
cond.notify(3)
_wait()
phase_num = 1
cond.release()
while len(results1) < 3:
_wait()
self.assertEqual(results1, [1] * 3)
self.assertEqual(results2, [])
# Notify 5 threads: they might be in their first or second wait
cond.acquire()
cond.notify(5)
_wait()
phase_num = 2
cond.release()
while len(results1) + len(results2) < 8:
_wait()
self.assertEqual(results1, [1] * 3 + [2] * 2)
self.assertEqual(results2, [2] * 3)
# Notify all threads: they are all in their second wait
cond.acquire()
cond.notify_all()
_wait()
phase_num = 3
cond.release()
while len(results2) < 5:
_wait()
self.assertEqual(results1, [1] * 3 + [2] * 2)
self.assertEqual(results2, [2] * 3 + [3] * 2)
b.wait_for_finished()
def test_notify(self):
cond = self.condtype()
self._check_notify(cond)
# A second time, to check internal state is still ok.
self._check_notify(cond)
def test_timeout(self):
cond = self.condtype()
results = []
N = 5
def f():
cond.acquire()
t1 = time.time()
cond.wait(0.2)
t2 = time.time()
cond.release()
results.append(t2 - t1)
Bunch(f, N).wait_for_finished()
self.assertEqual(len(results), 5)
for dt in results:
self.assertTrue(dt >= 0.2, dt)
class BaseSemaphoreTests(BaseTestCase):
"""
Common tests for {bounded, unbounded} semaphore objects.
"""
def test_constructor(self):
self.assertRaises(ValueError, self.semtype, value = -1)
self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
def test_acquire(self):
sem = self.semtype(1)
sem.acquire()
sem.release()
sem = self.semtype(2)
sem.acquire()
sem.acquire()
sem.release()
sem.release()
def test_acquire_destroy(self):
sem = self.semtype()
sem.acquire()
del sem
def test_acquire_contended(self):
sem = self.semtype(7)
sem.acquire()
N = 10
results1 = []
results2 = []
phase_num = 0
def f():
sem.acquire()
results1.append(phase_num)
sem.acquire()
results2.append(phase_num)
b = Bunch(f, 10)
b.wait_for_started()
while len(results1) + len(results2) < 6:
_wait()
self.assertEqual(results1 + results2, [0] * 6)
phase_num = 1
for i in range(7):
sem.release()
while len(results1) + len(results2) < 13:
_wait()
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
phase_num = 2
for i in range(6):
sem.release()
while len(results1) + len(results2) < 19:
_wait()
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
# The semaphore is still locked
self.assertFalse(sem.acquire(False))
# Final release, to let the last thread finish
sem.release()
b.wait_for_finished()
def test_try_acquire(self):
sem = self.semtype(2)
self.assertTrue(sem.acquire(False))
self.assertTrue(sem.acquire(False))
self.assertFalse(sem.acquire(False))
sem.release()
self.assertTrue(sem.acquire(False))
def test_try_acquire_contended(self):
sem = self.semtype(4)
sem.acquire()
results = []
def f():
results.append(sem.acquire(False))
results.append(sem.acquire(False))
Bunch(f, 5).wait_for_finished()
# There can be a thread switch between acquiring the semaphore and
# appending the result, therefore results will not necessarily be
# ordered.
self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
def test_default_value(self):
# The default initial value is 1.
sem = self.semtype()
sem.acquire()
def f():
sem.acquire()
sem.release()
b = Bunch(f, 1)
b.wait_for_started()
_wait()
self.assertFalse(b.finished)
sem.release()
b.wait_for_finished()
def test_with(self):
sem = self.semtype(2)
def _with(err=None):
with sem:
self.assertTrue(sem.acquire(False))
sem.release()
with sem:
self.assertFalse(sem.acquire(False))
if err:
raise err
_with()
self.assertTrue(sem.acquire(False))
sem.release()
self.assertRaises(TypeError, _with, TypeError)
self.assertTrue(sem.acquire(False))
sem.release()
class SemaphoreTests(BaseSemaphoreTests):
"""
Tests for unbounded semaphores.
"""
def test_release_unacquired(self):
# Unbounded releases are allowed and increment the semaphore's value
sem = self.semtype(1)
sem.release()
sem.acquire()
sem.acquire()
sem.release()
class BoundedSemaphoreTests(BaseSemaphoreTests):
"""
Tests for bounded semaphores.
"""
def test_release_unacquired(self):
# Cannot go past the initial value
sem = self.semtype()
self.assertRaises(ValueError, sem.release)
sem.acquire()
sem.release()
self.assertRaises(ValueError, sem.release)

View File

@ -5,6 +5,7 @@ from test import support
import _thread as thread import _thread as thread
import time import time
from test import lock_tests
NUMTASKS = 10 NUMTASKS = 10
NUMTRIPS = 3 NUMTRIPS = 3
@ -161,8 +162,12 @@ class BarrierTest(BasicThreadTest):
if finished: if finished:
self.done_mutex.release() self.done_mutex.release()
class LockTests(lock_tests.LockTests):
locktype = thread.allocate_lock
def test_main(): def test_main():
support.run_unittest(ThreadRunningTests, BarrierTest) support.run_unittest(ThreadRunningTests, BarrierTest, LockTests)
if __name__ == "__main__": if __name__ == "__main__":
test_main() test_main()

View File

@ -11,6 +11,8 @@ import time
import unittest import unittest
import weakref import weakref
from test import lock_tests
# A trivial mutable counter. # A trivial mutable counter.
class Counter(object): class Counter(object):
def __init__(self): def __init__(self):
@ -133,11 +135,9 @@ class ThreadTests(unittest.TestCase):
def test_foreign_thread(self): def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module. # Check that a "foreign" thread can use the threading module.
def f(mutex): def f(mutex):
# Acquiring an RLock forces an entry for the foreign # Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map. # thread to get made in the threading._active map.
r = threading.RLock() threading.current_thread()
r.acquire()
r.release()
mutex.release() mutex.release()
mutex = threading.Lock() mutex = threading.Lock()
@ -471,22 +471,6 @@ class ThreadingExceptionTests(unittest.TestCase):
thread.start() thread.start()
self.assertRaises(RuntimeError, thread.start) self.assertRaises(RuntimeError, thread.start)
def test_releasing_unacquired_rlock(self):
rlock = threading.RLock()
self.assertRaises(RuntimeError, rlock.release)
def test_waiting_on_unacquired_condition(self):
cond = threading.Condition()
self.assertRaises(RuntimeError, cond.wait)
def test_notify_on_unacquired_condition(self):
cond = threading.Condition()
self.assertRaises(RuntimeError, cond.notify)
def test_semaphore_with_negative_value(self):
self.assertRaises(ValueError, threading.Semaphore, value = -1)
self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxsize)
def test_joining_current_thread(self): def test_joining_current_thread(self):
current_thread = threading.current_thread() current_thread = threading.current_thread()
self.assertRaises(RuntimeError, current_thread.join); self.assertRaises(RuntimeError, current_thread.join);
@ -501,11 +485,37 @@ class ThreadingExceptionTests(unittest.TestCase):
self.assertRaises(RuntimeError, setattr, thread, "daemon", True) self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
class LockTests(lock_tests.LockTests):
locktype = staticmethod(threading.Lock)
class RLockTests(lock_tests.RLockTests):
locktype = staticmethod(threading.RLock)
class EventTests(lock_tests.EventTests):
eventtype = staticmethod(threading.Event)
class ConditionAsRLockTests(lock_tests.RLockTests):
# An Condition uses an RLock by default and exports its API.
locktype = staticmethod(threading.Condition)
class ConditionTests(lock_tests.ConditionTests):
condtype = staticmethod(threading.Condition)
class SemaphoreTests(lock_tests.SemaphoreTests):
semtype = staticmethod(threading.Semaphore)
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
semtype = staticmethod(threading.BoundedSemaphore)
def test_main(): def test_main():
test.support.run_unittest(ThreadTests, test.support.run_unittest(LockTests, RLockTests, EventTests,
ThreadJoinOnShutdown, ConditionAsRLockTests, ConditionTests,
ThreadingExceptionTests, SemaphoreTests, BoundedSemaphoreTests,
) ThreadTests,
ThreadJoinOnShutdown,
ThreadingExceptionTests,
)
if __name__ == "__main__": if __name__ == "__main__":
test_main() test_main()

View File

@ -92,14 +92,16 @@ class _RLock(_Verbose):
def __repr__(self): def __repr__(self):
owner = self._owner owner = self._owner
return "<%s(%s, %d)>" % ( try:
self.__class__.__name__, owner = _active[owner].name
owner and owner.name, except KeyError:
self._count) pass
return "<%s owner=%r count=%d>" % (
self.__class__.__name__, owner, self._count)
def acquire(self, blocking=True): def acquire(self, blocking=True):
me = current_thread() me = _get_ident()
if self._owner is me: if self._owner == me:
self._count = self._count + 1 self._count = self._count + 1
if __debug__: if __debug__:
self._note("%s.acquire(%s): recursive success", self, blocking) self._note("%s.acquire(%s): recursive success", self, blocking)
@ -118,7 +120,7 @@ class _RLock(_Verbose):
__enter__ = acquire __enter__ = acquire
def release(self): def release(self):
if self._owner is not current_thread(): if self._owner != _get_ident():
raise RuntimeError("cannot release un-acquired lock") raise RuntimeError("cannot release un-acquired lock")
self._count = count = self._count - 1 self._count = count = self._count - 1
if not count: if not count:
@ -152,7 +154,7 @@ class _RLock(_Verbose):
return (count, owner) return (count, owner)
def _is_owned(self): def _is_owned(self):
return self._owner is current_thread() return self._owner == _get_ident()
def Condition(*args, **kwargs): def Condition(*args, **kwargs):

View File

@ -40,6 +40,10 @@ Core and Builtins
Library Library
------- -------
- Issue #7282: Fix a memory leak when an RLock was used in a thread other
than those started through `threading.Thread` (for example, using
`_thread.start_new_thread()`).
- Issue #7187: Importlib would not silence the IOError raised when trying to - Issue #7187: Importlib would not silence the IOError raised when trying to
write new bytecode when it was made read-only. write new bytecode when it was made read-only.
@ -140,6 +144,9 @@ Extension Modules
Tests Tests
----- -----
- Issue #7270: Add some dedicated unit tests for multi-thread synchronization
primitives such as Lock, RLock, Condition, Event and Semaphore.
- Issue #7248 (part 2): Use a unique temporary directory for importlib source - Issue #7248 (part 2): Use a unique temporary directory for importlib source
tests instead of tempfile.tempdir. This prevents the tests from sharing state tests instead of tempfile.tempdir. This prevents the tests from sharing state
between concurrent executions on the same system. between concurrent executions on the same system.