mirror of
https://github.com/python/cpython.git
synced 2024-11-24 02:15:30 +08:00
bpo-5001: More-informative multiprocessing error messages (#3079)
* Make error message more informative Replace assertions in error-reporting code with more-informative version that doesn't cause confusion over where and what the error is. * Additional clarification + get travis to check * Change from SystemError to TypeError As suggested in PR comment by @pitrou, changing from SystemError; TypeError appears appropriate. * NEWS file installation; ACKS addition (will do my best to justify it by additional work) * Making current AssertionErrors in multiprocessing more informative * Blurb added re multiprocessing managers.py, queues.py cleanup * Further multiprocessing cleanup - went through pool.py * Fix two asserts in multiprocessing/util.py * Most asserts in multiprocessing more informative * Didn't save right version * Further work on multiprocessing error messages * Correct typo * Correct typo v2 * Blasted colon... serves me right for trying to work on two things at once * Simplify NEWS entry * Update 2017-08-18-17-16-38.bpo-5001.gwnthq.rst * Update 2017-08-18-17-16-38.bpo-5001.gwnthq.rst OK, never mind. * Corrected (thanks to pitrou) error messages for notify * Remove extraneous backslash in docstring.
This commit is contained in:
parent
631fdee6e6
commit
bd73e72b4a
@ -720,7 +720,9 @@ FAILURE = b'#FAILURE#'
|
||||
|
||||
def deliver_challenge(connection, authkey):
|
||||
import hmac
|
||||
assert isinstance(authkey, bytes)
|
||||
if not isinstance(authkey, bytes):
|
||||
raise ValueError(
|
||||
"Authkey must be bytes, not {0!s}".format(type(authkey)))
|
||||
message = os.urandom(MESSAGE_LENGTH)
|
||||
connection.send_bytes(CHALLENGE + message)
|
||||
digest = hmac.new(authkey, message, 'md5').digest()
|
||||
@ -733,7 +735,9 @@ def deliver_challenge(connection, authkey):
|
||||
|
||||
def answer_challenge(connection, authkey):
|
||||
import hmac
|
||||
assert isinstance(authkey, bytes)
|
||||
if not isinstance(authkey, bytes):
|
||||
raise ValueError(
|
||||
"Authkey must be bytes, not {0!s}".format(type(authkey)))
|
||||
message = connection.recv_bytes(256) # reject large message
|
||||
assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
|
||||
message = message[len(CHALLENGE):]
|
||||
|
@ -41,7 +41,10 @@ class DummyProcess(threading.Thread):
|
||||
self._parent = current_process()
|
||||
|
||||
def start(self):
|
||||
assert self._parent is current_process()
|
||||
if self._parent is not current_process():
|
||||
raise RuntimeError(
|
||||
"Parent is {0!r} but current_process is {1!r}".format(
|
||||
self._parent, current_process()))
|
||||
self._start_called = True
|
||||
if hasattr(self._parent, '_children'):
|
||||
self._parent._children[self] = None
|
||||
|
@ -189,7 +189,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
|
||||
|
||||
if alive_r in rfds:
|
||||
# EOF because no more client processes left
|
||||
assert os.read(alive_r, 1) == b''
|
||||
assert os.read(alive_r, 1) == b'', "Not at EOF?"
|
||||
raise SystemExit
|
||||
|
||||
if sig_r in rfds:
|
||||
@ -208,7 +208,10 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
|
||||
if os.WIFSIGNALED(sts):
|
||||
returncode = -os.WTERMSIG(sts)
|
||||
else:
|
||||
assert os.WIFEXITED(sts)
|
||||
if not os.WIFEXITED(sts):
|
||||
raise AssertionError(
|
||||
"Child {0:n} status is {1:n}".format(
|
||||
pid,sts))
|
||||
returncode = os.WEXITSTATUS(sts)
|
||||
# Send exit code to client process
|
||||
try:
|
||||
@ -227,7 +230,10 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
|
||||
with listener.accept()[0] as s:
|
||||
# Receive fds from client
|
||||
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
|
||||
assert len(fds) <= MAXFDS_TO_SEND
|
||||
if len(fds) > MAXFDS_TO_SEND:
|
||||
raise RuntimeError(
|
||||
"Too many ({0:n}) fds to send".format(
|
||||
len(fds)))
|
||||
child_r, child_w, *fds = fds
|
||||
s.close()
|
||||
pid = os.fork()
|
||||
|
@ -211,7 +211,10 @@ class Heap(object):
|
||||
# synchronously sometimes later from malloc() or free(), by calling
|
||||
# _free_pending_blocks() (appending and retrieving from a list is not
|
||||
# strictly thread-safe but under cPython it's atomic thanks to the GIL).
|
||||
assert os.getpid() == self._lastpid
|
||||
if os.getpid() != self._lastpid:
|
||||
raise ValueError(
|
||||
"My pid ({0:n}) is not last pid {1:n}".format(
|
||||
os.getpid(),self._lastpid))
|
||||
if not self._lock.acquire(False):
|
||||
# can't acquire the lock right now, add the block to the list of
|
||||
# pending blocks to free
|
||||
@ -227,7 +230,10 @@ class Heap(object):
|
||||
|
||||
def malloc(self, size):
|
||||
# return a block of right size (possibly rounded up)
|
||||
assert 0 <= size < sys.maxsize
|
||||
if size < 0:
|
||||
raise ValueError("Size {0:n} out of range".format(size))
|
||||
if sys.maxsize <= size:
|
||||
raise OverflowError("Size {0:n} too large".format(size))
|
||||
if os.getpid() != self._lastpid:
|
||||
self.__init__() # reinitialize after fork
|
||||
with self._lock:
|
||||
@ -250,7 +256,10 @@ class BufferWrapper(object):
|
||||
_heap = Heap()
|
||||
|
||||
def __init__(self, size):
|
||||
assert 0 <= size < sys.maxsize
|
||||
if size < 0:
|
||||
raise ValueError("Size {0:n} out of range".format(size))
|
||||
if sys.maxsize <= size:
|
||||
raise OverflowError("Size {0:n} too large".format(size))
|
||||
block = BufferWrapper._heap.malloc(size)
|
||||
self._state = (block, size)
|
||||
util.Finalize(self, BufferWrapper._heap.free, args=(block,))
|
||||
|
@ -23,7 +23,7 @@ from time import time as _time
|
||||
from traceback import format_exc
|
||||
|
||||
from . import connection
|
||||
from .context import reduction, get_spawning_popen
|
||||
from .context import reduction, get_spawning_popen, ProcessError
|
||||
from . import pool
|
||||
from . import process
|
||||
from . import util
|
||||
@ -133,7 +133,10 @@ class Server(object):
|
||||
'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
|
||||
|
||||
def __init__(self, registry, address, authkey, serializer):
|
||||
assert isinstance(authkey, bytes)
|
||||
if not isinstance(authkey, bytes):
|
||||
raise TypeError(
|
||||
"Authkey {0!r} is type {1!s}, not bytes".format(
|
||||
authkey, type(authkey)))
|
||||
self.registry = registry
|
||||
self.authkey = process.AuthenticationString(authkey)
|
||||
Listener, Client = listener_client[serializer]
|
||||
@ -163,7 +166,7 @@ class Server(object):
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
pass
|
||||
finally:
|
||||
if sys.stdout != sys.__stdout__:
|
||||
if sys.stdout != sys.__stdout__: # what about stderr?
|
||||
util.debug('resetting stdout, stderr')
|
||||
sys.stdout = sys.__stdout__
|
||||
sys.stderr = sys.__stderr__
|
||||
@ -316,6 +319,7 @@ class Server(object):
|
||||
'''
|
||||
Return some info --- useful to spot problems with refcounting
|
||||
'''
|
||||
# Perhaps include debug info about 'c'?
|
||||
with self.mutex:
|
||||
result = []
|
||||
keys = list(self.id_to_refcount.keys())
|
||||
@ -356,7 +360,9 @@ class Server(object):
|
||||
self.registry[typeid]
|
||||
|
||||
if callable is None:
|
||||
assert len(args) == 1 and not kwds
|
||||
if kwds or (len(args) != 1):
|
||||
raise ValueError(
|
||||
"Without callable, must have one non-keyword argument")
|
||||
obj = args[0]
|
||||
else:
|
||||
obj = callable(*args, **kwds)
|
||||
@ -364,7 +370,10 @@ class Server(object):
|
||||
if exposed is None:
|
||||
exposed = public_methods(obj)
|
||||
if method_to_typeid is not None:
|
||||
assert type(method_to_typeid) is dict
|
||||
if not isinstance(method_to_typeid, dict):
|
||||
raise TypeError(
|
||||
"Method_to_typeid {0!r}: type {1!s}, not dict".format(
|
||||
method_to_typeid, type(method_to_typeid)))
|
||||
exposed = list(exposed) + list(method_to_typeid)
|
||||
|
||||
ident = '%x' % id(obj) # convert to string because xmlrpclib
|
||||
@ -417,7 +426,11 @@ class Server(object):
|
||||
return
|
||||
|
||||
with self.mutex:
|
||||
assert self.id_to_refcount[ident] >= 1
|
||||
if self.id_to_refcount[ident] <= 0:
|
||||
raise AssertionError(
|
||||
"Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
|
||||
ident, self.id_to_obj[ident],
|
||||
self.id_to_refcount[ident]))
|
||||
self.id_to_refcount[ident] -= 1
|
||||
if self.id_to_refcount[ident] == 0:
|
||||
del self.id_to_refcount[ident]
|
||||
@ -480,7 +493,14 @@ class BaseManager(object):
|
||||
'''
|
||||
Return server object with serve_forever() method and address attribute
|
||||
'''
|
||||
assert self._state.value == State.INITIAL
|
||||
if self._state.value != State.INITIAL:
|
||||
if self._state.value == State.STARTED:
|
||||
raise ProcessError("Already started server")
|
||||
elif self._state.value == State.SHUTDOWN:
|
||||
raise ProcessError("Manager has shut down")
|
||||
else:
|
||||
raise ProcessError(
|
||||
"Unknown state {!r}".format(self._state.value))
|
||||
return Server(self._registry, self._address,
|
||||
self._authkey, self._serializer)
|
||||
|
||||
@ -497,7 +517,14 @@ class BaseManager(object):
|
||||
'''
|
||||
Spawn a server process for this manager object
|
||||
'''
|
||||
assert self._state.value == State.INITIAL
|
||||
if self._state.value != State.INITIAL:
|
||||
if self._state.value == State.STARTED:
|
||||
raise ProcessError("Already started server")
|
||||
elif self._state.value == State.SHUTDOWN:
|
||||
raise ProcessError("Manager has shut down")
|
||||
else:
|
||||
raise ProcessError(
|
||||
"Unknown state {!r}".format(self._state.value))
|
||||
|
||||
if initializer is not None and not callable(initializer):
|
||||
raise TypeError('initializer must be a callable')
|
||||
@ -593,7 +620,14 @@ class BaseManager(object):
|
||||
def __enter__(self):
|
||||
if self._state.value == State.INITIAL:
|
||||
self.start()
|
||||
assert self._state.value == State.STARTED
|
||||
if self._state.value != State.STARTED:
|
||||
if self._state.value == State.INITIAL:
|
||||
raise ProcessError("Unable to start server")
|
||||
elif self._state.value == State.SHUTDOWN:
|
||||
raise ProcessError("Manager has shut down")
|
||||
else:
|
||||
raise ProcessError(
|
||||
"Unknown state {!r}".format(self._state.value))
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
@ -653,7 +687,7 @@ class BaseManager(object):
|
||||
getattr(proxytype, '_method_to_typeid_', None)
|
||||
|
||||
if method_to_typeid:
|
||||
for key, value in list(method_to_typeid.items()):
|
||||
for key, value in list(method_to_typeid.items()): # isinstance?
|
||||
assert type(key) is str, '%r is not a string' % key
|
||||
assert type(value) is str, '%r is not a string' % value
|
||||
|
||||
|
@ -92,7 +92,9 @@ class MaybeEncodingError(Exception):
|
||||
|
||||
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
|
||||
wrap_exception=False):
|
||||
assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
|
||||
if (maxtasks is not None) and not (isinstance(maxtasks, int)
|
||||
and maxtasks >= 1):
|
||||
raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
|
||||
put = outqueue.put
|
||||
get = inqueue.get
|
||||
if hasattr(inqueue, '_writer'):
|
||||
@ -254,8 +256,8 @@ class Pool(object):
|
||||
def apply(self, func, args=(), kwds={}):
|
||||
'''
|
||||
Equivalent of `func(*args, **kwds)`.
|
||||
Pool must be running.
|
||||
'''
|
||||
assert self._state == RUN
|
||||
return self.apply_async(func, args, kwds).get()
|
||||
|
||||
def map(self, func, iterable, chunksize=None):
|
||||
@ -307,6 +309,10 @@ class Pool(object):
|
||||
))
|
||||
return result
|
||||
else:
|
||||
if chunksize < 1:
|
||||
raise ValueError(
|
||||
"Chunksize must be 1+, not {0:n}".format(
|
||||
chunksize))
|
||||
assert chunksize > 1
|
||||
task_batches = Pool._get_tasks(func, iterable, chunksize)
|
||||
result = IMapIterator(self._cache)
|
||||
@ -334,7 +340,9 @@ class Pool(object):
|
||||
))
|
||||
return result
|
||||
else:
|
||||
assert chunksize > 1
|
||||
if chunksize < 1:
|
||||
raise ValueError(
|
||||
"Chunksize must be 1+, not {0!r}".format(chunksize))
|
||||
task_batches = Pool._get_tasks(func, iterable, chunksize)
|
||||
result = IMapUnorderedIterator(self._cache)
|
||||
self._taskqueue.put(
|
||||
@ -466,7 +474,7 @@ class Pool(object):
|
||||
return
|
||||
|
||||
if thread._state:
|
||||
assert thread._state == TERMINATE
|
||||
assert thread._state == TERMINATE, "Thread not in TERMINATE"
|
||||
util.debug('result handler found thread._state=TERMINATE')
|
||||
break
|
||||
|
||||
@ -542,7 +550,10 @@ class Pool(object):
|
||||
|
||||
def join(self):
|
||||
util.debug('joining pool')
|
||||
assert self._state in (CLOSE, TERMINATE)
|
||||
if self._state == RUN:
|
||||
raise ValueError("Pool is still running")
|
||||
elif self._state not in (CLOSE, TERMINATE):
|
||||
raise ValueError("In unknown state")
|
||||
self._worker_handler.join()
|
||||
self._task_handler.join()
|
||||
self._result_handler.join()
|
||||
@ -570,7 +581,9 @@ class Pool(object):
|
||||
util.debug('helping task handler/workers to finish')
|
||||
cls._help_stuff_finish(inqueue, task_handler, len(pool))
|
||||
|
||||
assert result_handler.is_alive() or len(cache) == 0
|
||||
if (not result_handler.is_alive()) and (len(cache) != 0):
|
||||
raise AssertionError(
|
||||
"Cannot have cache with result_hander not alive")
|
||||
|
||||
result_handler._state = TERMINATE
|
||||
outqueue.put(None) # sentinel
|
||||
@ -628,7 +641,8 @@ class ApplyResult(object):
|
||||
return self._event.is_set()
|
||||
|
||||
def successful(self):
|
||||
assert self.ready()
|
||||
if not self.ready():
|
||||
raise ValueError("{0!r} not ready".format(self))
|
||||
return self._success
|
||||
|
||||
def wait(self, timeout=None):
|
||||
|
@ -35,7 +35,7 @@ class Popen(object):
|
||||
if os.WIFSIGNALED(sts):
|
||||
self.returncode = -os.WTERMSIG(sts)
|
||||
else:
|
||||
assert os.WIFEXITED(sts)
|
||||
assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
|
||||
self.returncode = os.WEXITSTATUS(sts)
|
||||
return self.returncode
|
||||
|
||||
|
@ -78,7 +78,7 @@ class Queue(object):
|
||||
self._poll = self._reader.poll
|
||||
|
||||
def put(self, obj, block=True, timeout=None):
|
||||
assert not self._closed
|
||||
assert not self._closed, "Queue {0!r} has been closed".format(self)
|
||||
if not self._sem.acquire(block, timeout):
|
||||
raise Full
|
||||
|
||||
@ -140,7 +140,7 @@ class Queue(object):
|
||||
|
||||
def join_thread(self):
|
||||
debug('Queue.join_thread()')
|
||||
assert self._closed
|
||||
assert self._closed, "Queue {0!r} not closed".format(self)
|
||||
if self._jointhread:
|
||||
self._jointhread()
|
||||
|
||||
@ -281,7 +281,7 @@ class JoinableQueue(Queue):
|
||||
self._cond, self._unfinished_tasks = state[-2:]
|
||||
|
||||
def put(self, obj, block=True, timeout=None):
|
||||
assert not self._closed
|
||||
assert not self._closed, "Queue {0!r} is closed".format(self)
|
||||
if not self._sem.acquire(block, timeout):
|
||||
raise Full
|
||||
|
||||
|
@ -165,7 +165,10 @@ else:
|
||||
if len(cmsg_data) % a.itemsize != 0:
|
||||
raise ValueError
|
||||
a.frombytes(cmsg_data)
|
||||
assert len(a) % 256 == msg[0]
|
||||
if len(a) % 256 != msg[0]:
|
||||
raise AssertionError(
|
||||
"Len is {0:n} but msg[0] is {1!r}".format(
|
||||
len(a), msg[0]))
|
||||
return list(a)
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
@ -125,7 +125,7 @@ class _ResourceSharer(object):
|
||||
|
||||
def _start(self):
|
||||
from .connection import Listener
|
||||
assert self._listener is None
|
||||
assert self._listener is None, "Already have Listener"
|
||||
util.debug('starting listener and thread for sending handles')
|
||||
self._listener = Listener(authkey=process.current_process().authkey)
|
||||
self._address = self._listener.address
|
||||
|
@ -80,7 +80,8 @@ class SemaphoreTracker(object):
|
||||
# bytes are atomic, and that PIPE_BUF >= 512
|
||||
raise ValueError('name too long')
|
||||
nbytes = os.write(self._fd, msg)
|
||||
assert nbytes == len(msg)
|
||||
assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
|
||||
nbytes, len(msg))
|
||||
|
||||
|
||||
_semaphore_tracker = SemaphoreTracker()
|
||||
|
@ -93,7 +93,7 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
|
||||
'''
|
||||
Run code specified by data received over pipe
|
||||
'''
|
||||
assert is_forking(sys.argv)
|
||||
assert is_forking(sys.argv), "Not forking"
|
||||
if sys.platform == 'win32':
|
||||
import msvcrt
|
||||
new_handle = reduction.steal_handle(parent_pid, pipe_handle)
|
||||
|
@ -270,13 +270,16 @@ class Condition(object):
|
||||
|
||||
def notify(self, n=1):
|
||||
assert self._lock._semlock._is_mine(), 'lock is not owned'
|
||||
assert not self._wait_semaphore.acquire(False)
|
||||
assert not self._wait_semaphore.acquire(
|
||||
False), ('notify: Should not have been able to acquire'
|
||||
+ '_wait_semaphore')
|
||||
|
||||
# to take account of timeouts since last notify*() we subtract
|
||||
# woken_count from sleeping_count and rezero woken_count
|
||||
while self._woken_count.acquire(False):
|
||||
res = self._sleeping_count.acquire(False)
|
||||
assert res
|
||||
assert res, ('notify: Bug in sleeping_count.acquire'
|
||||
+ '- res should not be False')
|
||||
|
||||
sleepers = 0
|
||||
while sleepers < n and self._sleeping_count.acquire(False):
|
||||
|
@ -149,12 +149,15 @@ class Finalize(object):
|
||||
Class which supports object finalization using weakrefs
|
||||
'''
|
||||
def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
|
||||
assert exitpriority is None or type(exitpriority) is int
|
||||
if (exitpriority is not None) and not isinstance(exitpriority,int):
|
||||
raise TypeError(
|
||||
"Exitpriority ({0!r}) must be None or int, not {1!s}".format(
|
||||
exitpriority, type(exitpriority)))
|
||||
|
||||
if obj is not None:
|
||||
self._weakref = weakref.ref(obj, self)
|
||||
else:
|
||||
assert exitpriority is not None
|
||||
elif exitpriority is None:
|
||||
raise ValueError("Without object, exitpriority cannot be None")
|
||||
|
||||
self._callback = callback
|
||||
self._args = args
|
||||
|
@ -0,0 +1 @@
|
||||
Many asserts in `multiprocessing` are now more informative, and some error types have been changed to more specific ones.
|
Loading…
Reference in New Issue
Block a user