mirror of
https://github.com/python/cpython.git
synced 2024-11-28 04:15:11 +08:00
bd73e72b4a
* Make error message more informative Replace assertions in error-reporting code with more-informative version that doesn't cause confusion over where and what the error is. * Additional clarification + get travis to check * Change from SystemError to TypeError As suggested in PR comment by @pitrou, changing from SystemError; TypeError appears appropriate. * NEWS file installation; ACKS addition (will do my best to justify it by additional work) * Making current AssertionErrors in multiprocessing more informative * Blurb added re multiprocessing managers.py, queues.py cleanup * Further multiprocessing cleanup - went through pool.py * Fix two asserts in multiprocessing/util.py * Most asserts in multiprocessing more informative * Didn't save right version * Further work on multiprocessing error messages * Correct typo * Correct typo v2 * Blasted colon... serves me right for trying to work on two things at once * Simplify NEWS entry * Update 2017-08-18-17-16-38.bpo-5001.gwnthq.rst * Update 2017-08-18-17-16-38.bpo-5001.gwnthq.rst OK, never mind. * Corrected (thanks to pitrou) error messages for notify * Remove extraneous backslash in docstring.
159 lines
5.2 KiB
Python
159 lines
5.2 KiB
Python
#
|
|
# We use a background thread for sharing fds on Unix, and for sharing sockets on
|
|
# Windows.
|
|
#
|
|
# A client which wants to pickle a resource registers it with the resource
|
|
# sharer and gets an identifier in return. The unpickling process will connect
|
|
# to the resource sharer, sends the identifier and its pid, and then receives
|
|
# the resource.
|
|
#
|
|
|
|
import os
|
|
import signal
|
|
import socket
|
|
import sys
|
|
import threading
|
|
|
|
from . import process
|
|
from .context import reduction
|
|
from . import util
|
|
|
|
__all__ = ['stop']
|
|
|
|
|
|
if sys.platform == 'win32':
|
|
__all__ += ['DupSocket']
|
|
|
|
class DupSocket(object):
|
|
'''Picklable wrapper for a socket.'''
|
|
def __init__(self, sock):
|
|
new_sock = sock.dup()
|
|
def send(conn, pid):
|
|
share = new_sock.share(pid)
|
|
conn.send_bytes(share)
|
|
self._id = _resource_sharer.register(send, new_sock.close)
|
|
|
|
def detach(self):
|
|
'''Get the socket. This should only be called once.'''
|
|
with _resource_sharer.get_connection(self._id) as conn:
|
|
share = conn.recv_bytes()
|
|
return socket.fromshare(share)
|
|
|
|
else:
|
|
__all__ += ['DupFd']
|
|
|
|
class DupFd(object):
|
|
'''Wrapper for fd which can be used at any time.'''
|
|
def __init__(self, fd):
|
|
new_fd = os.dup(fd)
|
|
def send(conn, pid):
|
|
reduction.send_handle(conn, new_fd, pid)
|
|
def close():
|
|
os.close(new_fd)
|
|
self._id = _resource_sharer.register(send, close)
|
|
|
|
def detach(self):
|
|
'''Get the fd. This should only be called once.'''
|
|
with _resource_sharer.get_connection(self._id) as conn:
|
|
return reduction.recv_handle(conn)
|
|
|
|
|
|
class _ResourceSharer(object):
|
|
'''Manager for resouces using background thread.'''
|
|
def __init__(self):
|
|
self._key = 0
|
|
self._cache = {}
|
|
self._old_locks = []
|
|
self._lock = threading.Lock()
|
|
self._listener = None
|
|
self._address = None
|
|
self._thread = None
|
|
util.register_after_fork(self, _ResourceSharer._afterfork)
|
|
|
|
def register(self, send, close):
|
|
'''Register resource, returning an identifier.'''
|
|
with self._lock:
|
|
if self._address is None:
|
|
self._start()
|
|
self._key += 1
|
|
self._cache[self._key] = (send, close)
|
|
return (self._address, self._key)
|
|
|
|
@staticmethod
|
|
def get_connection(ident):
|
|
'''Return connection from which to receive identified resource.'''
|
|
from .connection import Client
|
|
address, key = ident
|
|
c = Client(address, authkey=process.current_process().authkey)
|
|
c.send((key, os.getpid()))
|
|
return c
|
|
|
|
def stop(self, timeout=None):
|
|
'''Stop the background thread and clear registered resources.'''
|
|
from .connection import Client
|
|
with self._lock:
|
|
if self._address is not None:
|
|
c = Client(self._address,
|
|
authkey=process.current_process().authkey)
|
|
c.send(None)
|
|
c.close()
|
|
self._thread.join(timeout)
|
|
if self._thread.is_alive():
|
|
util.sub_warning('_ResourceSharer thread did '
|
|
'not stop when asked')
|
|
self._listener.close()
|
|
self._thread = None
|
|
self._address = None
|
|
self._listener = None
|
|
for key, (send, close) in self._cache.items():
|
|
close()
|
|
self._cache.clear()
|
|
|
|
def _afterfork(self):
|
|
for key, (send, close) in self._cache.items():
|
|
close()
|
|
self._cache.clear()
|
|
# If self._lock was locked at the time of the fork, it may be broken
|
|
# -- see issue 6721. Replace it without letting it be gc'ed.
|
|
self._old_locks.append(self._lock)
|
|
self._lock = threading.Lock()
|
|
if self._listener is not None:
|
|
self._listener.close()
|
|
self._listener = None
|
|
self._address = None
|
|
self._thread = None
|
|
|
|
def _start(self):
|
|
from .connection import Listener
|
|
assert self._listener is None, "Already have Listener"
|
|
util.debug('starting listener and thread for sending handles')
|
|
self._listener = Listener(authkey=process.current_process().authkey)
|
|
self._address = self._listener.address
|
|
t = threading.Thread(target=self._serve)
|
|
t.daemon = True
|
|
t.start()
|
|
self._thread = t
|
|
|
|
def _serve(self):
|
|
if hasattr(signal, 'pthread_sigmask'):
|
|
signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
|
|
while 1:
|
|
try:
|
|
with self._listener.accept() as conn:
|
|
msg = conn.recv()
|
|
if msg is None:
|
|
break
|
|
key, destination_pid = msg
|
|
send, close = self._cache.pop(key)
|
|
try:
|
|
send(conn, destination_pid)
|
|
finally:
|
|
close()
|
|
except:
|
|
if not util.is_exiting():
|
|
sys.excepthook(*sys.exc_info())
|
|
|
|
|
|
_resource_sharer = _ResourceSharer()
|
|
stop = _resource_sharer.stop
|