mirror of
https://github.com/python/cpython.git
synced 2024-11-23 01:45:25 +08:00
gh-71936: Fix race condition in multiprocessing.Pool (GH-124973)
* gh-71936: Fix race condition in multiprocessing.Pool Proxes of shared objects register a Finalizer in BaseProxy._incref(), and it will call BaseProxy._decref() when it is GCed. This may cause a race condition with Pool(maxtasksperchild=None) on Windows. A connection would be closed and raised TypeError when a GC occurs between _ConnectionBase._check_writable() and _ConnectionBase._send_bytes() in _ConnectionBase.send() in the second or later task, and a new object is allocated that shares the id() of a previously deleted one. Instead of using the id() of the token (or the proxy), use a unique, non-reusable number. Co-Authored-By: Akinori Hattori <hattya@gmail.com>
This commit is contained in:
parent
1e40c5ba47
commit
ba088c8f9c
@ -759,22 +759,29 @@ class BaseProxy(object):
|
||||
_address_to_local = {}
|
||||
_mutex = util.ForkAwareThreadLock()
|
||||
|
||||
# Each instance gets a `_serial` number. Unlike `id(...)`, this number
|
||||
# is never reused.
|
||||
_next_serial = 1
|
||||
|
||||
def __init__(self, token, serializer, manager=None,
|
||||
authkey=None, exposed=None, incref=True, manager_owned=False):
|
||||
with BaseProxy._mutex:
|
||||
tls_idset = BaseProxy._address_to_local.get(token.address, None)
|
||||
if tls_idset is None:
|
||||
tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
|
||||
BaseProxy._address_to_local[token.address] = tls_idset
|
||||
tls_serials = BaseProxy._address_to_local.get(token.address, None)
|
||||
if tls_serials is None:
|
||||
tls_serials = util.ForkAwareLocal(), ProcessLocalSet()
|
||||
BaseProxy._address_to_local[token.address] = tls_serials
|
||||
|
||||
self._serial = BaseProxy._next_serial
|
||||
BaseProxy._next_serial += 1
|
||||
|
||||
# self._tls is used to record the connection used by this
|
||||
# thread to communicate with the manager at token.address
|
||||
self._tls = tls_idset[0]
|
||||
self._tls = tls_serials[0]
|
||||
|
||||
# self._idset is used to record the identities of all shared
|
||||
# objects for which the current process owns references and
|
||||
# self._all_serials is a set used to record the identities of all
|
||||
# shared objects for which the current process owns references and
|
||||
# which are in the manager at token.address
|
||||
self._idset = tls_idset[1]
|
||||
self._all_serials = tls_serials[1]
|
||||
|
||||
self._token = token
|
||||
self._id = self._token.id
|
||||
@ -857,20 +864,20 @@ class BaseProxy(object):
|
||||
dispatch(conn, None, 'incref', (self._id,))
|
||||
util.debug('INCREF %r', self._token.id)
|
||||
|
||||
self._idset.add(self._id)
|
||||
self._all_serials.add(self._serial)
|
||||
|
||||
state = self._manager and self._manager._state
|
||||
|
||||
self._close = util.Finalize(
|
||||
self, BaseProxy._decref,
|
||||
args=(self._token, self._authkey, state,
|
||||
self._tls, self._idset, self._Client),
|
||||
args=(self._token, self._serial, self._authkey, state,
|
||||
self._tls, self._all_serials, self._Client),
|
||||
exitpriority=10
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _decref(token, authkey, state, tls, idset, _Client):
|
||||
idset.discard(token.id)
|
||||
def _decref(token, serial, authkey, state, tls, idset, _Client):
|
||||
idset.discard(serial)
|
||||
|
||||
# check whether manager is still alive
|
||||
if state is None or state.value == State.STARTED:
|
||||
|
@ -733,6 +733,7 @@ Larry Hastings
|
||||
Tim Hatch
|
||||
Zac Hatfield-Dodds
|
||||
Shane Hathaway
|
||||
Akinori Hattori
|
||||
Michael Haubenwallner
|
||||
Janko Hauser
|
||||
Flavian Hautbois
|
||||
|
@ -0,0 +1 @@
|
||||
Fix a race condition in :class:`multiprocessing.pool.Pool`.
|
Loading…
Reference in New Issue
Block a user