diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index e686a6a1c4c..e735b81f234 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -22,13 +22,13 @@ streams:: '127.0.0.1', 8888) print(f'Send: {message!r}') - await writer.awrite(message.encode()) + await writer.write(message.encode()) data = await reader.read(100) print(f'Received: {data.decode()!r}') print('Close the connection') - await writer.aclose() + await writer.close() asyncio.run(tcp_echo_client('Hello World!')) @@ -226,23 +226,70 @@ StreamWriter directly; use :func:`open_connection` and :func:`start_server` instead. - .. coroutinemethod:: awrite(data) + .. method:: write(data) - Write *data* to the stream. + The method attempts to write the *data* to the underlying socket immediately. + If that fails, the data is queued in an internal write buffer until it can be + sent. - The method respects flow control, execution is paused if the write - buffer reaches the high watermark. + Starting with Python 3.8, it is possible to directly await on the `write()` + method:: - .. versionadded:: 3.8 + await stream.write(data) - .. coroutinemethod:: aclose() + The ``await`` pauses the current coroutine until the data is written to the + socket. - Close the stream. + Below is an equivalent code that works with Python <= 3.7:: - Wait until all closing actions are complete, e.g. SSL shutdown for - secure sockets. + stream.write(data) + await stream.drain() - .. versionadded:: 3.8 + .. versionchanged:: 3.8 + Support ``await stream.write(...)`` syntax. + + .. method:: writelines(data) + + The method writes a list (or any iterable) of bytes to the underlying socket + immediately. + If that fails, the data is queued in an internal write buffer until it can be + sent. + + Starting with Python 3.8, it is possible to directly await on the `write()` + method:: + + await stream.writelines(lines) + + The ``await`` pauses the current coroutine until the data is written to the + socket. + + Below is an equivalent code that works with Python <= 3.7:: + + stream.writelines(lines) + await stream.drain() + + .. versionchanged:: 3.8 + Support ``await stream.writelines()`` syntax. + + .. method:: close() + + The method closes the stream and the underlying socket. + + Starting with Python 3.8, it is possible to directly await on the `close()` + method:: + + await stream.close() + + The ``await`` pauses the current coroutine until the stream and the underlying + socket are closed (and SSL shutdown is performed for a secure connection). + + Below is an equivalent code that works with Python <= 3.7:: + + stream.close() + await stream.wait_closed() + + .. versionchanged:: 3.8 + Support ``await stream.close()`` syntax. .. method:: can_write_eof() @@ -263,21 +310,6 @@ StreamWriter Access optional transport information; see :meth:`BaseTransport.get_extra_info` for details. - .. method:: write(data) - - Write *data* to the stream. - - This method is not subject to flow control. Calls to ``write()`` should - be followed by :meth:`drain`. The :meth:`awrite` method is a - recommended alternative the applies flow control automatically. - - .. method:: writelines(data) - - Write a list (or any iterable) of bytes to the stream. - - This method is not subject to flow control. Calls to ``writelines()`` - should be followed by :meth:`drain`. - .. coroutinemethod:: drain() Wait until it is appropriate to resume writing to the stream. @@ -293,10 +325,6 @@ StreamWriter be resumed. When there is nothing to wait for, the :meth:`drain` returns immediately. - .. method:: close() - - Close the stream. - .. method:: is_closing() Return ``True`` if the stream is closed or in the process of diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 79adf028212..d9a9f5e72d3 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -352,6 +352,8 @@ class StreamWriter: assert reader is None or isinstance(reader, StreamReader) self._reader = reader self._loop = loop + self._complete_fut = self._loop.create_future() + self._complete_fut.set_result(None) def __repr__(self): info = [self.__class__.__name__, f'transport={self._transport!r}'] @@ -365,9 +367,33 @@ class StreamWriter: def write(self, data): self._transport.write(data) + return self._fast_drain() def writelines(self, data): self._transport.writelines(data) + return self._fast_drain() + + def _fast_drain(self): + # The helper tries to use fast-path to return already existing complete future + # object if underlying transport is not paused and actual waiting for writing + # resume is not needed + if self._reader is not None: + # this branch will be simplified after merging reader with writer + exc = self._reader.exception() + if exc is not None: + fut = self._loop.create_future() + fut.set_exception(exc) + return fut + if not self._transport.is_closing(): + if self._protocol._connection_lost: + fut = self._loop.create_future() + fut.set_exception(ConnectionResetError('Connection lost')) + return fut + if not self._protocol._paused: + # fast path, the stream is not paused + # no need to wait for resume signal + return self._complete_fut + return self._loop.create_task(self.drain()) def write_eof(self): return self._transport.write_eof() @@ -377,6 +403,7 @@ class StreamWriter: def close(self): self._transport.close() + return self._protocol._get_close_waiter(self) def is_closing(self): return self._transport.is_closing() @@ -408,14 +435,6 @@ class StreamWriter: raise ConnectionResetError('Connection lost') await self._protocol._drain_helper() - async def aclose(self): - self.close() - await self.wait_closed() - - async def awrite(self, data): - self.write(data) - await self.drain() - class StreamReader: diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 905141ca89c..bf93f30e1aa 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -1035,24 +1035,42 @@ os.close(fd) messages[0]['message']) def test_async_writer_api(self): + async def inner(httpd): + rd, wr = await asyncio.open_connection(*httpd.address) + + await wr.write(b'GET / HTTP/1.0\r\n\r\n') + data = await rd.readline() + self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') + data = await rd.read() + self.assertTrue(data.endswith(b'\r\n\r\nTest message')) + await wr.close() + messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) with test_utils.run_test_server() as httpd: - rd, wr = self.loop.run_until_complete( - asyncio.open_connection(*httpd.address, - loop=self.loop)) + self.loop.run_until_complete(inner(httpd)) - f = wr.awrite(b'GET / HTTP/1.0\r\n\r\n') - self.loop.run_until_complete(f) - f = rd.readline() - data = self.loop.run_until_complete(f) + self.assertEqual(messages, []) + + def test_async_writer_api(self): + async def inner(httpd): + rd, wr = await asyncio.open_connection(*httpd.address) + + await wr.write(b'GET / HTTP/1.0\r\n\r\n') + data = await rd.readline() self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - f = rd.read() - data = self.loop.run_until_complete(f) + data = await rd.read() self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - f = wr.aclose() - self.loop.run_until_complete(f) + wr.close() + with self.assertRaises(ConnectionResetError): + await wr.write(b'data') + + messages = [] + self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) + + with test_utils.run_test_server() as httpd: + self.loop.run_until_complete(inner(httpd)) self.assertEqual(messages, []) @@ -1066,7 +1084,7 @@ os.close(fd) asyncio.open_connection(*httpd.address, loop=self.loop)) - f = wr.aclose() + f = wr.close() self.loop.run_until_complete(f) assert rd.at_eof() f = rd.read() diff --git a/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst b/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst new file mode 100644 index 00000000000..f59863b7b7a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst @@ -0,0 +1,2 @@ +Provide both sync and async calls for StreamWriter.write() and +StreamWriter.close()