Issue #11815: Remove dead code in concurrent.futures (since a blocking Queue cannot raise queue.Empty).
Antoine Pitrou 2011-04-12 17:50:20 +02:00
commit 3fdd9b681d
2 changed files with 28 additions and 51 deletions
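
Why the except queue.Empty branches were dead, shown with a minimal sketch (illustrative only, not part of the commit): queue.Empty is raised only by a non-blocking get() or by a get() with a timeout; a get(block=True) with no timeout simply blocks until an item arrives, for queue.Queue and multiprocessing.Queue alike.

import queue

q = queue.Queue()
q.put("item")
try:
    q.get(block=False)   # returns "item"; would raise queue.Empty if the queue were empty
    q.get(timeout=0.1)   # queue is now empty, so this raises queue.Empty after 0.1s
except queue.Empty:
    print("only non-blocking or timed gets raise Empty")

q.put("item")
q.get(block=True)        # no timeout: blocks until an item is available, never raises Empty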

Lib/concurrent/futures/process.py

@@ -104,7 +104,7 @@ class _CallItem(object):
         self.args = args
         self.kwargs = kwargs

-def _process_worker(call_queue, result_queue, shutdown):
+def _process_worker(call_queue, result_queue):
     """Evaluates calls from call_queue and places the results in result_queue.

     This worker is run in a separate process.
@@ -118,24 +118,19 @@ def _process_worker(call_queue, result_queue, shutdown):
             worker that it should exit when call_queue is empty.
     """
     while True:
+        call_item = call_queue.get(block=True)
+        if call_item is None:
+            # Wake up queue management thread
+            result_queue.put(None)
+            return
         try:
-            call_item = call_queue.get(block=True)
-        except queue.Empty:
-            if shutdown.is_set():
-                return
+            r = call_item.fn(*call_item.args, **call_item.kwargs)
+        except BaseException as e:
+            result_queue.put(_ResultItem(call_item.work_id,
+                                         exception=e))
         else:
-            if call_item is None:
-                # Wake up queue management thread
-                result_queue.put(None)
-                return
-            try:
-                r = call_item.fn(*call_item.args, **call_item.kwargs)
-            except BaseException as e:
-                result_queue.put(_ResultItem(call_item.work_id,
-                                             exception=e))
-            else:
-                result_queue.put(_ResultItem(call_item.work_id,
-                                             result=r))
+            result_queue.put(_ResultItem(call_item.work_id,
+                                         result=r))

 def _add_call_item_to_queue(pending_work_items,
                             work_ids,
@@ -179,8 +174,7 @@ def _queue_manangement_worker(executor_reference,
                               pending_work_items,
                               work_ids_queue,
                               call_queue,
-                              result_queue,
-                              shutdown_process_event):
+                              result_queue):
     """Manages the communication between this process and the worker processes.

     This function is run in a local thread.
@@ -198,9 +192,6 @@ def _queue_manangement_worker(executor_reference,
             derived from _WorkItems for processing by the process workers.
         result_queue: A multiprocessing.Queue of _ResultItems generated by the
             process workers.
-        shutdown_process_event: A multiprocessing.Event used to signal the
-            process workers that they should exit when their work queue is
-            empty.
     """
     nb_shutdown_processes = 0
     def shutdown_one_process():
@@ -213,20 +204,16 @@ def _queue_manangement_worker(executor_reference,
                                 work_ids_queue,
                                 call_queue)

-        try:
-            result_item = result_queue.get(block=True)
-        except queue.Empty:
-            pass
-        else:
-            if result_item is not None:
-                work_item = pending_work_items[result_item.work_id]
-                del pending_work_items[result_item.work_id]
+        result_item = result_queue.get(block=True)
+        if result_item is not None:
+            work_item = pending_work_items[result_item.work_id]
+            del pending_work_items[result_item.work_id]

-                if result_item.exception:
-                    work_item.future.set_exception(result_item.exception)
-                else:
-                    work_item.future.set_result(result_item.result)
-                continue
+            if result_item.exception:
+                work_item.future.set_exception(result_item.exception)
+            else:
+                work_item.future.set_result(result_item.result)
+            continue
         # If we come here, we either got a timeout or were explicitly woken up.
         # In either case, check whether we should start shutting down.
         executor = executor_reference()
@@ -238,8 +225,6 @@ def _queue_manangement_worker(executor_reference,
             # Since no new work items can be added, it is safe to shutdown
             # this thread if there are no pending work items.
             if not pending_work_items:
-                shutdown_process_event.set()
-
                 while nb_shutdown_processes < len(processes):
                     shutdown_one_process()
                 # If .join() is not called on the created processes then
@@ -306,7 +291,6 @@ class ProcessPoolExecutor(_base.Executor):

         # Shutdown is a two-step process.
         self._shutdown_thread = False
-        self._shutdown_process_event = multiprocessing.Event()
         self._shutdown_lock = threading.Lock()
         self._queue_count = 0
         self._pending_work_items = {}
@@ -324,8 +308,7 @@ class ProcessPoolExecutor(_base.Executor):
                           self._pending_work_items,
                           self._work_ids,
                           self._call_queue,
-                          self._result_queue,
-                          self._shutdown_process_event))
+                          self._result_queue))
             self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
             _threads_queues[self._queue_management_thread] = self._result_queue
@@ -335,8 +318,7 @@ class ProcessPoolExecutor(_base.Executor):
             p = multiprocessing.Process(
                     target=_process_worker,
                     args=(self._call_queue,
-                          self._result_queue,
-                          self._shutdown_process_event))
+                          self._result_queue))
             p.start()
             self._processes.add(p)
@@ -372,7 +354,6 @@ class ProcessPoolExecutor(_base.Executor):
         self._queue_management_thread = None
         self._call_queue = None
         self._result_queue = None
-        self._shutdown_process_event = None
         self._processes = None

     shutdown.__doc__ = _base.Executor.shutdown.__doc__
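
For context, a rough sketch of the None-sentinel shutdown that the remaining process.py code relies on (an illustration with made-up names, not the stdlib code): the queue management thread queues one None per worker process (the shutdown_one_process calls in the hunk above), and a worker that receives None puts a None back on the result queue to wake the management thread, as the new _process_worker body shows, so no shared multiprocessing.Event is needed.

import multiprocessing

def worker(call_queue, result_queue):
    while True:
        item = call_queue.get(block=True)   # blocks; never raises queue.Empty
        if item is None:                    # sentinel: time to exit
            result_queue.put(None)          # wake whoever waits on results
            return
        result_queue.put(item * 2)          # stand-in for running a _CallItem

if __name__ == "__main__":
    call_q = multiprocessing.Queue()
    result_q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(call_q, result_q))
    p.start()
    call_q.put(21)
    print(result_q.get())        # 42
    call_q.put(None)             # one sentinel per worker instead of setting an Event
    assert result_q.get() is None
    p.join()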

Lib/concurrent/futures/thread.py

@@ -60,14 +60,10 @@ class _WorkItem(object):
 def _worker(executor_reference, work_queue):
     try:
         while True:
-            try:
-                work_item = work_queue.get(block=True)
-            except queue.Empty:
-                pass
-            else:
-                if work_item is not None:
-                    work_item.run()
-                    continue
+            work_item = work_queue.get(block=True)
+            if work_item is not None:
+                work_item.run()
+                continue
             executor = executor_reference()
             # Exit if:
             #   - The interpreter is shutting down OR
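
A small usage check (not from the commit) that the simplified workers keep the public behaviour: both executors still run submitted callables and shut down cleanly when the with block exits, with ProcessPoolExecutor coordinating that shutdown through None sentinels alone rather than a shared Event.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as tpe:
        print(list(tpe.map(square, range(5))))   # [0, 1, 4, 9, 16]
    with ProcessPoolExecutor(max_workers=2) as ppe:
        print(list(ppe.map(square, range(5))))   # [0, 1, 4, 9, 16]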