Merge 3.4 (asyncio)

This commit is contained in:
Victor Stinner 2015-01-30 00:05:36 +01:00
commit a9373ec966
6 changed files with 167 additions and 105 deletions

View File

@ -3,6 +3,7 @@ import subprocess
import sys
import warnings
from . import futures
from . import protocols
from . import transports
from .coroutines import coroutine
@ -13,27 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def __init__(self, loop, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
waiter=None, extra=None, **kwargs):
super().__init__(extra)
self._closed = False
self._protocol = protocol
self._loop = loop
self._proc = None
self._pid = None
self._returncode = None
self._exit_waiters = []
self._pending_calls = collections.deque()
self._pipes = {}
self._finished = False
if stdin == subprocess.PIPE:
self._pipes[0] = None
if stdout == subprocess.PIPE:
self._pipes[1] = None
if stderr == subprocess.PIPE:
self._pipes[2] = None
self._pending_calls = collections.deque()
self._finished = False
self._returncode = None
# Create the child process: set the _proc attribute
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, **kwargs)
self._pid = self._proc.pid
self._extra['subprocess'] = self._proc
if self._loop.get_debug():
if isinstance(args, (bytes, str)):
program = args
@ -42,6 +48,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
logger.debug('process %r created: pid %s',
program, self._pid)
self._loop.create_task(self._connect_pipes(waiter))
def __repr__(self):
info = [self.__class__.__name__]
if self._closed:
@ -77,12 +85,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def close(self):
self._closed = True
for proto in self._pipes.values():
if proto is None:
continue
proto.pipe.close()
if self._returncode is None:
self.terminate()
if self._proc is not None and self._returncode is None:
if self._loop.get_debug():
logger.warning('Close running child process: kill %r', self)
try:
self._proc.kill()
except ProcessLookupError:
pass
# Don't clear the _proc reference yet because _post_init() may
# still run
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
@ -105,59 +124,42 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
else:
return None
def _check_proc(self):
if self._closed:
raise ValueError("operation on closed transport")
if self._proc is None:
raise ProcessLookupError()
def send_signal(self, signal):
self._check_proc()
self._proc.send_signal(signal)
def terminate(self):
self._check_proc()
self._proc.terminate()
def kill(self):
self._check_proc()
self._proc.kill()
def _kill_wait(self):
"""Close pipes, kill the subprocess and read its return status.
Function called when an exception is raised during the creation
of a subprocess.
"""
self._closed = True
if self._loop.get_debug():
logger.warning('Exception during subprocess creation, '
'kill the subprocess %r',
self,
exc_info=True)
proc = self._proc
if proc.stdout:
proc.stdout.close()
if proc.stderr:
proc.stderr.close()
if proc.stdin:
proc.stdin.close()
try:
proc.kill()
except ProcessLookupError:
pass
self._returncode = proc.wait()
self.close()
@coroutine
def _post_init(self):
def _connect_pipes(self, waiter):
try:
proc = self._proc
loop = self._loop
if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
self._pipes[0] = pipe
if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
self._pipes[1] = pipe
if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 2),
@ -166,13 +168,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
assert self._pending_calls is not None
self._loop.call_soon(self._protocol.connection_made, self)
loop.call_soon(self._protocol.connection_made, self)
for callback, data in self._pending_calls:
self._loop.call_soon(callback, *data)
loop.call_soon(callback, *data)
self._pending_calls = None
except:
self._kill_wait()
raise
except Exception as exc:
if waiter is not None and not waiter.cancelled():
waiter.set_exception(exc)
else:
if waiter is not None and not waiter.cancelled():
waiter.set_result(None)
def _call(self, cb, *data):
if self._pending_calls is not None:
@ -197,6 +202,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self._call(self._protocol.process_exited)
self._try_finish()
# wake up futures waiting for wait()
for waiter in self._exit_waiters:
if not waiter.cancelled():
waiter.set_result(returncode)
self._exit_waiters = None
def wait(self):
"""Wait until the process exit and return the process return code.
This method is a coroutine."""
if self._returncode is not None:
return self._returncode
waiter = futures.Future(loop=self._loop)
self._exit_waiters.append(waiter)
return (yield from waiter)
def _try_finish(self):
assert not self._finished
if self._returncode is None:
@ -210,9 +232,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
try:
self._protocol.connection_lost(exc)
finally:
self._loop = None
self._proc = None
self._protocol = None
self._loop = None
class WriteSubprocessPipeProto(protocols.BaseProtocol):

View File

@ -25,8 +25,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
super().__init__(loop=loop)
self._limit = limit
self.stdin = self.stdout = self.stderr = None
self.waiter = futures.Future(loop=loop)
self._waiters = collections.deque()
self._transport = None
def __repr__(self):
@ -61,9 +59,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader=None,
loop=self._loop)
if not self.waiter.cancelled():
self.waiter.set_result(None)
def pipe_data_received(self, fd, data):
if fd == 1:
reader = self.stdout
@ -94,16 +89,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader.set_exception(exc)
def process_exited(self):
returncode = self._transport.get_returncode()
self._transport.close()
self._transport = None
# wake up futures waiting for wait()
while self._waiters:
waiter = self._waiters.popleft()
if not waiter.cancelled():
waiter.set_result(returncode)
class Process:
def __init__(self, transport, protocol, loop):
@ -124,30 +112,18 @@ class Process:
@coroutine
def wait(self):
"""Wait until the process exit and return the process return code."""
returncode = self._transport.get_returncode()
if returncode is not None:
return returncode
"""Wait until the process exit and return the process return code.
waiter = futures.Future(loop=self._loop)
self._protocol._waiters.append(waiter)
yield from waiter
return waiter.result()
def _check_alive(self):
if self._transport.get_returncode() is not None:
raise ProcessLookupError()
This method is a coroutine."""
return (yield from self._transport.wait())
def send_signal(self, signal):
self._check_alive()
self._transport.send_signal(signal)
def terminate(self):
self._check_alive()
self._transport.terminate()
def kill(self):
self._check_alive()
self._transport.kill()
@coroutine
@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
protocol_factory,
cmd, stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
try:
yield from protocol.waiter
except:
transport._kill_wait()
raise
return Process(transport, protocol, loop)
@coroutine
@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None,
program, *args,
stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
try:
yield from protocol.waiter
except:
transport._kill_wait()
raise
return Process(transport, protocol, loop)

View File

@ -16,6 +16,7 @@ from . import base_subprocess
from . import constants
from . import coroutines
from . import events
from . import futures
from . import selector_events
from . import selectors
from . import transports
@ -175,16 +176,20 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with events.get_child_watcher() as watcher:
waiter = futures.Future(loop=self)
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=extra, **kwargs)
try:
yield from transp._post_init()
except:
transp.close()
raise
waiter=waiter, extra=extra,
**kwargs)
watcher.add_child_handler(transp.get_pid(),
self._child_watcher_callback, transp)
try:
yield from waiter
except:
transp.close()
yield from transp.wait()
raise
return transp
@ -774,7 +779,7 @@ class SafeChildWatcher(BaseChildWatcher):
pass
def add_child_handler(self, pid, callback, *args):
self._callbacks[pid] = callback, args
self._callbacks[pid] = (callback, args)
# Prevent a race condition in case the child is already terminated.
self._do_waitpid(pid)

View File

@ -366,13 +366,16 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
waiter = futures.Future(loop=self)
transp = _WindowsSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=extra, **kwargs)
waiter=waiter, extra=extra,
**kwargs)
try:
yield from transp._post_init()
yield from waiter
except:
transp.close()
yield from transp.wait()
raise
return transp

View File

@ -1551,9 +1551,10 @@ class SubprocessTestsMixin:
stdin = transp.get_pipe_transport(0)
stdin.write(b'Python The Winner')
self.loop.run_until_complete(proto.got_data[1].wait())
transp.close()
with test_utils.disable_logger():
transp.close()
self.loop.run_until_complete(proto.completed)
self.check_terminated(proto.returncode)
self.check_killed(proto.returncode)
self.assertEqual(b'Python The Winner', proto.data[1])
def test_subprocess_interactive(self):
@ -1567,21 +1568,20 @@ class SubprocessTestsMixin:
self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
try:
stdin = transp.get_pipe_transport(0)
stdin.write(b'Python ')
self.loop.run_until_complete(proto.got_data[1].wait())
proto.got_data[1].clear()
self.assertEqual(b'Python ', proto.data[1])
stdin = transp.get_pipe_transport(0)
stdin.write(b'Python ')
self.loop.run_until_complete(proto.got_data[1].wait())
proto.got_data[1].clear()
self.assertEqual(b'Python ', proto.data[1])
stdin.write(b'The Winner')
self.loop.run_until_complete(proto.got_data[1].wait())
self.assertEqual(b'Python The Winner', proto.data[1])
finally:
stdin.write(b'The Winner')
self.loop.run_until_complete(proto.got_data[1].wait())
self.assertEqual(b'Python The Winner', proto.data[1])
with test_utils.disable_logger():
transp.close()
self.loop.run_until_complete(proto.completed)
self.check_terminated(proto.returncode)
self.check_killed(proto.returncode)
def test_subprocess_shell(self):
connect = self.loop.subprocess_shell(
@ -1739,9 +1739,10 @@ class SubprocessTestsMixin:
# GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
# WriteFile() we get ERROR_BROKEN_PIPE as expected.)
self.assertEqual(b'ERR:OSError', proto.data[2])
transp.close()
with test_utils.disable_logger():
transp.close()
self.loop.run_until_complete(proto.completed)
self.check_terminated(proto.returncode)
self.check_killed(proto.returncode)
def test_subprocess_wait_no_same_group(self):
# start the new process in a new session

View File

@ -4,6 +4,7 @@ import unittest
from unittest import mock
import asyncio
from asyncio import base_subprocess
from asyncio import subprocess
from asyncio import test_utils
try:
@ -23,6 +24,70 @@ PROGRAM_CAT = [
'data = sys.stdin.buffer.read()',
'sys.stdout.buffer.write(data)'))]
class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
def _start(self, *args, **kwargs):
self._proc = mock.Mock()
self._proc.stdin = None
self._proc.stdout = None
self._proc.stderr = None
class SubprocessTransportTests(test_utils.TestCase):
def setUp(self):
self.loop = self.new_test_loop()
self.set_event_loop(self.loop)
def create_transport(self, waiter=None):
protocol = mock.Mock()
protocol.connection_made._is_coroutine = False
protocol.process_exited._is_coroutine = False
transport = TestSubprocessTransport(
self.loop, protocol, ['test'], False,
None, None, None, 0, waiter=waiter)
return (transport, protocol)
def test_close(self):
waiter = asyncio.Future(loop=self.loop)
transport, protocol = self.create_transport(waiter)
transport._process_exited(0)
transport.close()
# The loop didn't run yet
self.assertFalse(protocol.connection_made.called)
# methods must raise ProcessLookupError if the transport was closed
self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM)
self.assertRaises(ValueError, transport.terminate)
self.assertRaises(ValueError, transport.kill)
self.loop.run_until_complete(waiter)
def test_proc_exited(self):
waiter = asyncio.Future(loop=self.loop)
transport, protocol = self.create_transport(waiter)
transport._process_exited(6)
self.loop.run_until_complete(waiter)
self.assertEqual(transport.get_returncode(), 6)
self.assertTrue(protocol.connection_made.called)
self.assertTrue(protocol.process_exited.called)
self.assertTrue(protocol.connection_lost.called)
self.assertEqual(protocol.connection_lost.call_args[0], (None,))
self.assertFalse(transport._closed)
self.assertIsNone(transport._loop)
self.assertIsNone(transport._proc)
self.assertIsNone(transport._protocol)
# methods must raise ProcessLookupError if the process exited
self.assertRaises(ProcessLookupError,
transport.send_signal, signal.SIGTERM)
self.assertRaises(ProcessLookupError, transport.terminate)
self.assertRaises(ProcessLookupError, transport.kill)
class SubprocessMixin:
def test_stdin_stdout(self):