io_uring: IORING_OP_TIMEOUT support

There's been a few requests for functionality similar to io_getevents()
and epoll_wait(), where the user can specify a timeout for waiting on
events. I deliberately did not add support for this through the system
call initially to avoid overloading the args, but I can see that the use
cases for this are valid.

This adds support for IORING_OP_TIMEOUT. If a user wants to get woken
when waiting for events, simply submit one of these timeout commands
with your wait call (or before). This ensures that the application
sleeping on the CQ ring waiting for events will get woken. The timeout
command is passed in as a pointer to a struct timespec. Timeouts are
relative. The timeout command also includes a way to auto-cancel after
N events has passed.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
Jens Axboe 2019-09-17 12:26:57 -06:00
parent 9831a90ce6
commit 5262f56798
2 changed files with 146 additions and 5 deletions

View File

@ -200,6 +200,7 @@ struct io_ring_ctx {
struct io_uring_sqe *sq_sqes; struct io_uring_sqe *sq_sqes;
struct list_head defer_list; struct list_head defer_list;
struct list_head timeout_list;
} ____cacheline_aligned_in_smp; } ____cacheline_aligned_in_smp;
/* IO offload */ /* IO offload */
@ -216,6 +217,7 @@ struct io_ring_ctx {
struct wait_queue_head cq_wait; struct wait_queue_head cq_wait;
struct fasync_struct *cq_fasync; struct fasync_struct *cq_fasync;
struct eventfd_ctx *cq_ev_fd; struct eventfd_ctx *cq_ev_fd;
atomic_t cq_timeouts;
} ____cacheline_aligned_in_smp; } ____cacheline_aligned_in_smp;
struct io_rings *rings; struct io_rings *rings;
@ -283,6 +285,11 @@ struct io_poll_iocb {
struct wait_queue_entry wait; struct wait_queue_entry wait;
}; };
struct io_timeout {
struct file *file;
struct hrtimer timer;
};
/* /*
* NOTE! Each of the iocb union members has the file pointer * NOTE! Each of the iocb union members has the file pointer
* as the first entry in their struct definition. So you can * as the first entry in their struct definition. So you can
@ -294,6 +301,7 @@ struct io_kiocb {
struct file *file; struct file *file;
struct kiocb rw; struct kiocb rw;
struct io_poll_iocb poll; struct io_poll_iocb poll;
struct io_timeout timeout;
}; };
struct sqe_submit submit; struct sqe_submit submit;
@ -313,6 +321,7 @@ struct io_kiocb {
#define REQ_F_LINK_DONE 128 /* linked sqes done */ #define REQ_F_LINK_DONE 128 /* linked sqes done */
#define REQ_F_FAIL_LINK 256 /* fail rest of links */ #define REQ_F_FAIL_LINK 256 /* fail rest of links */
#define REQ_F_SHADOW_DRAIN 512 /* link-drain shadow req */ #define REQ_F_SHADOW_DRAIN 512 /* link-drain shadow req */
#define REQ_F_TIMEOUT 1024 /* timeout request */
u64 user_data; u64 user_data;
u32 result; u32 result;
u32 sequence; u32 sequence;
@ -344,6 +353,8 @@ struct io_submit_state {
}; };
static void io_sq_wq_submit_work(struct work_struct *work); static void io_sq_wq_submit_work(struct work_struct *work);
static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
long res);
static void __io_free_req(struct io_kiocb *req); static void __io_free_req(struct io_kiocb *req);
static struct kmem_cache *req_cachep; static struct kmem_cache *req_cachep;
@ -400,26 +411,30 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->poll_list); INIT_LIST_HEAD(&ctx->poll_list);
INIT_LIST_HEAD(&ctx->cancel_list); INIT_LIST_HEAD(&ctx->cancel_list);
INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
return ctx; return ctx;
} }
static inline bool io_sequence_defer(struct io_ring_ctx *ctx, static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
struct io_kiocb *req) struct io_kiocb *req)
{ {
if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN) /* timeout requests always honor sequence */
if (!(req->flags & REQ_F_TIMEOUT) &&
(req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
return false; return false;
return req->sequence != ctx->cached_cq_tail + ctx->rings->sq_dropped; return req->sequence != ctx->cached_cq_tail + ctx->rings->sq_dropped;
} }
static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) static struct io_kiocb *__io_get_deferred_req(struct io_ring_ctx *ctx,
struct list_head *list)
{ {
struct io_kiocb *req; struct io_kiocb *req;
if (list_empty(&ctx->defer_list)) if (list_empty(list))
return NULL; return NULL;
req = list_first_entry(&ctx->defer_list, struct io_kiocb, list); req = list_first_entry(list, struct io_kiocb, list);
if (!io_sequence_defer(ctx, req)) { if (!io_sequence_defer(ctx, req)) {
list_del_init(&req->list); list_del_init(&req->list);
return req; return req;
@ -428,6 +443,16 @@ static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
return NULL; return NULL;
} }
static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
{
return __io_get_deferred_req(ctx, &ctx->defer_list);
}
static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
{
return __io_get_deferred_req(ctx, &ctx->timeout_list);
}
static void __io_commit_cqring(struct io_ring_ctx *ctx) static void __io_commit_cqring(struct io_ring_ctx *ctx)
{ {
struct io_rings *rings = ctx->rings; struct io_rings *rings = ctx->rings;
@ -460,10 +485,36 @@ static inline void io_queue_async_work(struct io_ring_ctx *ctx,
queue_work(ctx->sqo_wq[rw], &req->work); queue_work(ctx->sqo_wq[rw], &req->work);
} }
static void io_kill_timeout(struct io_kiocb *req)
{
int ret;
ret = hrtimer_try_to_cancel(&req->timeout.timer);
if (ret != -1) {
atomic_inc(&req->ctx->cq_timeouts);
list_del(&req->list);
io_cqring_fill_event(req->ctx, req->user_data, 0);
__io_free_req(req);
}
}
static void io_kill_timeouts(struct io_ring_ctx *ctx)
{
struct io_kiocb *req, *tmp;
spin_lock_irq(&ctx->completion_lock);
list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
io_kill_timeout(req);
spin_unlock_irq(&ctx->completion_lock);
}
static void io_commit_cqring(struct io_ring_ctx *ctx) static void io_commit_cqring(struct io_ring_ctx *ctx)
{ {
struct io_kiocb *req; struct io_kiocb *req;
while ((req = io_get_timeout_req(ctx)) != NULL)
io_kill_timeout(req);
__io_commit_cqring(ctx); __io_commit_cqring(ctx);
while ((req = io_get_deferred_req(ctx)) != NULL) { while ((req = io_get_deferred_req(ctx)) != NULL) {
@ -1765,6 +1816,81 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return ipt.error; return ipt.error;
} }
static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
{
struct io_ring_ctx *ctx;
struct io_kiocb *req;
unsigned long flags;
req = container_of(timer, struct io_kiocb, timeout.timer);
ctx = req->ctx;
atomic_inc(&ctx->cq_timeouts);
spin_lock_irqsave(&ctx->completion_lock, flags);
list_del(&req->list);
io_cqring_fill_event(ctx, req->user_data, -ETIME);
io_commit_cqring(ctx);
spin_unlock_irqrestore(&ctx->completion_lock, flags);
io_cqring_ev_posted(ctx);
io_put_req(req);
return HRTIMER_NORESTART;
}
static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
unsigned count, req_dist, tail_index;
struct io_ring_ctx *ctx = req->ctx;
struct list_head *entry;
struct timespec ts;
if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
return -EINVAL;
if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->timeout_flags ||
sqe->len != 1)
return -EINVAL;
if (copy_from_user(&ts, (void __user *) (unsigned long) sqe->addr,
sizeof(ts)))
return -EFAULT;
/*
* sqe->off holds how many events that need to occur for this
* timeout event to be satisfied.
*/
count = READ_ONCE(sqe->off);
if (!count)
count = 1;
req->sequence = ctx->cached_sq_head + count - 1;
req->flags |= REQ_F_TIMEOUT;
/*
* Insertion sort, ensuring the first entry in the list is always
* the one we need first.
*/
tail_index = ctx->cached_cq_tail - ctx->rings->sq_dropped;
req_dist = req->sequence - tail_index;
spin_lock_irq(&ctx->completion_lock);
list_for_each_prev(entry, &ctx->timeout_list) {
struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
unsigned dist;
dist = nxt->sequence - tail_index;
if (req_dist >= dist)
break;
}
list_add(&req->list, entry);
spin_unlock_irq(&ctx->completion_lock);
hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
req->timeout.timer.function = io_timeout_fn;
hrtimer_start(&req->timeout.timer, timespec_to_ktime(ts),
HRTIMER_MODE_REL);
return 0;
}
static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req, static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
const struct io_uring_sqe *sqe) const struct io_uring_sqe *sqe)
{ {
@ -1842,6 +1968,9 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
case IORING_OP_RECVMSG: case IORING_OP_RECVMSG:
ret = io_recvmsg(req, s->sqe, force_nonblock); ret = io_recvmsg(req, s->sqe, force_nonblock);
break; break;
case IORING_OP_TIMEOUT:
ret = io_timeout(req, s->sqe);
break;
default: default:
ret = -EINVAL; ret = -EINVAL;
break; break;
@ -2599,6 +2728,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
const sigset_t __user *sig, size_t sigsz) const sigset_t __user *sig, size_t sigsz)
{ {
struct io_rings *rings = ctx->rings; struct io_rings *rings = ctx->rings;
unsigned nr_timeouts;
int ret; int ret;
if (io_cqring_events(rings) >= min_events) if (io_cqring_events(rings) >= min_events)
@ -2617,7 +2747,15 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
return ret; return ret;
} }
ret = wait_event_interruptible(ctx->wait, io_cqring_events(rings) >= min_events); nr_timeouts = atomic_read(&ctx->cq_timeouts);
/*
* Return if we have enough events, or if a timeout occured since
* we started waiting. For timeouts, we always want to return to
* userspace.
*/
ret = wait_event_interruptible(ctx->wait,
io_cqring_events(rings) >= min_events ||
atomic_read(&ctx->cq_timeouts) != nr_timeouts);
restore_saved_sigmask_unless(ret == -ERESTARTSYS); restore_saved_sigmask_unless(ret == -ERESTARTSYS);
if (ret == -ERESTARTSYS) if (ret == -ERESTARTSYS)
ret = -EINTR; ret = -EINTR;
@ -3288,6 +3426,7 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
percpu_ref_kill(&ctx->refs); percpu_ref_kill(&ctx->refs);
mutex_unlock(&ctx->uring_lock); mutex_unlock(&ctx->uring_lock);
io_kill_timeouts(ctx);
io_poll_remove_all(ctx); io_poll_remove_all(ctx);
io_iopoll_reap_events(ctx); io_iopoll_reap_events(ctx);
wait_for_completion(&ctx->ctx_done); wait_for_completion(&ctx->ctx_done);

View File

@ -28,6 +28,7 @@ struct io_uring_sqe {
__u16 poll_events; __u16 poll_events;
__u32 sync_range_flags; __u32 sync_range_flags;
__u32 msg_flags; __u32 msg_flags;
__u32 timeout_flags;
}; };
__u64 user_data; /* data to be passed back at completion time */ __u64 user_data; /* data to be passed back at completion time */
union { union {
@ -61,6 +62,7 @@ struct io_uring_sqe {
#define IORING_OP_SYNC_FILE_RANGE 8 #define IORING_OP_SYNC_FILE_RANGE 8
#define IORING_OP_SENDMSG 9 #define IORING_OP_SENDMSG 9
#define IORING_OP_RECVMSG 10 #define IORING_OP_RECVMSG 10
#define IORING_OP_TIMEOUT 11
/* /*
* sqe->fsync_flags * sqe->fsync_flags