mirror of
https://github.com/python/cpython.git
synced 2024-12-11 10:50:11 +08:00
f260e443ac
Updated documentation to use new name. Merged revisions 63077 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r63077 | alexandre.vassalotti | 2008-05-11 15:39:48 -0400 (Sun, 11 May 2008) | 3 lines Added stub for the Queue module to be renamed in 3.0. Use the 3.0 module name to avoid spurious warnings. ........
182 lines
7.0 KiB
Python
182 lines
7.0 KiB
Python
"""Generic thread tests.
|
|
|
|
Meant to be used by dummy_thread and thread. To allow for different modules
|
|
to be used, test_main() can be called with the module to use as the thread
|
|
implementation as its sole argument.
|
|
|
|
"""
|
|
import dummy_thread as _thread
|
|
import time
|
|
import queue
|
|
import random
|
|
import unittest
|
|
from test import test_support
|
|
|
|
DELAY = 0 # Set > 0 when testing a module other than dummy_thread, such as
|
|
# the 'thread' module.
|
|
|
|
class LockTests(unittest.TestCase):
|
|
"""Test lock objects."""
|
|
|
|
def setUp(self):
|
|
# Create a lock
|
|
self.lock = _thread.allocate_lock()
|
|
|
|
def test_initlock(self):
|
|
#Make sure locks start locked
|
|
self.failUnless(not self.lock.locked(),
|
|
"Lock object is not initialized unlocked.")
|
|
|
|
def test_release(self):
|
|
# Test self.lock.release()
|
|
self.lock.acquire()
|
|
self.lock.release()
|
|
self.failUnless(not self.lock.locked(),
|
|
"Lock object did not release properly.")
|
|
|
|
def test_improper_release(self):
|
|
#Make sure release of an unlocked thread raises _thread.error
|
|
self.failUnlessRaises(_thread.error, self.lock.release)
|
|
|
|
def test_cond_acquire_success(self):
|
|
#Make sure the conditional acquiring of the lock works.
|
|
self.failUnless(self.lock.acquire(0),
|
|
"Conditional acquiring of the lock failed.")
|
|
|
|
def test_cond_acquire_fail(self):
|
|
#Test acquiring locked lock returns False
|
|
self.lock.acquire(0)
|
|
self.failUnless(not self.lock.acquire(0),
|
|
"Conditional acquiring of a locked lock incorrectly "
|
|
"succeeded.")
|
|
|
|
def test_uncond_acquire_success(self):
|
|
#Make sure unconditional acquiring of a lock works.
|
|
self.lock.acquire()
|
|
self.failUnless(self.lock.locked(),
|
|
"Uncondional locking failed.")
|
|
|
|
def test_uncond_acquire_return_val(self):
|
|
#Make sure that an unconditional locking returns True.
|
|
self.failUnless(self.lock.acquire(1) is True,
|
|
"Unconditional locking did not return True.")
|
|
|
|
def test_uncond_acquire_blocking(self):
|
|
#Make sure that unconditional acquiring of a locked lock blocks.
|
|
def delay_unlock(to_unlock, delay):
|
|
"""Hold on to lock for a set amount of time before unlocking."""
|
|
time.sleep(delay)
|
|
to_unlock.release()
|
|
|
|
self.lock.acquire()
|
|
start_time = int(time.time())
|
|
_thread.start_new_thread(delay_unlock,(self.lock, DELAY))
|
|
if test_support.verbose:
|
|
print()
|
|
print("*** Waiting for thread to release the lock "\
|
|
"(approx. %s sec.) ***" % DELAY)
|
|
self.lock.acquire()
|
|
end_time = int(time.time())
|
|
if test_support.verbose:
|
|
print("done")
|
|
self.failUnless((end_time - start_time) >= DELAY,
|
|
"Blocking by unconditional acquiring failed.")
|
|
|
|
class MiscTests(unittest.TestCase):
|
|
"""Miscellaneous tests."""
|
|
|
|
def test_exit(self):
|
|
#Make sure _thread.exit() raises SystemExit
|
|
self.failUnlessRaises(SystemExit, _thread.exit)
|
|
|
|
def test_ident(self):
|
|
#Test sanity of _thread.get_ident()
|
|
self.failUnless(isinstance(_thread.get_ident(), int),
|
|
"_thread.get_ident() returned a non-integer")
|
|
self.failUnless(_thread.get_ident() != 0,
|
|
"_thread.get_ident() returned 0")
|
|
|
|
def test_LockType(self):
|
|
#Make sure _thread.LockType is the same type as _thread.allocate_locke()
|
|
self.failUnless(isinstance(_thread.allocate_lock(), _thread.LockType),
|
|
"_thread.LockType is not an instance of what is "
|
|
"returned by _thread.allocate_lock()")
|
|
|
|
def test_interrupt_main(self):
|
|
#Calling start_new_thread with a function that executes interrupt_main
|
|
# should raise KeyboardInterrupt upon completion.
|
|
def call_interrupt():
|
|
_thread.interrupt_main()
|
|
self.failUnlessRaises(KeyboardInterrupt, _thread.start_new_thread,
|
|
call_interrupt, tuple())
|
|
|
|
def test_interrupt_in_main(self):
|
|
# Make sure that if interrupt_main is called in main threat that
|
|
# KeyboardInterrupt is raised instantly.
|
|
self.failUnlessRaises(KeyboardInterrupt, _thread.interrupt_main)
|
|
|
|
class ThreadTests(unittest.TestCase):
|
|
"""Test thread creation."""
|
|
|
|
def test_arg_passing(self):
|
|
#Make sure that parameter passing works.
|
|
def arg_tester(queue, arg1=False, arg2=False):
|
|
"""Use to test _thread.start_new_thread() passes args properly."""
|
|
queue.put((arg1, arg2))
|
|
|
|
testing_queue = queue.Queue(1)
|
|
_thread.start_new_thread(arg_tester, (testing_queue, True, True))
|
|
result = testing_queue.get()
|
|
self.failUnless(result[0] and result[1],
|
|
"Argument passing for thread creation using tuple failed")
|
|
_thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
|
|
'arg1':True, 'arg2':True})
|
|
result = testing_queue.get()
|
|
self.failUnless(result[0] and result[1],
|
|
"Argument passing for thread creation using kwargs failed")
|
|
_thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
|
|
result = testing_queue.get()
|
|
self.failUnless(result[0] and result[1],
|
|
"Argument passing for thread creation using both tuple"
|
|
" and kwargs failed")
|
|
|
|
def test_multi_creation(self):
|
|
#Make sure multiple threads can be created.
|
|
def queue_mark(queue, delay):
|
|
"""Wait for ``delay`` seconds and then put something into ``queue``"""
|
|
time.sleep(delay)
|
|
queue.put(_thread.get_ident())
|
|
|
|
thread_count = 5
|
|
testing_queue = queue.Queue(thread_count)
|
|
if test_support.verbose:
|
|
print()
|
|
print("*** Testing multiple thread creation "\
|
|
"(will take approx. %s to %s sec.) ***" % (DELAY, thread_count))
|
|
for count in range(thread_count):
|
|
if DELAY:
|
|
local_delay = round(random.random(), 1)
|
|
else:
|
|
local_delay = 0
|
|
_thread.start_new_thread(queue_mark,
|
|
(testing_queue, local_delay))
|
|
time.sleep(DELAY)
|
|
if test_support.verbose:
|
|
print('done')
|
|
self.failUnless(testing_queue.qsize() == thread_count,
|
|
"Not all %s threads executed properly after %s sec." %
|
|
(thread_count, DELAY))
|
|
|
|
def test_main(imported_module=None):
|
|
global _thread, DELAY
|
|
if imported_module:
|
|
_thread = imported_module
|
|
DELAY = 2
|
|
if test_support.verbose:
|
|
print()
|
|
print("*** Using %s as _thread module ***" % _thread)
|
|
test_support.run_unittest(LockTests, MiscTests, ThreadTests)
|
|
|
|
if __name__ == '__main__':
|
|
test_main()
|