From 841d9ee41a8ad0a8a372f9b84f0fa40b07bcc66b Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Sat, 3 Oct 2015 08:31:42 -0700 Subject: [PATCH] Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel. --- Lib/asyncio/futures.py | 74 +++++++++++++++++++++------ Lib/asyncio/tasks.py | 18 ++++++- Lib/test/test_asyncio/test_futures.py | 2 - Lib/test/test_asyncio/test_tasks.py | 67 ++++++++++++++++++++++++ Misc/ACKS | 1 + Misc/NEWS | 4 ++ 6 files changed, 147 insertions(+), 19 deletions(-) diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index dbe06c4a98b..166bc8047bf 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -390,22 +390,64 @@ class Future: __await__ = __iter__ # make compatible with 'await' expression -def wrap_future(fut, *, loop=None): +def _set_concurrent_future_state(concurrent, source): + """Copy state from a future to a concurrent.futures.Future.""" + assert source.done() + if source.cancelled(): + concurrent.cancel() + if not concurrent.set_running_or_notify_cancel(): + return + exception = source.exception() + if exception is not None: + concurrent.set_exception(exception) + else: + result = source.result() + concurrent.set_result(result) + + +def _chain_future(source, destination): + """Chain two futures so that when one completes, so does the other. + + The result (or exception) of source will be copied to destination. + If destination is cancelled, source gets cancelled too. + Compatible with both asyncio.Future and concurrent.futures.Future. + """ + if not isinstance(source, (Future, concurrent.futures.Future)): + raise TypeError('A future is required for source argument') + if not isinstance(destination, (Future, concurrent.futures.Future)): + raise TypeError('A future is required for destination argument') + source_loop = source._loop if isinstance(source, Future) else None + dest_loop = destination._loop if isinstance(destination, Future) else None + + def _set_state(future, other): + if isinstance(future, Future): + future._copy_state(other) + else: + _set_concurrent_future_state(future, other) + + def _call_check_cancel(destination): + if destination.cancelled(): + if source_loop is None or source_loop is dest_loop: + source.cancel() + else: + source_loop.call_soon_threadsafe(source.cancel) + + def _call_set_state(source): + if dest_loop is None or dest_loop is source_loop: + _set_state(destination, source) + else: + dest_loop.call_soon_threadsafe(_set_state, destination, source) + + destination.add_done_callback(_call_check_cancel) + source.add_done_callback(_call_set_state) + + +def wrap_future(future, *, loop=None): """Wrap concurrent.futures.Future object.""" - if isinstance(fut, Future): - return fut - assert isinstance(fut, concurrent.futures.Future), \ - 'concurrent.futures.Future is expected, got {!r}'.format(fut) - if loop is None: - loop = events.get_event_loop() + if isinstance(future, Future): + return future + assert isinstance(future, concurrent.futures.Future), \ + 'concurrent.futures.Future is expected, got {!r}'.format(future) new_future = Future(loop=loop) - - def _check_cancel_other(f): - if f.cancelled(): - fut.cancel() - - new_future.add_done_callback(_check_cancel_other) - fut.add_done_callback( - lambda future: loop.call_soon_threadsafe( - new_future._copy_state, future)) + _chain_future(future, new_future) return new_future diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 434f498e470..5a7bd9dbcb1 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -3,7 +3,7 @@ __all__ = ['Task', 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 'wait', 'wait_for', 'as_completed', 'sleep', 'async', - 'gather', 'shield', 'ensure_future', + 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', ] import concurrent.futures @@ -692,3 +692,19 @@ def shield(arg, *, loop=None): inner.add_done_callback(_done_callback) return outer + + +def run_coroutine_threadsafe(coro, loop): + """Submit a coroutine object to a given event loop. + + Return a concurrent.futures.Future to access the result. + """ + if not coroutines.iscoroutine(coro): + raise TypeError('A coroutine object is required') + future = concurrent.futures.Future() + + def callback(): + futures._chain_future(ensure_future(coro, loop=loop), future) + + loop.call_soon_threadsafe(callback) + return future diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py index c8b6829fb67..0bc0581d281 100644 --- a/Lib/test/test_asyncio/test_futures.py +++ b/Lib/test/test_asyncio/test_futures.py @@ -174,8 +174,6 @@ class FutureTests(test_utils.TestCase): '') def test_copy_state(self): - # Test the internal _copy_state method since it's being directly - # invoked in other modules. f = asyncio.Future(loop=self.loop) f.set_result(10) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 16d3d9da129..8ec5d9c9fdc 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): self.assertIsInstance(f.exception(), RuntimeError) +class RunCoroutineThreadsafeTests(test_utils.TestCase): + """Test case for futures.submit_to_loop.""" + + def setUp(self): + self.loop = self.new_test_loop(self.time_gen) + + def time_gen(self): + """Handle the timer.""" + yield 0 # second + yield 1 # second + + @asyncio.coroutine + def add(self, a, b, fail=False, cancel=False): + """Wait 1 second and return a + b.""" + yield from asyncio.sleep(1, loop=self.loop) + if fail: + raise RuntimeError("Fail!") + if cancel: + asyncio.tasks.Task.current_task(self.loop).cancel() + yield + return a + b + + def target(self, fail=False, cancel=False, timeout=None): + """Run add coroutine in the event loop.""" + coro = self.add(1, 2, fail=fail, cancel=cancel) + future = asyncio.run_coroutine_threadsafe(coro, self.loop) + try: + return future.result(timeout) + finally: + future.done() or future.cancel() + + def test_run_coroutine_threadsafe(self): + """Test coroutine submission from a thread to an event loop.""" + future = self.loop.run_in_executor(None, self.target) + result = self.loop.run_until_complete(future) + self.assertEqual(result, 3) + + def test_run_coroutine_threadsafe_with_exception(self): + """Test coroutine submission from a thread to an event loop + when an exception is raised.""" + future = self.loop.run_in_executor(None, self.target, True) + with self.assertRaises(RuntimeError) as exc_context: + self.loop.run_until_complete(future) + self.assertIn("Fail!", exc_context.exception.args) + + def test_run_coroutine_threadsafe_with_timeout(self): + """Test coroutine submission from a thread to an event loop + when a timeout is raised.""" + callback = lambda: self.target(timeout=0) + future = self.loop.run_in_executor(None, callback) + with self.assertRaises(asyncio.TimeoutError): + self.loop.run_until_complete(future) + # Clear the time generator and tasks + test_utils.run_briefly(self.loop) + # Check that there's no pending task (add has been cancelled) + for task in asyncio.Task.all_tasks(self.loop): + self.assertTrue(task.done()) + + def test_run_coroutine_threadsafe_task_cancelled(self): + """Test coroutine submission from a tread to an event loop + when the task is cancelled.""" + callback = lambda: self.target(cancel=True) + future = self.loop.run_in_executor(None, callback) + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(future) + + if __name__ == '__main__': unittest.main() diff --git a/Misc/ACKS b/Misc/ACKS index 9e2c57de7af..cae34e6b369 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -929,6 +929,7 @@ Steven Miale Trent Mick Jason Michalski Franck Michea +Vincent Michel Tom Middleton Thomas Miedema Stan Mihai diff --git a/Misc/NEWS b/Misc/NEWS index 1c8bc491e65..d4bea39dc61 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -90,6 +90,10 @@ Core and Builtins Library ------- +- Issue #25304: Add asyncio.run_coroutine_threadsafe(). This lets you + submit a coroutine to a loop from another thread, returning a + concurrent.futures.Future. By Vincent Michel. + - Issue #25232: Fix CGIRequestHandler to split the query from the URL at the first question mark (?) rather than the last. Patch from Xiang Zhang.