io_uring: signal registered eventfd to process deferred task work
Some workloads rely on a registered eventfd (via
io_uring_register_eventfd(3)) in order to wake up and process the
io_uring.

In the case of a ring setup with IORING_SETUP_DEFER_TASKRUN, that
eventfd also needs to be signalled when there are tasks to run.

This changes an old behaviour which assumed 1 eventfd signal implied at
least 1 CQE, however only when this new flag is set (and so old users
will not notice). This should be expected with the
IORING_SETUP_DEFER_TASKRUN flag as it is not guaranteed that every task
will result in a CQE.

Signed-off-by: Dylan Yudaken <dylany@fb.com>
Link: https://lore.kernel.org/r/20220830125013.570060-7-dylany@fb.com
[axboe: fold in call_rcu() serialization fix]
Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
parent d8e9214f11
commit 21a091b970
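To make the behaviour change concrete, here is a hypothetical user-space sketch (not part of the commit). It assumes liburing 2.3 or newer for io_uring_get_events() and io_uring_queue_init_params(), plus a kernel that supports IORING_SETUP_DEFER_TASKRUN / IORING_SETUP_SINGLE_ISSUER; the loop structure is purely illustrative. The point it shows: on a DEFER_TASKRUN ring an eventfd wakeup means "enter the kernel so deferred task work gets run", not "at least one CQE is waiting", so the consumer has to tolerate wakeups that produce no completions.

/*
 * Hypothetical consumer loop (illustrative only, not from the commit).
 * Assumes liburing >= 2.3 (io_uring_get_events()) and a kernel with
 * IORING_SETUP_DEFER_TASKRUN / IORING_SETUP_SINGLE_ISSUER support.
 */
#include <liburing.h>
#include <sys/eventfd.h>
#include <poll.h>
#include <unistd.h>
#include <stdint.h>

int main(void)
{
	struct io_uring ring;
	struct io_uring_params p = { };
	struct io_uring_cqe *cqe;
	uint64_t cnt;
	int efd;

	p.flags = IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN;
	if (io_uring_queue_init_params(8, &ring, &p) < 0)
		return 1;

	efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
	if (efd < 0 || io_uring_register_eventfd(&ring, efd) < 0)
		return 1;

	/* ... prepare and io_uring_submit() requests elsewhere ... */

	for (;;) {
		struct pollfd pfd = { .fd = efd, .events = POLLIN };

		poll(&pfd, 1, -1);
		read(efd, &cnt, sizeof(cnt));	/* drain the eventfd counter */

		/* Enter the kernel so deferred task work is processed. */
		io_uring_get_events(&ring);

		/* A wakeup may carry zero CQEs: just drain whatever is there. */
		while (io_uring_peek_cqe(&ring, &cqe) == 0) {
			/* handle cqe->user_data / cqe->res here */
			io_uring_cqe_seen(&ring, cqe);
		}
	}
}

The wakeup in the middle of this loop is what the patch adds: without it, a DEFER_TASKRUN ring with a registered eventfd would leave deferred task work unprocessed until the application happened to enter the kernel for some other reason.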
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -184,6 +184,8 @@ struct io_ev_fd {
 	struct eventfd_ctx	*cq_ev_fd;
 	unsigned int		eventfd_async: 1;
 	struct rcu_head		rcu;
+	atomic_t		refs;
+	atomic_t		ops;
 };
 
 struct io_alloc_cache {
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -125,6 +125,11 @@ enum {
 	IO_CHECK_CQ_DROPPED_BIT,
 };
 
+enum {
+	IO_EVENTFD_OP_SIGNAL_BIT,
+	IO_EVENTFD_OP_FREE_BIT,
+};
+
 struct io_defer_entry {
 	struct list_head	list;
 	struct io_kiocb		*req;
@@ -478,33 +483,28 @@ static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
 	}
 }
 
-static void io_eventfd_put(struct rcu_head *rcu)
+static void io_eventfd_ops(struct rcu_head *rcu)
 {
 	struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
+	int ops = atomic_xchg(&ev_fd->ops, 0);
 
-	eventfd_ctx_put(ev_fd->cq_ev_fd);
-	kfree(ev_fd);
+	if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT))
+		eventfd_signal(ev_fd->cq_ev_fd, 1);
+
+	/* IO_EVENTFD_OP_FREE_BIT may not be set here depending on callback
+	 * ordering in a race but if references are 0 we know we have to free
+	 * it regardless.
+	 */
+	if (atomic_dec_and_test(&ev_fd->refs)) {
+		eventfd_ctx_put(ev_fd->cq_ev_fd);
+		kfree(ev_fd);
+	}
 }
 
 static void io_eventfd_signal(struct io_ring_ctx *ctx)
 {
-	struct io_ev_fd *ev_fd;
-	bool skip;
-
-	spin_lock(&ctx->completion_lock);
-	/*
-	 * Eventfd should only get triggered when at least one event has been
-	 * posted. Some applications rely on the eventfd notification count only
-	 * changing IFF a new CQE has been added to the CQ ring. There's no
-	 * depedency on 1:1 relationship between how many times this function is
-	 * called (and hence the eventfd count) and number of CQEs posted to the
-	 * CQ ring.
-	 */
-	skip = ctx->cached_cq_tail == ctx->evfd_last_cq_tail;
-	ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
-	spin_unlock(&ctx->completion_lock);
-	if (skip)
-		return;
+	struct io_ev_fd *ev_fd = NULL;
 
 	rcu_read_lock();
 	/*
@@ -522,13 +522,46 @@ static void io_eventfd_signal(struct io_ring_ctx *ctx)
 		goto out;
 	if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
 		goto out;
+	if (ev_fd->eventfd_async && !io_wq_current_is_worker())
+		goto out;
 
-	if (!ev_fd->eventfd_async || io_wq_current_is_worker())
+	if (likely(eventfd_signal_allowed())) {
 		eventfd_signal(ev_fd->cq_ev_fd, 1);
+	} else {
+		atomic_inc(&ev_fd->refs);
+		if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
+			call_rcu(&ev_fd->rcu, io_eventfd_ops);
+		else
+			atomic_dec(&ev_fd->refs);
+	}
+
 out:
 	rcu_read_unlock();
 }
 
+static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
+{
+	bool skip;
+
+	spin_lock(&ctx->completion_lock);
+
+	/*
+	 * Eventfd should only get triggered when at least one event has been
+	 * posted. Some applications rely on the eventfd notification count
+	 * only changing IFF a new CQE has been added to the CQ ring. There's
+	 * no depedency on 1:1 relationship between how many times this
+	 * function is called (and hence the eventfd count) and number of CQEs
+	 * posted to the CQ ring.
+	 */
+	skip = ctx->cached_cq_tail == ctx->evfd_last_cq_tail;
+	ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
+	spin_unlock(&ctx->completion_lock);
+	if (skip)
+		return;
+
+	io_eventfd_signal(ctx);
+}
+
 void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
 {
 	if (ctx->off_timeout_used || ctx->drain_active) {
@@ -540,7 +573,7 @@ void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
 		spin_unlock(&ctx->completion_lock);
 	}
 	if (ctx->has_evfd)
-		io_eventfd_signal(ctx);
+		io_eventfd_flush_signal(ctx);
 }
 
 static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx)
@@ -1071,6 +1104,8 @@ static void io_req_local_work_add(struct io_kiocb *req)
 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 
+	if (ctx->has_evfd)
+		io_eventfd_signal(ctx);
 	io_cqring_wake(ctx);
 
 }
@@ -2474,6 +2509,8 @@ static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg,
 	ev_fd->eventfd_async = eventfd_async;
 	ctx->has_evfd = true;
 	rcu_assign_pointer(ctx->io_ev_fd, ev_fd);
+	atomic_set(&ev_fd->refs, 1);
+	atomic_set(&ev_fd->ops, 0);
 	return 0;
 }
 
@@ -2486,7 +2523,8 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx)
 	if (ev_fd) {
 		ctx->has_evfd = false;
 		rcu_assign_pointer(ctx->io_ev_fd, NULL);
-		call_rcu(&ev_fd->rcu, io_eventfd_put);
+		if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_FREE_BIT), &ev_fd->ops))
+			call_rcu(&ev_fd->rcu, io_eventfd_ops);
 		return 0;
 	}
 
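The "[axboe: fold in call_rcu() serialization fix]" note in the trailer refers to the refs/ops pair added above: when eventfd_signal_allowed() is false, the signal is deferred to an RCU callback, and the atomic_fetch_or() ensures that only the caller which sets the first pending bit queues the callback; later callers see a non-zero ops word and simply drop their extra reference. Below is a hypothetical, stand-alone C11 sketch of just that coalescing idiom, with all names invented for illustration and the RCU machinery left out; it is an analogue of the kernel pattern, not the kernel code itself.

/*
 * Hypothetical stand-alone sketch (C11 atomics, no kernel APIs) of the
 * coalescing idiom in the patch: a pending-work bit is set with
 * atomic_fetch_or(), and only the caller that observes the 0 -> 1
 * transition schedules the deferred callback, so concurrent signal
 * requests collapse into a single callback.  In the kernel the callback
 * is queued with call_rcu(&ev_fd->rcu, io_eventfd_ops); here it is run
 * by hand to keep the example self-contained.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define SIGNAL_BIT	(1u << 0)

struct ev_state {
	atomic_uint ops;	/* pending operation bits, like ev_fd->ops */
};

/* Ask for a deferred signal; true means "you must schedule the callback". */
static bool request_signal(struct ev_state *ev)
{
	return !(atomic_fetch_or(&ev->ops, SIGNAL_BIT) & SIGNAL_BIT);
}

/* The deferred callback: consume all pending bits in one go. */
static void deferred_callback(struct ev_state *ev)
{
	unsigned int ops = atomic_exchange(&ev->ops, 0);

	if (ops & SIGNAL_BIT)
		printf("one signal delivered for the whole batch\n");
}

int main(void)
{
	struct ev_state ev = { .ops = 0 };
	int scheduled = 0;

	/* Three racing signal requests... */
	scheduled += request_signal(&ev);
	scheduled += request_signal(&ev);
	scheduled += request_signal(&ev);

	/* ...only the first one schedules the callback. */
	printf("callbacks scheduled: %d\n", scheduled);	/* prints 1 */
	deferred_callback(&ev);
	return 0;
}

The refs counter in the actual patch exists because io_eventfd_unregister() can race with a deferred signal; whichever side drops the last reference in io_eventfd_ops() frees the structure, as the comment in the patch notes.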