gh-126914: Store the Preallocated Thread State's Pointer in a PyInterpreterState Field (gh-126989)

This approach eliminates the originally reported race. It also gets rid of the deadlock reported in gh-96071, so we can remove the workaround added then.
This commit is contained in:
Eric Snow 2024-11-19 12:59:19 -07:00 committed by GitHub
parent 824afbf548
commit 1c0a104eca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 84 additions and 50 deletions

View File

@ -130,6 +130,7 @@ struct _is {
uint64_t next_unique_id;
/* The linked list of threads, newest first. */
PyThreadState *head;
_PyThreadStateImpl *preallocated;
/* The thread currently executing in the __main__ module, if any. */
PyThreadState *main;
/* Used in Modules/_threadmodule.c. */
@ -278,9 +279,10 @@ struct _is {
struct _Py_interp_cached_objects cached_objects;
struct _Py_interp_static_objects static_objects;
Py_ssize_t _interactive_src_count;
/* the initial PyInterpreterState.threads.head */
_PyThreadStateImpl _initial_thread;
Py_ssize_t _interactive_src_count;
};

View File

@ -118,6 +118,9 @@ extern PyTypeObject _PyExc_MemoryError;
{ \
.id_refcount = -1, \
._whence = _PyInterpreterState_WHENCE_NOTSET, \
.threads = { \
.preallocated = &(INTERP)._initial_thread, \
}, \
.imports = IMPORTS_INIT, \
.ceval = { \
.recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \

View File

@ -23,6 +23,7 @@ class StressTests(TestBase):
alive.append(interp)
@support.requires_resource('cpu')
@threading_helper.requires_working_threading()
def test_create_many_threaded(self):
alive = []
def task():
@ -32,6 +33,35 @@ class StressTests(TestBase):
with threading_helper.start_threads(threads):
pass
@support.requires_resource('cpu')
@threading_helper.requires_working_threading()
def test_many_threads_running_interp_in_other_interp(self):
interp = interpreters.create()
script = f"""if True:
import _interpreters
_interpreters.run_string({interp.id}, '1')
"""
def run():
interp = interpreters.create()
alreadyrunning = (f'{interpreters.InterpreterError}: '
'interpreter already running')
success = False
while not success:
try:
interp.exec(script)
except interpreters.ExecutionFailed as exc:
if exc.excinfo.msg != 'interpreter already running':
raise # re-raise
assert exc.excinfo.type.__name__ == 'InterpreterError'
else:
success = True
threads = (threading.Thread(target=run) for _ in range(200))
with threading_helper.start_threads(threads):
pass
if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.

View File

@ -629,6 +629,8 @@ init_interpreter(PyInterpreterState *interp,
assert(next != NULL || (interp == runtime->interpreters.main));
interp->next = next;
interp->threads.preallocated = &interp->_initial_thread;
// We would call _PyObject_InitState() at this point
// if interp->feature_flags were alredy set.
@ -766,7 +768,6 @@ PyInterpreterState_New(void)
return interp;
}
static void
interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
{
@ -910,6 +911,9 @@ interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
// XXX Once we have one allocator per interpreter (i.e.
// per-interpreter GC) we must ensure that all of the interpreter's
// objects have been cleaned up at the point.
// We could clear interp->threads.freelist here
// if it held more than just the initial thread state.
}
@ -1386,22 +1390,45 @@ allocate_chunk(int size_in_bytes, _PyStackChunk* previous)
return res;
}
static _PyThreadStateImpl *
alloc_threadstate(void)
static void
reset_threadstate(_PyThreadStateImpl *tstate)
{
return PyMem_RawCalloc(1, sizeof(_PyThreadStateImpl));
// Set to _PyThreadState_INIT directly?
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
}
static _PyThreadStateImpl *
alloc_threadstate(PyInterpreterState *interp)
{
_PyThreadStateImpl *tstate;
// Try the preallocated tstate first.
tstate = _Py_atomic_exchange_ptr(&interp->threads.preallocated, NULL);
// Fall back to the allocator.
if (tstate == NULL) {
tstate = PyMem_RawCalloc(1, sizeof(_PyThreadStateImpl));
if (tstate == NULL) {
return NULL;
}
reset_threadstate(tstate);
}
return tstate;
}
static void
free_threadstate(_PyThreadStateImpl *tstate)
{
PyInterpreterState *interp = tstate->base.interp;
// The initial thread state of the interpreter is allocated
// as part of the interpreter state so should not be freed.
if (tstate == &tstate->base.interp->_initial_thread) {
// Restore to _PyThreadState_INIT.
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
if (tstate == &interp->_initial_thread) {
// Make it available again.
reset_threadstate(tstate);
assert(interp->threads.preallocated == NULL);
_Py_atomic_store_ptr(&interp->threads.preallocated, tstate);
}
else {
PyMem_RawFree(tstate);
@ -1492,66 +1519,38 @@ add_threadstate(PyInterpreterState *interp, PyThreadState *tstate,
static PyThreadState *
new_threadstate(PyInterpreterState *interp, int whence)
{
_PyThreadStateImpl *tstate;
_PyRuntimeState *runtime = interp->runtime;
// We don't need to allocate a thread state for the main interpreter
// (the common case), but doing it later for the other case revealed a
// reentrancy problem (deadlock). So for now we always allocate before
// taking the interpreters lock. See GH-96071.
_PyThreadStateImpl *new_tstate = alloc_threadstate();
int used_newtstate;
if (new_tstate == NULL) {
// Allocate the thread state.
_PyThreadStateImpl *tstate = alloc_threadstate(interp);
if (tstate == NULL) {
return NULL;
}
#ifdef Py_GIL_DISABLED
Py_ssize_t qsbr_idx = _Py_qsbr_reserve(interp);
if (qsbr_idx < 0) {
PyMem_RawFree(new_tstate);
free_threadstate(tstate);
return NULL;
}
int32_t tlbc_idx = _Py_ReserveTLBCIndex(interp);
if (tlbc_idx < 0) {
PyMem_RawFree(new_tstate);
free_threadstate(tstate);
return NULL;
}
#endif
/* We serialize concurrent creation to protect global state. */
HEAD_LOCK(runtime);
HEAD_LOCK(interp->runtime);
// Initialize the new thread state.
interp->threads.next_unique_id += 1;
uint64_t id = interp->threads.next_unique_id;
// Allocate the thread state and add it to the interpreter.
PyThreadState *old_head = interp->threads.head;
if (old_head == NULL) {
// It's the interpreter's initial thread state.
used_newtstate = 0;
tstate = &interp->_initial_thread;
}
// XXX Re-use interp->_initial_thread if not in use?
else {
// Every valid interpreter must have at least one thread.
assert(id > 1);
assert(old_head->prev == NULL);
used_newtstate = 1;
tstate = new_tstate;
// Set to _PyThreadState_INIT.
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
}
init_threadstate(tstate, interp, id, whence);
// Add the new thread state to the interpreter.
PyThreadState *old_head = interp->threads.head;
add_threadstate(interp, (PyThreadState *)tstate, old_head);
HEAD_UNLOCK(runtime);
if (!used_newtstate) {
// Must be called with lock unlocked to avoid re-entrancy deadlock.
PyMem_RawFree(new_tstate);
}
else {
}
HEAD_UNLOCK(interp->runtime);
#ifdef Py_GIL_DISABLED
// Must be called with lock unlocked to avoid lock ordering deadlocks.