mirror of
https://github.com/python/cpython.git
synced 2024-11-24 10:24:35 +08:00
gh-104341: Add a Separate "Running" Lock for Each Thread (gh-104754)
Having a separate lock means Thread.join() doesn't need to wait for the thread to be cleaned up first. It can wait for the thread's Python target to finish running. This gives us some flexibility in how we clean up threads. (This is a minor cleanup as part of a fix for gh-104341.)
This commit is contained in:
parent
08b4eb83aa
commit
097b7830cd
@ -747,7 +747,7 @@ class ThreadTests(BaseTestCase):
|
|||||||
rc, out, err = assert_python_ok("-c", code)
|
rc, out, err = assert_python_ok("-c", code)
|
||||||
self.assertEqual(err, b"")
|
self.assertEqual(err, b"")
|
||||||
|
|
||||||
def test_tstate_lock(self):
|
def test_running_lock(self):
|
||||||
# Test an implementation detail of Thread objects.
|
# Test an implementation detail of Thread objects.
|
||||||
started = _thread.allocate_lock()
|
started = _thread.allocate_lock()
|
||||||
finish = _thread.allocate_lock()
|
finish = _thread.allocate_lock()
|
||||||
@ -757,29 +757,29 @@ class ThreadTests(BaseTestCase):
|
|||||||
started.release()
|
started.release()
|
||||||
finish.acquire()
|
finish.acquire()
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
# The tstate lock is None until the thread is started
|
# The running lock is None until the thread is started
|
||||||
t = threading.Thread(target=f)
|
t = threading.Thread(target=f)
|
||||||
self.assertIs(t._tstate_lock, None)
|
self.assertIs(t._running_lock, None)
|
||||||
t.start()
|
t.start()
|
||||||
started.acquire()
|
started.acquire()
|
||||||
self.assertTrue(t.is_alive())
|
self.assertTrue(t.is_alive())
|
||||||
# The tstate lock can't be acquired when the thread is running
|
# The running lock can't be acquired when the thread is running
|
||||||
# (or suspended).
|
# (or suspended).
|
||||||
tstate_lock = t._tstate_lock
|
running_lock = t._running_lock
|
||||||
self.assertFalse(tstate_lock.acquire(timeout=0), False)
|
self.assertFalse(running_lock.acquire(timeout=0), False)
|
||||||
finish.release()
|
finish.release()
|
||||||
# When the thread ends, the state_lock can be successfully
|
# When the thread ends, the state_lock can be successfully
|
||||||
# acquired.
|
# acquired.
|
||||||
self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
|
self.assertTrue(running_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
|
||||||
# But is_alive() is still True: we hold _tstate_lock now, which
|
# But is_alive() is still True: we hold _running_lock now, which
|
||||||
# prevents is_alive() from knowing the thread's end-of-life C code
|
# prevents is_alive() from knowing the thread's Python code
|
||||||
# is done.
|
# is done.
|
||||||
self.assertTrue(t.is_alive())
|
self.assertTrue(t.is_alive())
|
||||||
# Let is_alive() find out the C code is done.
|
# Let is_alive() find out the C code is done.
|
||||||
tstate_lock.release()
|
running_lock.release()
|
||||||
self.assertFalse(t.is_alive())
|
self.assertFalse(t.is_alive())
|
||||||
# And verify the thread disposed of _tstate_lock.
|
# And verify the thread disposed of _running_lock.
|
||||||
self.assertIsNone(t._tstate_lock)
|
self.assertIsNone(t._running_lock)
|
||||||
t.join()
|
t.join()
|
||||||
|
|
||||||
def test_repr_stopped(self):
|
def test_repr_stopped(self):
|
||||||
|
@ -908,6 +908,7 @@ class Thread:
|
|||||||
self._ident = None
|
self._ident = None
|
||||||
if _HAVE_THREAD_NATIVE_ID:
|
if _HAVE_THREAD_NATIVE_ID:
|
||||||
self._native_id = None
|
self._native_id = None
|
||||||
|
self._running_lock = None
|
||||||
self._tstate_lock = None
|
self._tstate_lock = None
|
||||||
self._started = Event()
|
self._started = Event()
|
||||||
self._is_stopped = False
|
self._is_stopped = False
|
||||||
@ -926,6 +927,9 @@ class Thread:
|
|||||||
# bpo-42350: If the fork happens when the thread is already stopped
|
# bpo-42350: If the fork happens when the thread is already stopped
|
||||||
# (ex: after threading._shutdown() has been called), _tstate_lock
|
# (ex: after threading._shutdown() has been called), _tstate_lock
|
||||||
# is None. Do nothing in this case.
|
# is None. Do nothing in this case.
|
||||||
|
if self._running_lock is not None:
|
||||||
|
self._running_lock._at_fork_reinit()
|
||||||
|
self._running_lock.acquire()
|
||||||
if self._tstate_lock is not None:
|
if self._tstate_lock is not None:
|
||||||
self._tstate_lock._at_fork_reinit()
|
self._tstate_lock._at_fork_reinit()
|
||||||
self._tstate_lock.acquire()
|
self._tstate_lock.acquire()
|
||||||
@ -933,6 +937,7 @@ class Thread:
|
|||||||
# The thread isn't alive after fork: it doesn't have a tstate
|
# The thread isn't alive after fork: it doesn't have a tstate
|
||||||
# anymore.
|
# anymore.
|
||||||
self._is_stopped = True
|
self._is_stopped = True
|
||||||
|
self._running_lock = None
|
||||||
self._tstate_lock = None
|
self._tstate_lock = None
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
@ -1019,6 +1024,14 @@ class Thread:
|
|||||||
def _set_native_id(self):
|
def _set_native_id(self):
|
||||||
self._native_id = get_native_id()
|
self._native_id = get_native_id()
|
||||||
|
|
||||||
|
def _set_running_lock(self):
|
||||||
|
"""
|
||||||
|
Set a lock object which will be released by the interpreter when
|
||||||
|
the target func has finished running.
|
||||||
|
"""
|
||||||
|
self._running_lock = _allocate_lock()
|
||||||
|
self._running_lock.acquire()
|
||||||
|
|
||||||
def _set_tstate_lock(self):
|
def _set_tstate_lock(self):
|
||||||
"""
|
"""
|
||||||
Set a lock object which will be released by the interpreter when
|
Set a lock object which will be released by the interpreter when
|
||||||
@ -1035,6 +1048,7 @@ class Thread:
|
|||||||
def _bootstrap_inner(self):
|
def _bootstrap_inner(self):
|
||||||
try:
|
try:
|
||||||
self._set_ident()
|
self._set_ident()
|
||||||
|
self._set_running_lock()
|
||||||
self._set_tstate_lock()
|
self._set_tstate_lock()
|
||||||
if _HAVE_THREAD_NATIVE_ID:
|
if _HAVE_THREAD_NATIVE_ID:
|
||||||
self._set_native_id()
|
self._set_native_id()
|
||||||
@ -1054,29 +1068,29 @@ class Thread:
|
|||||||
self._invoke_excepthook(self)
|
self._invoke_excepthook(self)
|
||||||
finally:
|
finally:
|
||||||
self._delete()
|
self._delete()
|
||||||
|
self._running_lock.release()
|
||||||
|
|
||||||
def _stop(self):
|
def _stop(self):
|
||||||
# After calling ._stop(), .is_alive() returns False and .join() returns
|
# After calling ._stop(), .is_alive() returns False and .join() returns
|
||||||
# immediately. ._tstate_lock must be released before calling ._stop().
|
# immediately. ._running_lock must be released before calling ._stop().
|
||||||
#
|
#
|
||||||
# Normal case: C code at the end of the thread's life
|
# Normal case: ._bootstrap_inner() releases ._running_lock, and
|
||||||
# (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
|
# that's detected by our ._wait_for_running_lock(), called by .join()
|
||||||
# that's detected by our ._wait_for_tstate_lock(), called by .join()
|
|
||||||
# and .is_alive(). Any number of threads _may_ call ._stop()
|
# and .is_alive(). Any number of threads _may_ call ._stop()
|
||||||
# simultaneously (for example, if multiple threads are blocked in
|
# simultaneously (for example, if multiple threads are blocked in
|
||||||
# .join() calls), and they're not serialized. That's harmless -
|
# .join() calls), and they're not serialized. That's harmless -
|
||||||
# they'll just make redundant rebindings of ._is_stopped and
|
# they'll just make redundant rebindings of ._is_stopped and
|
||||||
# ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the
|
# ._running_lock. Obscure: we rebind ._running_lock last so that the
|
||||||
# "assert self._is_stopped" in ._wait_for_tstate_lock() always works
|
# "assert self._is_stopped" in ._wait_for_running_lock() always works
|
||||||
# (the assert is executed only if ._tstate_lock is None).
|
# (the assert is executed only if ._running_lock is None).
|
||||||
#
|
#
|
||||||
# Special case: _main_thread releases ._tstate_lock via this
|
# Special case: _main_thread releases ._running_lock via this
|
||||||
# module's _shutdown() function.
|
# module's _shutdown() function.
|
||||||
lock = self._tstate_lock
|
lock = self._running_lock
|
||||||
if lock is not None:
|
if lock is not None:
|
||||||
assert not lock.locked()
|
assert not lock.locked()
|
||||||
self._is_stopped = True
|
self._is_stopped = True
|
||||||
self._tstate_lock = None
|
self._running_lock = None
|
||||||
if not self.daemon:
|
if not self.daemon:
|
||||||
with _shutdown_locks_lock:
|
with _shutdown_locks_lock:
|
||||||
# Remove our lock and other released locks from _shutdown_locks
|
# Remove our lock and other released locks from _shutdown_locks
|
||||||
@ -1123,20 +1137,17 @@ class Thread:
|
|||||||
raise RuntimeError("cannot join current thread")
|
raise RuntimeError("cannot join current thread")
|
||||||
|
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
self._wait_for_tstate_lock()
|
self._wait_for_running_lock()
|
||||||
else:
|
else:
|
||||||
# the behavior of a negative timeout isn't documented, but
|
# the behavior of a negative timeout isn't documented, but
|
||||||
# historically .join(timeout=x) for x<0 has acted as if timeout=0
|
# historically .join(timeout=x) for x<0 has acted as if timeout=0
|
||||||
self._wait_for_tstate_lock(timeout=max(timeout, 0))
|
self._wait_for_running_lock(timeout=max(timeout, 0))
|
||||||
|
|
||||||
def _wait_for_tstate_lock(self, block=True, timeout=-1):
|
def _wait_for_running_lock(self, block=True, timeout=-1):
|
||||||
# Issue #18808: wait for the thread state to be gone.
|
# This method passes its arguments to _running_lock.acquire().
|
||||||
# At the end of the thread's life, after all knowledge of the thread
|
# If the lock is acquired, the python code is done, and self._stop() is
|
||||||
# is removed from C data structures, C code releases our _tstate_lock.
|
# called. That sets ._is_stopped to True, and ._running_lock to None.
|
||||||
# This method passes its arguments to _tstate_lock.acquire().
|
lock = self._running_lock
|
||||||
# If the lock is acquired, the C code is done, and self._stop() is
|
|
||||||
# called. That sets ._is_stopped to True, and ._tstate_lock to None.
|
|
||||||
lock = self._tstate_lock
|
|
||||||
if lock is None:
|
if lock is None:
|
||||||
# already determined that the C code is done
|
# already determined that the C code is done
|
||||||
assert self._is_stopped
|
assert self._is_stopped
|
||||||
@ -1207,7 +1218,7 @@ class Thread:
|
|||||||
assert self._initialized, "Thread.__init__() not called"
|
assert self._initialized, "Thread.__init__() not called"
|
||||||
if self._is_stopped or not self._started.is_set():
|
if self._is_stopped or not self._started.is_set():
|
||||||
return False
|
return False
|
||||||
self._wait_for_tstate_lock(False)
|
self._wait_for_running_lock(False)
|
||||||
return not self._is_stopped
|
return not self._is_stopped
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -1417,7 +1428,7 @@ class _MainThread(Thread):
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
Thread.__init__(self, name="MainThread", daemon=False)
|
Thread.__init__(self, name="MainThread", daemon=False)
|
||||||
self._set_tstate_lock()
|
self._set_running_lock()
|
||||||
self._started.set()
|
self._started.set()
|
||||||
self._set_ident()
|
self._set_ident()
|
||||||
if _HAVE_THREAD_NATIVE_ID:
|
if _HAVE_THREAD_NATIVE_ID:
|
||||||
@ -1558,7 +1569,7 @@ def _shutdown():
|
|||||||
# dubious, but some code does it. We can't wait for C code to release
|
# dubious, but some code does it. We can't wait for C code to release
|
||||||
# the main thread's tstate_lock - that won't happen until the interpreter
|
# the main thread's tstate_lock - that won't happen until the interpreter
|
||||||
# is nearly dead. So we release it here. Note that just calling _stop()
|
# is nearly dead. So we release it here. Note that just calling _stop()
|
||||||
# isn't enough: other threads may already be waiting on _tstate_lock.
|
# isn't enough: other threads may already be waiting on _running_lock.
|
||||||
if _main_thread._is_stopped:
|
if _main_thread._is_stopped:
|
||||||
# _shutdown() was already called
|
# _shutdown() was already called
|
||||||
return
|
return
|
||||||
@ -1573,12 +1584,13 @@ def _shutdown():
|
|||||||
|
|
||||||
# Main thread
|
# Main thread
|
||||||
if _main_thread.ident == get_ident():
|
if _main_thread.ident == get_ident():
|
||||||
tlock = _main_thread._tstate_lock
|
assert _main_thread._tstate_lock is None
|
||||||
# The main thread isn't finished yet, so its thread state lock can't
|
running_lock = _main_thread._running_lock
|
||||||
|
# The main thread isn't finished yet, so its running lock can't
|
||||||
# have been released.
|
# have been released.
|
||||||
assert tlock is not None
|
assert running_lock is not None
|
||||||
assert tlock.locked()
|
assert running_lock.locked()
|
||||||
tlock.release()
|
running_lock.release()
|
||||||
_main_thread._stop()
|
_main_thread._stop()
|
||||||
else:
|
else:
|
||||||
# bpo-1596321: _shutdown() must be called in the main thread.
|
# bpo-1596321: _shutdown() must be called in the main thread.
|
||||||
|
Loading…
Reference in New Issue
Block a user