mirror of
https://github.com/python/cpython.git
synced 2024-11-27 20:04:41 +08:00
cca30230d9
If a thread blocks while waiting on the `shared->mutex` lock, the array of QSBR states may be reallocated. The `tstate->qsbr` values before the lock is acquired may not be the same as the value after the lock is acquired.
283 lines
8.9 KiB
C
283 lines
8.9 KiB
C
/*
|
|
* Implementation of safe memory reclamation scheme using
|
|
* quiescent states.
|
|
*
|
|
* This is dervied from the "GUS" safe memory reclamation technique
|
|
* in FreeBSD written by Jeffrey Roberson. It is heavily modified. Any bugs
|
|
* in this code are likely due to the modifications.
|
|
*
|
|
* The original copyright is preserved below.
|
|
*
|
|
* Copyright (c) 2019,2020 Jeffrey Roberson <jeff@FreeBSD.org>
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice unmodified, this list of conditions, and the following
|
|
* disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
|
|
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
|
|
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
|
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
|
|
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
|
|
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
|
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
|
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
|
|
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
#include "Python.h"
|
|
#include "pycore_initconfig.h" // _PyStatus_NO_MEMORY()
|
|
#include "pycore_lock.h" // PyMutex_Lock()
|
|
#include "pycore_qsbr.h"
|
|
#include "pycore_pystate.h" // _PyThreadState_GET()
|
|
|
|
|
|
// Starting size of the array of qsbr thread states
|
|
#define MIN_ARRAY_SIZE 8
|
|
|
|
// For _Py_qsbr_deferred_advance(): the number of deferrals before advancing
|
|
// the write sequence.
|
|
#define QSBR_DEFERRED_LIMIT 10
|
|
|
|
// Allocate a QSBR thread state from the freelist
|
|
static struct _qsbr_thread_state *
|
|
qsbr_allocate(struct _qsbr_shared *shared)
|
|
{
|
|
struct _qsbr_thread_state *qsbr = shared->freelist;
|
|
if (qsbr == NULL) {
|
|
return NULL;
|
|
}
|
|
shared->freelist = qsbr->freelist_next;
|
|
qsbr->freelist_next = NULL;
|
|
qsbr->shared = shared;
|
|
qsbr->allocated = true;
|
|
return qsbr;
|
|
}
|
|
|
|
// Initialize (or reintialize) the freelist of QSBR thread states
|
|
static void
|
|
initialize_new_array(struct _qsbr_shared *shared)
|
|
{
|
|
for (Py_ssize_t i = 0; i != shared->size; i++) {
|
|
struct _qsbr_thread_state *qsbr = &shared->array[i].qsbr;
|
|
if (qsbr->tstate != NULL) {
|
|
// Update the thread state pointer to its QSBR state
|
|
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *)qsbr->tstate;
|
|
tstate->qsbr = qsbr;
|
|
}
|
|
if (!qsbr->allocated) {
|
|
// Push to freelist
|
|
qsbr->freelist_next = shared->freelist;
|
|
shared->freelist = qsbr;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Grow the array of QSBR thread states. Returns 0 on success, -1 on failure.
|
|
static int
|
|
grow_thread_array(struct _qsbr_shared *shared)
|
|
{
|
|
Py_ssize_t new_size = shared->size * 2;
|
|
if (new_size < MIN_ARRAY_SIZE) {
|
|
new_size = MIN_ARRAY_SIZE;
|
|
}
|
|
|
|
struct _qsbr_pad *array = PyMem_RawCalloc(new_size, sizeof(*array));
|
|
if (array == NULL) {
|
|
return -1;
|
|
}
|
|
|
|
struct _qsbr_pad *old = shared->array;
|
|
if (old != NULL) {
|
|
memcpy(array, shared->array, shared->size * sizeof(*array));
|
|
}
|
|
|
|
shared->array = array;
|
|
shared->size = new_size;
|
|
shared->freelist = NULL;
|
|
initialize_new_array(shared);
|
|
|
|
PyMem_RawFree(old);
|
|
return 0;
|
|
}
|
|
|
|
uint64_t
|
|
_Py_qsbr_advance(struct _qsbr_shared *shared)
|
|
{
|
|
// NOTE: with 64-bit sequence numbers, we don't have to worry too much
|
|
// about the wr_seq getting too far ahead of rd_seq, but if we ever use
|
|
// 32-bit sequence numbers, we'll need to be more careful.
|
|
return _Py_atomic_add_uint64(&shared->wr_seq, QSBR_INCR) + QSBR_INCR;
|
|
}
|
|
|
|
uint64_t
|
|
_Py_qsbr_deferred_advance(struct _qsbr_thread_state *qsbr)
|
|
{
|
|
if (++qsbr->deferrals < QSBR_DEFERRED_LIMIT) {
|
|
return _Py_qsbr_shared_current(qsbr->shared) + QSBR_INCR;
|
|
}
|
|
qsbr->deferrals = 0;
|
|
return _Py_qsbr_advance(qsbr->shared);
|
|
}
|
|
|
|
static uint64_t
|
|
qsbr_poll_scan(struct _qsbr_shared *shared)
|
|
{
|
|
// Synchronize with store in _Py_qsbr_attach(). We need to ensure that
|
|
// the reads from each thread's sequence number are not reordered to see
|
|
// earlier "offline" states.
|
|
_Py_atomic_fence_seq_cst();
|
|
|
|
// Compute the minimum sequence number of all attached threads
|
|
uint64_t min_seq = _Py_atomic_load_uint64(&shared->wr_seq);
|
|
struct _qsbr_pad *array = shared->array;
|
|
for (Py_ssize_t i = 0, size = shared->size; i != size; i++) {
|
|
struct _qsbr_thread_state *qsbr = &array[i].qsbr;
|
|
|
|
uint64_t seq = _Py_atomic_load_uint64(&qsbr->seq);
|
|
if (seq != QSBR_OFFLINE && QSBR_LT(seq, min_seq)) {
|
|
min_seq = seq;
|
|
}
|
|
}
|
|
|
|
// Update the shared read sequence
|
|
uint64_t rd_seq = _Py_atomic_load_uint64(&shared->rd_seq);
|
|
if (QSBR_LT(rd_seq, min_seq)) {
|
|
// It's okay if the compare-exchange failed: another thread updated it
|
|
(void)_Py_atomic_compare_exchange_uint64(&shared->rd_seq, &rd_seq, min_seq);
|
|
rd_seq = min_seq;
|
|
}
|
|
|
|
return rd_seq;
|
|
}
|
|
|
|
bool
|
|
_Py_qsbr_poll(struct _qsbr_thread_state *qsbr, uint64_t goal)
|
|
{
|
|
assert(_PyThreadState_GET()->state == _Py_THREAD_ATTACHED);
|
|
if (_Py_qbsr_goal_reached(qsbr, goal)) {
|
|
return true;
|
|
}
|
|
|
|
uint64_t rd_seq = qsbr_poll_scan(qsbr->shared);
|
|
return QSBR_LEQ(goal, rd_seq);
|
|
}
|
|
|
|
void
|
|
_Py_qsbr_attach(struct _qsbr_thread_state *qsbr)
|
|
{
|
|
assert(qsbr->seq == 0 && "already attached");
|
|
|
|
uint64_t seq = _Py_qsbr_shared_current(qsbr->shared);
|
|
_Py_atomic_store_uint64(&qsbr->seq, seq); // needs seq_cst
|
|
}
|
|
|
|
void
|
|
_Py_qsbr_detach(struct _qsbr_thread_state *qsbr)
|
|
{
|
|
assert(qsbr->seq != 0 && "already detached");
|
|
|
|
_Py_atomic_store_uint64_release(&qsbr->seq, QSBR_OFFLINE);
|
|
}
|
|
|
|
Py_ssize_t
|
|
_Py_qsbr_reserve(PyInterpreterState *interp)
|
|
{
|
|
struct _qsbr_shared *shared = &interp->qsbr;
|
|
|
|
PyMutex_Lock(&shared->mutex);
|
|
// Try allocating from our internal freelist
|
|
struct _qsbr_thread_state *qsbr = qsbr_allocate(shared);
|
|
|
|
// If there are no free entries, we pause all threads, grow the array,
|
|
// and update the pointers in PyThreadState to entries in the new array.
|
|
if (qsbr == NULL) {
|
|
_PyEval_StopTheWorld(interp);
|
|
if (grow_thread_array(shared) == 0) {
|
|
qsbr = qsbr_allocate(shared);
|
|
}
|
|
_PyEval_StartTheWorld(interp);
|
|
}
|
|
PyMutex_Unlock(&shared->mutex);
|
|
|
|
if (qsbr == NULL) {
|
|
return -1;
|
|
}
|
|
|
|
// Return an index rather than the pointer because the array may be
|
|
// resized and the pointer invalidated.
|
|
return (struct _qsbr_pad *)qsbr - shared->array;
|
|
}
|
|
|
|
void
|
|
_Py_qsbr_register(_PyThreadStateImpl *tstate, PyInterpreterState *interp,
|
|
Py_ssize_t index)
|
|
{
|
|
// Associate the QSBR state with the thread state
|
|
struct _qsbr_shared *shared = &interp->qsbr;
|
|
|
|
PyMutex_Lock(&shared->mutex);
|
|
struct _qsbr_thread_state *qsbr = &interp->qsbr.array[index].qsbr;
|
|
assert(qsbr->allocated && qsbr->tstate == NULL);
|
|
qsbr->tstate = (PyThreadState *)tstate;
|
|
tstate->qsbr = qsbr;
|
|
PyMutex_Unlock(&shared->mutex);
|
|
}
|
|
|
|
void
|
|
_Py_qsbr_unregister(_PyThreadStateImpl *tstate)
|
|
{
|
|
struct _qsbr_shared *shared = tstate->qsbr->shared;
|
|
|
|
PyMutex_Lock(&shared->mutex);
|
|
// NOTE: we must load (or reload) the thread state's qbsr inside the mutex
|
|
// because the array may have been resized (changing tstate->qsbr) while
|
|
// we waited to acquire the mutex.
|
|
struct _qsbr_thread_state *qsbr = tstate->qsbr;
|
|
|
|
assert(qsbr->seq == 0 && "thread state must be detached");
|
|
assert(qsbr->allocated && qsbr->tstate == (PyThreadState *)tstate);
|
|
|
|
tstate->qsbr = NULL;
|
|
qsbr->tstate = NULL;
|
|
qsbr->allocated = false;
|
|
qsbr->freelist_next = shared->freelist;
|
|
shared->freelist = qsbr;
|
|
PyMutex_Unlock(&shared->mutex);
|
|
}
|
|
|
|
void
|
|
_Py_qsbr_fini(PyInterpreterState *interp)
|
|
{
|
|
struct _qsbr_shared *shared = &interp->qsbr;
|
|
PyMem_RawFree(shared->array);
|
|
shared->array = NULL;
|
|
shared->size = 0;
|
|
shared->freelist = NULL;
|
|
}
|
|
|
|
void
|
|
_Py_qsbr_after_fork(_PyThreadStateImpl *tstate)
|
|
{
|
|
struct _qsbr_thread_state *this_qsbr = tstate->qsbr;
|
|
struct _qsbr_shared *shared = this_qsbr->shared;
|
|
|
|
_PyMutex_at_fork_reinit(&shared->mutex);
|
|
|
|
for (Py_ssize_t i = 0; i != shared->size; i++) {
|
|
struct _qsbr_thread_state *qsbr = &shared->array[i].qsbr;
|
|
if (qsbr != this_qsbr && qsbr->allocated) {
|
|
qsbr->tstate = NULL;
|
|
qsbr->allocated = false;
|
|
qsbr->freelist_next = shared->freelist;
|
|
shared->freelist = qsbr;
|
|
}
|
|
}
|
|
}
|