mirror of
https://github.com/python/cpython.git
synced 2024-11-29 04:44:13 +08:00
Removes the 'Call' class which is used to control execution order and is unreliable on Windows
This commit is contained in:
parent
ff72816b5f
commit
1d1df8257f
@ -9,24 +9,16 @@ test.support.import_module('multiprocessing.synchronize')
|
||||
# without thread support.
|
||||
test.support.import_module('threading')
|
||||
|
||||
import io
|
||||
import logging
|
||||
import multiprocessing
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
|
||||
if sys.platform.startswith('win'):
|
||||
import ctypes
|
||||
import ctypes.wintypes
|
||||
|
||||
from concurrent import futures
|
||||
from concurrent.futures._base import (
|
||||
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
|
||||
LOGGER, wait)
|
||||
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
|
||||
import concurrent.futures.process
|
||||
|
||||
|
||||
def create_future(state=PENDING, exception=None, result=None):
|
||||
f = Future()
|
||||
f._state = state
|
||||
@ -34,6 +26,7 @@ def create_future(state=PENDING, exception=None, result=None):
|
||||
f._result = result
|
||||
return f
|
||||
|
||||
|
||||
PENDING_FUTURE = create_future(state=PENDING)
|
||||
RUNNING_FUTURE = create_future(state=RUNNING)
|
||||
CANCELLED_FUTURE = create_future(state=CANCELLED)
|
||||
@ -41,111 +34,48 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
|
||||
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
|
||||
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
|
||||
|
||||
|
||||
def mul(x, y):
|
||||
return x * y
|
||||
|
||||
class Call(object):
|
||||
"""A call that can be submitted to a future.Executor for testing.
|
||||
|
||||
The call signals when it is called and waits for an event before finishing.
|
||||
"""
|
||||
CALL_LOCKS = {}
|
||||
def _create_event(self):
|
||||
if sys.platform.startswith('win'):
|
||||
class SECURITY_ATTRIBUTES(ctypes.Structure):
|
||||
_fields_ = [("nLength", ctypes.wintypes.DWORD),
|
||||
("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
|
||||
("bInheritHandle", ctypes.wintypes.BOOL)]
|
||||
def sleep_and_raise(t):
|
||||
time.sleep(t)
|
||||
raise Exception('this is an exception')
|
||||
|
||||
s = SECURITY_ATTRIBUTES()
|
||||
s.nLength = ctypes.sizeof(s)
|
||||
s.lpSecurityDescriptor = None
|
||||
s.bInheritHandle = True
|
||||
|
||||
handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
|
||||
True,
|
||||
False,
|
||||
None)
|
||||
assert handle is not None
|
||||
return handle
|
||||
else:
|
||||
event = self.Event[0]()
|
||||
self.CALL_LOCKS[id(event)] = event
|
||||
return id(event)
|
||||
class ExecutorMixin:
|
||||
worker_count = 5
|
||||
def _prime_executor(self):
|
||||
# Make sure that the executor is ready to do work before running the
|
||||
# tests. This should reduce the probability of timeouts in the tests.
|
||||
futures = [self.executor.submit(time.sleep, 0.1)
|
||||
for _ in range(self.worker_count)]
|
||||
|
||||
def _wait_on_event(self, handle):
|
||||
if sys.platform.startswith('win'):
|
||||
# WaitForSingleObject returns 0 if handle is signaled.
|
||||
r = ctypes.windll.kernel32.WaitForSingleObject(handle, 60 * 1000)
|
||||
if r != 0:
|
||||
message = (
|
||||
'WaitForSingleObject({}, ...) failed with {}, '
|
||||
'GetLastError() = {}'.format(
|
||||
handle, r, ctypes.GetLastError()))
|
||||
logging.critical(message)
|
||||
assert False, message
|
||||
else:
|
||||
self.CALL_LOCKS[handle].wait()
|
||||
for f in futures:
|
||||
f.result()
|
||||
|
||||
def _signal_event(self, handle):
|
||||
if sys.platform.startswith('win'):
|
||||
r = ctypes.windll.kernel32.SetEvent(handle) # Returns 0 on failure.
|
||||
if r == 0:
|
||||
message = (
|
||||
'SetEvent({}) failed with {}, GetLastError() = {}'.format(
|
||||
handle, r, ctypes.GetLastError()))
|
||||
logging.critical(message)
|
||||
assert False, message
|
||||
else:
|
||||
self.CALL_LOCKS[handle].set()
|
||||
|
||||
def __init__(self, Event, manual_finish=False, result=42):
|
||||
self.Event = Event
|
||||
self._called_event = self._create_event()
|
||||
self._can_finish = self._create_event()
|
||||
class ThreadPoolMixin(ExecutorMixin):
|
||||
def setUp(self):
|
||||
self.executor = futures.ThreadPoolExecutor(max_workers=5)
|
||||
self._prime_executor()
|
||||
|
||||
self._result = result
|
||||
def tearDown(self):
|
||||
self.executor.shutdown(wait=True)
|
||||
|
||||
if not manual_finish:
|
||||
self._signal_event(self._can_finish)
|
||||
|
||||
def wait_on_called(self):
|
||||
self._wait_on_event(self._called_event)
|
||||
class ProcessPoolMixin(ExecutorMixin):
|
||||
def setUp(self):
|
||||
try:
|
||||
self.executor = futures.ProcessPoolExecutor(max_workers=5)
|
||||
except NotImplementedError as e:
|
||||
self.skipTest(str(e))
|
||||
self._prime_executor()
|
||||
|
||||
def set_can(self):
|
||||
self._signal_event(self._can_finish)
|
||||
def tearDown(self):
|
||||
self.executor.shutdown(wait=True)
|
||||
|
||||
def __call__(self):
|
||||
self._signal_event(self._called_event)
|
||||
self._wait_on_event(self._can_finish)
|
||||
|
||||
return self._result
|
||||
|
||||
def close(self):
|
||||
self.set_can()
|
||||
if sys.platform.startswith('win'):
|
||||
ctypes.windll.kernel32.CloseHandle(self._called_event)
|
||||
ctypes.windll.kernel32.CloseHandle(self._can_finish)
|
||||
self._called_event = None
|
||||
self._can_finish = None
|
||||
else:
|
||||
del self.CALL_LOCKS[self._called_event]
|
||||
del self.CALL_LOCKS[self._can_finish]
|
||||
|
||||
class ExceptionCall(Call):
|
||||
def __call__(self):
|
||||
self._signal_event(self._called_event)
|
||||
self._wait_on_event(self._can_finish)
|
||||
raise ZeroDivisionError()
|
||||
|
||||
class MapCall(Call):
|
||||
def __init__(self, Event, result=42):
|
||||
super().__init__(Event, manual_finish=True, result=result)
|
||||
|
||||
def __call__(self, manual_finish):
|
||||
if manual_finish:
|
||||
super().__call__()
|
||||
return self._result
|
||||
|
||||
class ExecutorShutdownTest(unittest.TestCase):
|
||||
def test_run_after_shutdown(self):
|
||||
@ -155,52 +85,14 @@ class ExecutorShutdownTest(unittest.TestCase):
|
||||
pow, 2, 5)
|
||||
|
||||
|
||||
def _start_some_futures(self):
|
||||
call1 = Call(self.Event, manual_finish=True)
|
||||
call2 = Call(self.Event, manual_finish=True)
|
||||
call3 = Call(self.Event, manual_finish=True)
|
||||
|
||||
try:
|
||||
self.executor.submit(call1)
|
||||
self.executor.submit(call2)
|
||||
self.executor.submit(call3)
|
||||
|
||||
call1.wait_on_called()
|
||||
call2.wait_on_called()
|
||||
call3.wait_on_called()
|
||||
|
||||
call1.set_can()
|
||||
call2.set_can()
|
||||
call3.set_can()
|
||||
finally:
|
||||
call1.close()
|
||||
call2.close()
|
||||
call3.close()
|
||||
|
||||
class ThreadPoolMixin:
|
||||
# wrap in tuple to prevent creation of instance methods
|
||||
Event = (threading.Event,)
|
||||
def setUp(self):
|
||||
self.executor = futures.ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
def tearDown(self):
|
||||
self.executor.shutdown(wait=True)
|
||||
|
||||
class ProcessPoolMixin:
|
||||
# wrap in tuple to prevent creation of instance methods
|
||||
Event = (multiprocessing.Event,)
|
||||
def setUp(self):
|
||||
try:
|
||||
self.executor = futures.ProcessPoolExecutor(max_workers=5)
|
||||
except NotImplementedError as e:
|
||||
self.skipTest(str(e))
|
||||
|
||||
def tearDown(self):
|
||||
self.executor.shutdown(wait=True)
|
||||
|
||||
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
|
||||
def _prime_executor(self):
|
||||
pass
|
||||
|
||||
def test_threads_terminate(self):
|
||||
self._start_some_futures()
|
||||
self.executor.submit(mul, 21, 2)
|
||||
self.executor.submit(mul, 6, 7)
|
||||
self.executor.submit(mul, 3, 14)
|
||||
self.assertEqual(len(self.executor._threads), 3)
|
||||
self.executor.shutdown()
|
||||
for t in self.executor._threads:
|
||||
@ -224,9 +116,15 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
|
||||
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
|
||||
def _prime_executor(self):
|
||||
pass
|
||||
|
||||
def test_processes_terminate(self):
|
||||
self._start_some_futures()
|
||||
self.executor.submit(mul, 21, 2)
|
||||
self.executor.submit(mul, 6, 7)
|
||||
self.executor.submit(mul, 3, 14)
|
||||
self.assertEqual(len(self.executor._processes), 5)
|
||||
processes = self.executor._processes
|
||||
self.executor.shutdown()
|
||||
@ -236,11 +134,11 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
|
||||
|
||||
def test_context_manager_shutdown(self):
|
||||
with futures.ProcessPoolExecutor(max_workers=5) as e:
|
||||
executor = e
|
||||
processes = e._processes
|
||||
self.assertEqual(list(e.map(abs, range(-5, 5))),
|
||||
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
|
||||
|
||||
for p in self.executor._processes:
|
||||
for p in processes:
|
||||
p.join()
|
||||
|
||||
def test_del_shutdown(self):
|
||||
@ -256,85 +154,44 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
|
||||
|
||||
class WaitTests(unittest.TestCase):
|
||||
def test_first_completed(self):
|
||||
def wait_test():
|
||||
while not future1._waiters:
|
||||
pass
|
||||
call1.set_can()
|
||||
future1 = self.executor.submit(mul, 21, 2)
|
||||
future2 = self.executor.submit(time.sleep, 5)
|
||||
|
||||
call1 = Call(self.Event, manual_finish=True)
|
||||
call2 = Call(self.Event, manual_finish=True)
|
||||
try:
|
||||
future1 = self.executor.submit(call1)
|
||||
future2 = self.executor.submit(call2)
|
||||
|
||||
t = threading.Thread(target=wait_test)
|
||||
t.start()
|
||||
done, not_done = futures.wait(
|
||||
[CANCELLED_FUTURE, future1, future2],
|
||||
return_when=futures.FIRST_COMPLETED)
|
||||
|
||||
self.assertEqual(set([future1]), done)
|
||||
self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
|
||||
finally:
|
||||
call1.close()
|
||||
call2.close()
|
||||
|
||||
def test_first_completed_one_already_completed(self):
|
||||
call1 = Call(self.Event, manual_finish=True)
|
||||
try:
|
||||
future1 = self.executor.submit(call1)
|
||||
def test_first_completed_some_already_completed(self):
|
||||
future1 = self.executor.submit(time.sleep, 2)
|
||||
|
||||
finished, pending = futures.wait(
|
||||
[SUCCESSFUL_FUTURE, future1],
|
||||
[CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
|
||||
return_when=futures.FIRST_COMPLETED)
|
||||
|
||||
self.assertEqual(set([SUCCESSFUL_FUTURE]), finished)
|
||||
self.assertEqual(
|
||||
set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
|
||||
finished)
|
||||
self.assertEqual(set([future1]), pending)
|
||||
finally:
|
||||
call1.close()
|
||||
|
||||
def test_first_exception(self):
|
||||
def wait_test():
|
||||
while not future1._waiters:
|
||||
pass
|
||||
call1.set_can()
|
||||
call2.set_can()
|
||||
future1 = self.executor.submit(mul, 2, 21)
|
||||
future2 = self.executor.submit(sleep_and_raise, 5)
|
||||
future3 = self.executor.submit(time.sleep, 10)
|
||||
|
||||
call1 = Call(self.Event, manual_finish=True)
|
||||
call2 = ExceptionCall(self.Event, manual_finish=True)
|
||||
call3 = Call(self.Event, manual_finish=True)
|
||||
try:
|
||||
future1 = self.executor.submit(call1)
|
||||
future2 = self.executor.submit(call2)
|
||||
future3 = self.executor.submit(call3)
|
||||
|
||||
t = threading.Thread(target=wait_test)
|
||||
t.start()
|
||||
finished, pending = futures.wait(
|
||||
[future1, future2, future3],
|
||||
return_when=futures.FIRST_EXCEPTION)
|
||||
|
||||
self.assertEqual(set([future1, future2]), finished)
|
||||
self.assertEqual(set([future3]), pending)
|
||||
finally:
|
||||
call1.close()
|
||||
call2.close()
|
||||
call3.close()
|
||||
|
||||
def test_first_exception_some_already_complete(self):
|
||||
def wait_test():
|
||||
while not future1._waiters:
|
||||
pass
|
||||
call1.set_can()
|
||||
future1 = self.executor.submit(divmod, 21, 0)
|
||||
future2 = self.executor.submit(time.sleep, 5)
|
||||
|
||||
call1 = ExceptionCall(self.Event, manual_finish=True)
|
||||
call2 = Call(self.Event, manual_finish=True)
|
||||
try:
|
||||
future1 = self.executor.submit(call1)
|
||||
future2 = self.executor.submit(call2)
|
||||
|
||||
t = threading.Thread(target=wait_test)
|
||||
t.start()
|
||||
finished, pending = futures.wait(
|
||||
[SUCCESSFUL_FUTURE,
|
||||
CANCELLED_FUTURE,
|
||||
@ -347,15 +204,8 @@ class WaitTests(unittest.TestCase):
|
||||
future1]), finished)
|
||||
self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
|
||||
|
||||
|
||||
finally:
|
||||
call1.close()
|
||||
call2.close()
|
||||
|
||||
def test_first_exception_one_already_failed(self):
|
||||
call1 = Call(self.Event, manual_finish=True)
|
||||
try:
|
||||
future1 = self.executor.submit(call1)
|
||||
future1 = self.executor.submit(time.sleep, 2)
|
||||
|
||||
finished, pending = futures.wait(
|
||||
[EXCEPTION_FUTURE, future1],
|
||||
@ -363,94 +213,30 @@ class WaitTests(unittest.TestCase):
|
||||
|
||||
self.assertEqual(set([EXCEPTION_FUTURE]), finished)
|
||||
self.assertEqual(set([future1]), pending)
|
||||
finally:
|
||||
call1.close()
|
||||
|
||||
def test_all_completed(self):
|
||||
def wait_test():
|
||||
while not future1._waiters:
|
||||
pass
|
||||
call1.set_can()
|
||||
call2.set_can()
|
||||
future1 = self.executor.submit(divmod, 2, 0)
|
||||
future2 = self.executor.submit(mul, 2, 21)
|
||||
|
||||
call1 = Call(self.Event, manual_finish=True)
|
||||
call2 = Call(self.Event, manual_finish=True)
|
||||
try:
|
||||
future1 = self.executor.submit(call1)
|
||||
future2 = self.executor.submit(call2)
|
||||
|
||||
t = threading.Thread(target=wait_test)
|
||||
t.start()
|
||||
finished, pending = futures.wait(
|
||||
[future1, future2],
|
||||
return_when=futures.ALL_COMPLETED)
|
||||
|
||||
self.assertEqual(set([future1, future2]), finished)
|
||||
self.assertEqual(set(), pending)
|
||||
finally:
|
||||
call1.close()
|
||||
call2.close()
|
||||
|
||||
@unittest.skip # XXX skip the test for now as it hangs
|
||||
def test_all_completed_some_already_completed(self):
|
||||
def wait_test():
|
||||
while not future1._waiters:
|
||||
pass
|
||||
|
||||
future4.cancel()
|
||||
call1.set_can()
|
||||
call2.set_can()
|
||||
call3.set_can()
|
||||
|
||||
self.assertLessEqual(
|
||||
futures.process.EXTRA_QUEUED_CALLS,
|
||||
1,
|
||||
'this test assumes that future4 will be cancelled before it is '
|
||||
'queued to run - which might not be the case if '
|
||||
'ProcessPoolExecutor is too aggresive in scheduling futures')
|
||||
call1 = Call(self.Event, manual_finish=True)
|
||||
call2 = Call(self.Event, manual_finish=True)
|
||||
call3 = Call(self.Event, manual_finish=True)
|
||||
call4 = Call(self.Event, manual_finish=True)
|
||||
try:
|
||||
future1 = self.executor.submit(call1)
|
||||
future2 = self.executor.submit(call2)
|
||||
future3 = self.executor.submit(call3)
|
||||
future4 = self.executor.submit(call4)
|
||||
|
||||
t = threading.Thread(target=wait_test)
|
||||
t.start()
|
||||
finished, pending = futures.wait(
|
||||
[SUCCESSFUL_FUTURE,
|
||||
CANCELLED_AND_NOTIFIED_FUTURE,
|
||||
future1, future2, future3, future4],
|
||||
EXCEPTION_FUTURE,
|
||||
future1,
|
||||
future2],
|
||||
return_when=futures.ALL_COMPLETED)
|
||||
|
||||
self.assertEqual(set([SUCCESSFUL_FUTURE,
|
||||
CANCELLED_AND_NOTIFIED_FUTURE,
|
||||
future1, future2, future3, future4]),
|
||||
finished)
|
||||
EXCEPTION_FUTURE,
|
||||
future1,
|
||||
future2]), finished)
|
||||
self.assertEqual(set(), pending)
|
||||
finally:
|
||||
call1.close()
|
||||
call2.close()
|
||||
call3.close()
|
||||
call4.close()
|
||||
|
||||
def test_timeout(self):
|
||||
def wait_test():
|
||||
while not future1._waiters:
|
||||
pass
|
||||
call1.set_can()
|
||||
future1 = self.executor.submit(mul, 6, 7)
|
||||
future2 = self.executor.submit(time.sleep, 10)
|
||||
|
||||
call1 = Call(self.Event, manual_finish=True)
|
||||
call2 = Call(self.Event, manual_finish=True)
|
||||
try:
|
||||
future1 = self.executor.submit(call1)
|
||||
future2 = self.executor.submit(call2)
|
||||
|
||||
t = threading.Thread(target=wait_test)
|
||||
t.start()
|
||||
finished, pending = futures.wait(
|
||||
[CANCELLED_AND_NOTIFIED_FUTURE,
|
||||
EXCEPTION_FUTURE,
|
||||
@ -466,34 +252,20 @@ class WaitTests(unittest.TestCase):
|
||||
self.assertEqual(set([future2]), pending)
|
||||
|
||||
|
||||
finally:
|
||||
call1.close()
|
||||
call2.close()
|
||||
|
||||
|
||||
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
|
||||
pass
|
||||
|
||||
|
||||
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
|
||||
pass
|
||||
|
||||
|
||||
class AsCompletedTests(unittest.TestCase):
|
||||
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
|
||||
def test_no_timeout(self):
|
||||
def wait_test():
|
||||
while not future1._waiters:
|
||||
pass
|
||||
call1.set_can()
|
||||
call2.set_can()
|
||||
future1 = self.executor.submit(mul, 2, 21)
|
||||
future2 = self.executor.submit(mul, 7, 6)
|
||||
|
||||
call1 = Call(self.Event, manual_finish=True)
|
||||
call2 = Call(self.Event, manual_finish=True)
|
||||
try:
|
||||
future1 = self.executor.submit(call1)
|
||||
future2 = self.executor.submit(call2)
|
||||
|
||||
t = threading.Thread(target=wait_test)
|
||||
t.start()
|
||||
completed = set(futures.as_completed(
|
||||
[CANCELLED_AND_NOTIFIED_FUTURE,
|
||||
EXCEPTION_FUTURE,
|
||||
@ -505,14 +277,9 @@ class AsCompletedTests(unittest.TestCase):
|
||||
SUCCESSFUL_FUTURE,
|
||||
future1, future2]),
|
||||
completed)
|
||||
finally:
|
||||
call1.close()
|
||||
call2.close()
|
||||
|
||||
def test_zero_timeout(self):
|
||||
call1 = Call(self.Event, manual_finish=True)
|
||||
try:
|
||||
future1 = self.executor.submit(call1)
|
||||
future1 = self.executor.submit(time.sleep, 2)
|
||||
completed_futures = set()
|
||||
try:
|
||||
for future in futures.as_completed(
|
||||
@ -529,15 +296,16 @@ class AsCompletedTests(unittest.TestCase):
|
||||
EXCEPTION_FUTURE,
|
||||
SUCCESSFUL_FUTURE]),
|
||||
completed_futures)
|
||||
finally:
|
||||
call1.close()
|
||||
|
||||
|
||||
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
|
||||
pass
|
||||
|
||||
|
||||
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
|
||||
pass
|
||||
|
||||
|
||||
class ExecutorTest(unittest.TestCase):
|
||||
# Executor.shutdown() and context manager usage is tested by
|
||||
# ExecutorShutdownTest.
|
||||
@ -562,28 +330,27 @@ class ExecutorTest(unittest.TestCase):
|
||||
|
||||
def test_map_timeout(self):
|
||||
results = []
|
||||
timeout_call = MapCall(self.Event)
|
||||
try:
|
||||
try:
|
||||
for i in self.executor.map(timeout_call,
|
||||
[False, False, True],
|
||||
for i in self.executor.map(time.sleep,
|
||||
[0, 0, 10],
|
||||
timeout=5):
|
||||
results.append(i)
|
||||
except futures.TimeoutError:
|
||||
pass
|
||||
else:
|
||||
self.fail('expected TimeoutError')
|
||||
finally:
|
||||
timeout_call.close()
|
||||
|
||||
self.assertEqual([42, 42], results)
|
||||
self.assertEqual([None, None], results)
|
||||
|
||||
|
||||
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
|
||||
pass
|
||||
|
||||
|
||||
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
|
||||
pass
|
||||
|
||||
|
||||
class FutureTests(unittest.TestCase):
|
||||
def test_done_callback_with_result(self):
|
||||
callback_result = None
|
||||
|
Loading…
Reference in New Issue
Block a user