mirror of
https://github.com/python/cpython.git
synced 2024-12-12 03:04:15 +08:00
925907ea36
* use the ParkingLot API to manage waiting threads * use Argument Clinic's critical section directive to protect queue methods * remove unnecessary overflow check Co-authored-by: Erlend E. Aasland <erlend.aasland@protonmail.com>
618 lines
16 KiB
C
618 lines
16 KiB
C
#ifndef Py_BUILD_CORE_BUILTIN
|
|
# define Py_BUILD_CORE_MODULE 1
|
|
#endif
|
|
|
|
#include "Python.h"
|
|
#include "pycore_ceval.h" // Py_MakePendingCalls()
|
|
#include "pycore_moduleobject.h" // _PyModule_GetState()
|
|
#include "pycore_parking_lot.h"
|
|
#include "pycore_time.h" // _PyTime_t
|
|
|
|
#include <stdbool.h>
|
|
#include <stddef.h> // offsetof()
|
|
|
|
typedef struct {
|
|
PyTypeObject *SimpleQueueType;
|
|
PyObject *EmptyError;
|
|
} simplequeue_state;
|
|
|
|
static simplequeue_state *
|
|
simplequeue_get_state(PyObject *module)
|
|
{
|
|
simplequeue_state *state = _PyModule_GetState(module);
|
|
assert(state);
|
|
return state;
|
|
}
|
|
static struct PyModuleDef queuemodule;
|
|
#define simplequeue_get_state_by_type(type) \
|
|
(simplequeue_get_state(PyType_GetModuleByDef(type, &queuemodule)))
|
|
|
|
static const Py_ssize_t INITIAL_RING_BUF_CAPACITY = 8;
|
|
|
|
typedef struct {
|
|
// Where to place the next item
|
|
Py_ssize_t put_idx;
|
|
|
|
// Where to get the next item
|
|
Py_ssize_t get_idx;
|
|
|
|
PyObject **items;
|
|
|
|
// Total number of items that may be stored
|
|
Py_ssize_t items_cap;
|
|
|
|
// Number of items stored
|
|
Py_ssize_t num_items;
|
|
} RingBuf;
|
|
|
|
static int
|
|
RingBuf_Init(RingBuf *buf)
|
|
{
|
|
buf->put_idx = 0;
|
|
buf->get_idx = 0;
|
|
buf->items_cap = INITIAL_RING_BUF_CAPACITY;
|
|
buf->num_items = 0;
|
|
buf->items = PyMem_Calloc(buf->items_cap, sizeof(PyObject *));
|
|
if (buf->items == NULL) {
|
|
PyErr_NoMemory();
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static PyObject *
|
|
RingBuf_At(RingBuf *buf, Py_ssize_t idx)
|
|
{
|
|
assert(idx >= 0 && idx < buf->num_items);
|
|
return buf->items[(buf->get_idx + idx) % buf->items_cap];
|
|
}
|
|
|
|
static void
|
|
RingBuf_Fini(RingBuf *buf)
|
|
{
|
|
PyObject **items = buf->items;
|
|
Py_ssize_t num_items = buf->num_items;
|
|
Py_ssize_t cap = buf->items_cap;
|
|
Py_ssize_t idx = buf->get_idx;
|
|
buf->items = NULL;
|
|
buf->put_idx = 0;
|
|
buf->get_idx = 0;
|
|
buf->num_items = 0;
|
|
buf->items_cap = 0;
|
|
for (Py_ssize_t n = num_items; n > 0; idx = (idx + 1) % cap, n--) {
|
|
Py_DECREF(items[idx]);
|
|
}
|
|
PyMem_Free(items);
|
|
}
|
|
|
|
// Resize the underlying items array of buf to the new capacity and arrange
|
|
// the items contiguously in the new items array.
|
|
//
|
|
// Returns -1 on allocation failure or 0 on success.
|
|
static int
|
|
resize_ringbuf(RingBuf *buf, Py_ssize_t capacity)
|
|
{
|
|
Py_ssize_t new_capacity = Py_MAX(INITIAL_RING_BUF_CAPACITY, capacity);
|
|
if (new_capacity == buf->items_cap) {
|
|
return 0;
|
|
}
|
|
assert(buf->num_items <= new_capacity);
|
|
|
|
PyObject **new_items = PyMem_Calloc(new_capacity, sizeof(PyObject *));
|
|
if (new_items == NULL) {
|
|
return -1;
|
|
}
|
|
|
|
// Copy the "tail" of the old items array. This corresponds to "head" of
|
|
// the abstract ring buffer.
|
|
Py_ssize_t tail_size =
|
|
Py_MIN(buf->num_items, buf->items_cap - buf->get_idx);
|
|
if (tail_size > 0) {
|
|
memcpy(new_items, buf->items + buf->get_idx,
|
|
tail_size * sizeof(PyObject *));
|
|
}
|
|
|
|
// Copy the "head" of the old items array, if any. This corresponds to the
|
|
// "tail" of the abstract ring buffer.
|
|
Py_ssize_t head_size = buf->num_items - tail_size;
|
|
if (head_size > 0) {
|
|
memcpy(new_items + tail_size, buf->items,
|
|
head_size * sizeof(PyObject *));
|
|
}
|
|
|
|
PyMem_Free(buf->items);
|
|
buf->items = new_items;
|
|
buf->items_cap = new_capacity;
|
|
buf->get_idx = 0;
|
|
buf->put_idx = buf->num_items;
|
|
|
|
return 0;
|
|
}
|
|
|
|
// Returns a strong reference from the head of the buffer.
|
|
static PyObject *
|
|
RingBuf_Get(RingBuf *buf)
|
|
{
|
|
assert(buf->num_items > 0);
|
|
|
|
if (buf->num_items < (buf->items_cap / 4)) {
|
|
// Items is less than 25% occupied, shrink it by 50%. This allows for
|
|
// growth without immediately needing to resize the underlying items
|
|
// array.
|
|
//
|
|
// It's safe it ignore allocation failures here; shrinking is an
|
|
// optimization that isn't required for correctness.
|
|
(void)resize_ringbuf(buf, buf->items_cap / 2);
|
|
}
|
|
|
|
PyObject *item = buf->items[buf->get_idx];
|
|
buf->items[buf->get_idx] = NULL;
|
|
buf->get_idx = (buf->get_idx + 1) % buf->items_cap;
|
|
buf->num_items--;
|
|
return item;
|
|
}
|
|
|
|
// Returns 0 on success or -1 if the buffer failed to grow.
|
|
//
|
|
// Steals a reference to item.
|
|
static int
|
|
RingBuf_Put(RingBuf *buf, PyObject *item)
|
|
{
|
|
assert(buf->num_items <= buf->items_cap);
|
|
|
|
if (buf->num_items == buf->items_cap) {
|
|
// Buffer is full, grow it.
|
|
if (resize_ringbuf(buf, buf->items_cap * 2) < 0) {
|
|
PyErr_NoMemory();
|
|
return -1;
|
|
}
|
|
}
|
|
buf->items[buf->put_idx] = item;
|
|
buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
|
|
buf->num_items++;
|
|
return 0;
|
|
}
|
|
|
|
static Py_ssize_t
|
|
RingBuf_Len(RingBuf *buf)
|
|
{
|
|
return buf->num_items;
|
|
}
|
|
|
|
static bool
|
|
RingBuf_IsEmpty(RingBuf *buf)
|
|
{
|
|
return buf->num_items == 0;
|
|
}
|
|
|
|
typedef struct {
|
|
PyObject_HEAD
|
|
|
|
// Are there threads waiting for items
|
|
bool has_threads_waiting;
|
|
|
|
// Items in the queue
|
|
RingBuf buf;
|
|
|
|
PyObject *weakreflist;
|
|
} simplequeueobject;
|
|
|
|
/*[clinic input]
|
|
module _queue
|
|
class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(type)->SimpleQueueType"
|
|
[clinic start generated code]*/
|
|
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=0a4023fe4d198c8d]*/
|
|
|
|
static int
|
|
simplequeue_clear(simplequeueobject *self)
|
|
{
|
|
RingBuf_Fini(&self->buf);
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
simplequeue_dealloc(simplequeueobject *self)
|
|
{
|
|
PyTypeObject *tp = Py_TYPE(self);
|
|
|
|
PyObject_GC_UnTrack(self);
|
|
(void)simplequeue_clear(self);
|
|
if (self->weakreflist != NULL)
|
|
PyObject_ClearWeakRefs((PyObject *) self);
|
|
Py_TYPE(self)->tp_free(self);
|
|
Py_DECREF(tp);
|
|
}
|
|
|
|
static int
|
|
simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg)
|
|
{
|
|
RingBuf *buf = &self->buf;
|
|
for (Py_ssize_t i = 0, num_items = buf->num_items; i < num_items; i++) {
|
|
Py_VISIT(RingBuf_At(buf, i));
|
|
}
|
|
Py_VISIT(Py_TYPE(self));
|
|
return 0;
|
|
}
|
|
|
|
/*[clinic input]
|
|
@classmethod
|
|
_queue.SimpleQueue.__new__ as simplequeue_new
|
|
|
|
Simple, unbounded, reentrant FIFO queue.
|
|
[clinic start generated code]*/
|
|
|
|
static PyObject *
|
|
simplequeue_new_impl(PyTypeObject *type)
|
|
/*[clinic end generated code: output=ba97740608ba31cd input=a0674a1643e3e2fb]*/
|
|
{
|
|
simplequeueobject *self;
|
|
|
|
self = (simplequeueobject *) type->tp_alloc(type, 0);
|
|
if (self != NULL) {
|
|
self->weakreflist = NULL;
|
|
if (RingBuf_Init(&self->buf) < 0) {
|
|
Py_DECREF(self);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
return (PyObject *) self;
|
|
}
|
|
|
|
typedef struct {
|
|
bool handed_off;
|
|
simplequeueobject *queue;
|
|
PyObject *item;
|
|
} HandoffData;
|
|
|
|
static void
|
|
maybe_handoff_item(HandoffData *data, PyObject **item, int has_more_waiters)
|
|
{
|
|
if (item == NULL) {
|
|
// No threads were waiting
|
|
data->handed_off = false;
|
|
}
|
|
else {
|
|
// There was at least one waiting thread, hand off the item
|
|
*item = data->item;
|
|
data->handed_off = true;
|
|
}
|
|
data->queue->has_threads_waiting = has_more_waiters;
|
|
}
|
|
|
|
/*[clinic input]
|
|
@critical_section
|
|
_queue.SimpleQueue.put
|
|
item: object
|
|
block: bool = True
|
|
timeout: object = None
|
|
|
|
Put the item on the queue.
|
|
|
|
The optional 'block' and 'timeout' arguments are ignored, as this method
|
|
never blocks. They are provided for compatibility with the Queue class.
|
|
|
|
[clinic start generated code]*/
|
|
|
|
static PyObject *
|
|
_queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
|
|
int block, PyObject *timeout)
|
|
/*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/
|
|
{
|
|
HandoffData data = {
|
|
.handed_off = 0,
|
|
.item = Py_NewRef(item),
|
|
.queue = self,
|
|
};
|
|
if (self->has_threads_waiting) {
|
|
// Try to hand the item off directly if there are threads waiting
|
|
_PyParkingLot_Unpark(&self->has_threads_waiting,
|
|
(_Py_unpark_fn_t *)maybe_handoff_item, &data);
|
|
}
|
|
if (!data.handed_off) {
|
|
if (RingBuf_Put(&self->buf, item) < 0) {
|
|
return NULL;
|
|
}
|
|
}
|
|
Py_RETURN_NONE;
|
|
}
|
|
|
|
/*[clinic input]
|
|
@critical_section
|
|
_queue.SimpleQueue.put_nowait
|
|
item: object
|
|
|
|
Put an item into the queue without blocking.
|
|
|
|
This is exactly equivalent to `put(item)` and is only provided
|
|
for compatibility with the Queue class.
|
|
|
|
[clinic start generated code]*/
|
|
|
|
static PyObject *
|
|
_queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
|
|
/*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/
|
|
{
|
|
return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
|
|
}
|
|
|
|
static PyObject *
|
|
empty_error(PyTypeObject *cls)
|
|
{
|
|
PyObject *module = PyType_GetModule(cls);
|
|
assert(module != NULL);
|
|
simplequeue_state *state = simplequeue_get_state(module);
|
|
PyErr_SetNone(state->EmptyError);
|
|
return NULL;
|
|
}
|
|
|
|
/*[clinic input]
|
|
@critical_section
|
|
_queue.SimpleQueue.get
|
|
|
|
cls: defining_class
|
|
/
|
|
block: bool = True
|
|
timeout as timeout_obj: object = None
|
|
|
|
Remove and return an item from the queue.
|
|
|
|
If optional args 'block' is true and 'timeout' is None (the default),
|
|
block if necessary until an item is available. If 'timeout' is
|
|
a non-negative number, it blocks at most 'timeout' seconds and raises
|
|
the Empty exception if no item was available within that time.
|
|
Otherwise ('block' is false), return an item if one is immediately
|
|
available, else raise the Empty exception ('timeout' is ignored
|
|
in that case).
|
|
|
|
[clinic start generated code]*/
|
|
|
|
static PyObject *
|
|
_queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
|
|
int block, PyObject *timeout_obj)
|
|
/*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/
|
|
{
|
|
_PyTime_t endtime = 0;
|
|
|
|
// XXX Use PyThread_ParseTimeoutArg().
|
|
|
|
if (block != 0 && !Py_IsNone(timeout_obj)) {
|
|
/* With timeout */
|
|
_PyTime_t timeout;
|
|
if (_PyTime_FromSecondsObject(&timeout,
|
|
timeout_obj, _PyTime_ROUND_CEILING) < 0) {
|
|
return NULL;
|
|
}
|
|
if (timeout < 0) {
|
|
PyErr_SetString(PyExc_ValueError,
|
|
"'timeout' must be a non-negative number");
|
|
return NULL;
|
|
}
|
|
endtime = _PyDeadline_Init(timeout);
|
|
}
|
|
|
|
for (;;) {
|
|
if (!RingBuf_IsEmpty(&self->buf)) {
|
|
return RingBuf_Get(&self->buf);
|
|
}
|
|
|
|
if (!block) {
|
|
return empty_error(cls);
|
|
}
|
|
|
|
int64_t timeout_ns = -1;
|
|
if (endtime != 0) {
|
|
timeout_ns = _PyDeadline_Get(endtime);
|
|
if (timeout_ns < 0) {
|
|
return empty_error(cls);
|
|
}
|
|
}
|
|
|
|
bool waiting = 1;
|
|
self->has_threads_waiting = waiting;
|
|
|
|
PyObject *item = NULL;
|
|
int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting,
|
|
sizeof(bool), timeout_ns, &item,
|
|
/* detach */ 1);
|
|
switch (st) {
|
|
case Py_PARK_OK: {
|
|
assert(item != NULL);
|
|
return item;
|
|
}
|
|
case Py_PARK_TIMEOUT: {
|
|
return empty_error(cls);
|
|
}
|
|
case Py_PARK_INTR: {
|
|
// Interrupted
|
|
if (Py_MakePendingCalls() < 0) {
|
|
return NULL;
|
|
}
|
|
break;
|
|
}
|
|
case Py_PARK_AGAIN: {
|
|
// This should be impossible with the current implementation of
|
|
// PyParkingLot, but would be possible if critical sections /
|
|
// the GIL were released before the thread was added to the
|
|
// internal thread queue in the parking lot.
|
|
break;
|
|
}
|
|
default: {
|
|
Py_UNREACHABLE();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/*[clinic input]
|
|
@critical_section
|
|
_queue.SimpleQueue.get_nowait
|
|
|
|
cls: defining_class
|
|
/
|
|
|
|
Remove and return an item from the queue without blocking.
|
|
|
|
Only get an item if one is immediately available. Otherwise
|
|
raise the Empty exception.
|
|
[clinic start generated code]*/
|
|
|
|
static PyObject *
|
|
_queue_SimpleQueue_get_nowait_impl(simplequeueobject *self,
|
|
PyTypeObject *cls)
|
|
/*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/
|
|
{
|
|
return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None);
|
|
}
|
|
|
|
/*[clinic input]
|
|
@critical_section
|
|
_queue.SimpleQueue.empty -> bool
|
|
|
|
Return True if the queue is empty, False otherwise (not reliable!).
|
|
[clinic start generated code]*/
|
|
|
|
static int
|
|
_queue_SimpleQueue_empty_impl(simplequeueobject *self)
|
|
/*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/
|
|
{
|
|
return RingBuf_IsEmpty(&self->buf);
|
|
}
|
|
|
|
/*[clinic input]
|
|
@critical_section
|
|
_queue.SimpleQueue.qsize -> Py_ssize_t
|
|
|
|
Return the approximate size of the queue (not reliable!).
|
|
[clinic start generated code]*/
|
|
|
|
static Py_ssize_t
|
|
_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
|
|
/*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/
|
|
{
|
|
return RingBuf_Len(&self->buf);
|
|
}
|
|
|
|
static int
|
|
queue_traverse(PyObject *m, visitproc visit, void *arg)
|
|
{
|
|
simplequeue_state *state = simplequeue_get_state(m);
|
|
Py_VISIT(state->SimpleQueueType);
|
|
Py_VISIT(state->EmptyError);
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
queue_clear(PyObject *m)
|
|
{
|
|
simplequeue_state *state = simplequeue_get_state(m);
|
|
Py_CLEAR(state->SimpleQueueType);
|
|
Py_CLEAR(state->EmptyError);
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
queue_free(void *m)
|
|
{
|
|
queue_clear((PyObject *)m);
|
|
}
|
|
|
|
#include "clinic/_queuemodule.c.h"
|
|
|
|
|
|
static PyMethodDef simplequeue_methods[] = {
|
|
_QUEUE_SIMPLEQUEUE_EMPTY_METHODDEF
|
|
_QUEUE_SIMPLEQUEUE_GET_METHODDEF
|
|
_QUEUE_SIMPLEQUEUE_GET_NOWAIT_METHODDEF
|
|
_QUEUE_SIMPLEQUEUE_PUT_METHODDEF
|
|
_QUEUE_SIMPLEQUEUE_PUT_NOWAIT_METHODDEF
|
|
_QUEUE_SIMPLEQUEUE_QSIZE_METHODDEF
|
|
{"__class_getitem__", Py_GenericAlias,
|
|
METH_O|METH_CLASS, PyDoc_STR("See PEP 585")},
|
|
{NULL, NULL} /* sentinel */
|
|
};
|
|
|
|
static struct PyMemberDef simplequeue_members[] = {
|
|
{"__weaklistoffset__", Py_T_PYSSIZET, offsetof(simplequeueobject, weakreflist), Py_READONLY},
|
|
{NULL},
|
|
};
|
|
|
|
static PyType_Slot simplequeue_slots[] = {
|
|
{Py_tp_dealloc, simplequeue_dealloc},
|
|
{Py_tp_doc, (void *)simplequeue_new__doc__},
|
|
{Py_tp_traverse, simplequeue_traverse},
|
|
{Py_tp_clear, simplequeue_clear},
|
|
{Py_tp_members, simplequeue_members},
|
|
{Py_tp_methods, simplequeue_methods},
|
|
{Py_tp_new, simplequeue_new},
|
|
{0, NULL},
|
|
};
|
|
|
|
static PyType_Spec simplequeue_spec = {
|
|
.name = "_queue.SimpleQueue",
|
|
.basicsize = sizeof(simplequeueobject),
|
|
.flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC |
|
|
Py_TPFLAGS_IMMUTABLETYPE),
|
|
.slots = simplequeue_slots,
|
|
};
|
|
|
|
|
|
/* Initialization function */
|
|
|
|
PyDoc_STRVAR(queue_module_doc,
|
|
"C implementation of the Python queue module.\n\
|
|
This module is an implementation detail, please do not use it directly.");
|
|
|
|
static int
|
|
queuemodule_exec(PyObject *module)
|
|
{
|
|
simplequeue_state *state = simplequeue_get_state(module);
|
|
|
|
state->EmptyError = PyErr_NewExceptionWithDoc(
|
|
"_queue.Empty",
|
|
"Exception raised by Queue.get(block=0)/get_nowait().",
|
|
NULL, NULL);
|
|
if (state->EmptyError == NULL) {
|
|
return -1;
|
|
}
|
|
if (PyModule_AddObjectRef(module, "Empty", state->EmptyError) < 0) {
|
|
return -1;
|
|
}
|
|
|
|
state->SimpleQueueType = (PyTypeObject *)PyType_FromModuleAndSpec(
|
|
module, &simplequeue_spec, NULL);
|
|
if (state->SimpleQueueType == NULL) {
|
|
return -1;
|
|
}
|
|
if (PyModule_AddType(module, state->SimpleQueueType) < 0) {
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static PyModuleDef_Slot queuemodule_slots[] = {
|
|
{Py_mod_exec, queuemodule_exec},
|
|
{Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
|
|
{0, NULL}
|
|
};
|
|
|
|
|
|
static struct PyModuleDef queuemodule = {
|
|
.m_base = PyModuleDef_HEAD_INIT,
|
|
.m_name = "_queue",
|
|
.m_doc = queue_module_doc,
|
|
.m_size = sizeof(simplequeue_state),
|
|
.m_slots = queuemodule_slots,
|
|
.m_traverse = queue_traverse,
|
|
.m_clear = queue_clear,
|
|
.m_free = queue_free,
|
|
};
|
|
|
|
|
|
PyMODINIT_FUNC
|
|
PyInit__queue(void)
|
|
{
|
|
return PyModuleDef_Init(&queuemodule);
|
|
}
|