diff --git a/fs/io-wq.c b/fs/io-wq.c index f72d53848dcb..a564f36e260c 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -36,8 +36,7 @@ enum { enum { IO_WQ_BIT_EXIT = 0, /* wq exiting */ - IO_WQ_BIT_CANCEL = 1, /* cancel work on list */ - IO_WQ_BIT_ERROR = 2, /* error on setup */ + IO_WQ_BIT_ERROR = 1, /* error on setup */ }; enum { @@ -561,12 +560,6 @@ get_next: next_hashed = wq_next_work(work); io_impersonate_work(worker, work); - /* - * OK to set IO_WQ_WORK_CANCEL even for uncancellable - * work, the worker function will do the right thing. - */ - if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) - work->flags |= IO_WQ_WORK_CANCEL; old_work = work; linked = wq->do_work(work); @@ -732,12 +725,6 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) return acct->nr_workers < acct->max_workers; } -static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data) -{ - send_sig(SIGINT, worker->task, 1); - return false; -} - /* * Iterate the passed in list and call the specific function for each * worker that isn't exiting @@ -938,21 +925,6 @@ void io_wq_hash_work(struct io_wq_work *work, void *val) work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); } -void io_wq_cancel_all(struct io_wq *wq) -{ - int node; - - set_bit(IO_WQ_BIT_CANCEL, &wq->state); - - rcu_read_lock(); - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - - io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL); - } - rcu_read_unlock(); -} - struct io_cb_cancel_data { work_cancel_fn *fn; void *data; diff --git a/fs/io-wq.h b/fs/io-wq.h index 069496c6d4f9..b158f8addcf3 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -59,6 +59,7 @@ static inline void wq_list_add_tail(struct io_wq_work_node *node, list->last->next = node; list->last = node; } + node->next = NULL; } static inline void wq_list_cut(struct io_wq_work_list *list, @@ -128,8 +129,6 @@ static inline bool io_wq_is_hashed(struct io_wq_work *work) return work->flags & IO_WQ_WORK_HASHED; } -void io_wq_cancel_all(struct io_wq *wq); - typedef bool (work_cancel_fn)(struct io_wq_work *, void *); enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, diff --git a/fs/io_uring.c b/fs/io_uring.c index 6f9392c35eef..7e35283fc0b1 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1693,6 +1693,11 @@ static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx) return io_wq_current_is_worker(); } +static inline unsigned __io_cqring_events(struct io_ring_ctx *ctx) +{ + return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head); +} + static void io_cqring_ev_posted(struct io_ring_ctx *ctx) { if (waitqueue_active(&ctx->wait)) @@ -1703,15 +1708,6 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx) eventfd_signal(ctx->cq_ev_fd, 1); } -static void io_cqring_mark_overflow(struct io_ring_ctx *ctx) -{ - if (list_empty(&ctx->cq_overflow_list)) { - clear_bit(0, &ctx->sq_check_overflow); - clear_bit(0, &ctx->cq_check_overflow); - ctx->rings->sq_flags &= ~IORING_SQ_CQ_OVERFLOW; - } -} - /* Returns true if there are no backlogged entries after the flush */ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force, struct task_struct *tsk, @@ -1721,23 +1717,13 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force, struct io_kiocb *req, *tmp; struct io_uring_cqe *cqe; unsigned long flags; + bool all_flushed; LIST_HEAD(list); - if (!force) { - if (list_empty_careful(&ctx->cq_overflow_list)) - return true; - if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) == - rings->cq_ring_entries)) - return false; - } + if (!force && __io_cqring_events(ctx) == rings->cq_ring_entries) + return false; spin_lock_irqsave(&ctx->completion_lock, flags); - - /* if force is set, the ring is going away. always drop after that */ - if (force) - ctx->cq_overflow_flushed = 1; - - cqe = NULL; list_for_each_entry_safe(req, tmp, &ctx->cq_overflow_list, compl.list) { if (!io_match_task(req, tsk, files)) continue; @@ -1758,9 +1744,14 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force, } } - io_commit_cqring(ctx); - io_cqring_mark_overflow(ctx); + all_flushed = list_empty(&ctx->cq_overflow_list); + if (all_flushed) { + clear_bit(0, &ctx->sq_check_overflow); + clear_bit(0, &ctx->cq_check_overflow); + ctx->rings->sq_flags &= ~IORING_SQ_CQ_OVERFLOW; + } + io_commit_cqring(ctx); spin_unlock_irqrestore(&ctx->completion_lock, flags); io_cqring_ev_posted(ctx); @@ -1770,7 +1761,7 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force, io_put_req(req); } - return cqe != NULL; + return all_flushed; } static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags) @@ -2320,8 +2311,6 @@ static void io_double_put_req(struct io_kiocb *req) static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush) { - struct io_rings *rings = ctx->rings; - if (test_bit(0, &ctx->cq_check_overflow)) { /* * noflush == true is from the waitqueue handler, just ensure @@ -2336,7 +2325,7 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush) /* See comment at the top of this file */ smp_rmb(); - return ctx->cached_cq_tail - READ_ONCE(rings->cq.head); + return __io_cqring_events(ctx); } static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) @@ -3136,9 +3125,7 @@ static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov, iov[0].iov_len = kbuf->len; return 0; } - if (!req->rw.len) - return 0; - else if (req->rw.len > 1) + if (req->rw.len != 1) return -EINVAL; #ifdef CONFIG_COMPAT @@ -3784,6 +3771,8 @@ static int io_shutdown(struct io_kiocb *req, bool force_nonblock) return -ENOTSOCK; ret = __sys_shutdown_sock(sock, req->shutdown.how); + if (ret < 0) + req_set_fail_links(req); io_req_complete(req, ret); return 0; #else @@ -6107,15 +6096,15 @@ static void io_req_drop_files(struct io_kiocb *req) struct io_uring_task *tctx = req->task->io_uring; unsigned long flags; - spin_lock_irqsave(&ctx->inflight_lock, flags); - list_del(&req->inflight_entry); - if (atomic_read(&tctx->in_idle)) - wake_up(&tctx->wait); - spin_unlock_irqrestore(&ctx->inflight_lock, flags); - req->flags &= ~REQ_F_INFLIGHT; put_files_struct(req->work.identity->files); put_nsproxy(req->work.identity->nsproxy); + spin_lock_irqsave(&ctx->inflight_lock, flags); + list_del(&req->inflight_entry); + spin_unlock_irqrestore(&ctx->inflight_lock, flags); + req->flags &= ~REQ_F_INFLIGHT; req->work.flags &= ~IO_WQ_WORK_FILES; + if (atomic_read(&tctx->in_idle)) + wake_up(&tctx->wait); } static void __io_clean_op(struct io_kiocb *req) @@ -6343,19 +6332,28 @@ static struct io_wq_work *io_wq_submit_work(struct io_wq_work *work) } if (ret) { - /* - * io_iopoll_complete() does not hold completion_lock to complete - * polled io, so here for polled io, just mark it done and still let - * io_iopoll_complete() complete it. - */ - if (req->ctx->flags & IORING_SETUP_IOPOLL) { - struct kiocb *kiocb = &req->rw.kiocb; + struct io_ring_ctx *lock_ctx = NULL; - kiocb_done(kiocb, ret, NULL); - } else { - req_set_fail_links(req); - io_req_complete(req, ret); - } + if (req->ctx->flags & IORING_SETUP_IOPOLL) + lock_ctx = req->ctx; + + /* + * io_iopoll_complete() does not hold completion_lock to + * complete polled io, so here for polled io, we can not call + * io_req_complete() directly, otherwise there maybe concurrent + * access to cqring, defer_list, etc, which is not safe. Given + * that io_iopoll_complete() is always called under uring_lock, + * so here for polled io, we also get uring_lock to complete + * it. + */ + if (lock_ctx) + mutex_lock(&lock_ctx->uring_lock); + + req_set_fail_links(req); + io_req_complete(req, ret); + + if (lock_ctx) + mutex_unlock(&lock_ctx->uring_lock); } return io_steal_work(req); @@ -6824,8 +6822,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) /* if we have a backlog and couldn't flush it all, return BUSY */ if (test_bit(0, &ctx->sq_check_overflow)) { - if (!list_empty(&ctx->cq_overflow_list) && - !io_cqring_overflow_flush(ctx, false, NULL, NULL)) + if (!io_cqring_overflow_flush(ctx, false, NULL, NULL)) return -EBUSY; } @@ -8155,10 +8152,13 @@ static void io_unaccount_mem(struct io_ring_ctx *ctx, unsigned long nr_pages, __io_unaccount_mem(ctx->user, nr_pages); if (ctx->mm_account) { - if (acct == ACCT_LOCKED) + if (acct == ACCT_LOCKED) { + mmap_write_lock(ctx->mm_account); ctx->mm_account->locked_vm -= nr_pages; - else if (acct == ACCT_PINNED) + mmap_write_unlock(ctx->mm_account); + }else if (acct == ACCT_PINNED) { atomic64_sub(nr_pages, &ctx->mm_account->pinned_vm); + } } } @@ -8174,10 +8174,13 @@ static int io_account_mem(struct io_ring_ctx *ctx, unsigned long nr_pages, } if (ctx->mm_account) { - if (acct == ACCT_LOCKED) + if (acct == ACCT_LOCKED) { + mmap_write_lock(ctx->mm_account); ctx->mm_account->locked_vm += nr_pages; - else if (acct == ACCT_PINNED) + mmap_write_unlock(ctx->mm_account); + } else if (acct == ACCT_PINNED) { atomic64_add(nr_pages, &ctx->mm_account->pinned_vm); + } } return 0; @@ -8643,10 +8646,19 @@ static void io_ring_exit_work(struct work_struct *work) io_ring_ctx_free(ctx); } +static bool io_cancel_ctx_cb(struct io_wq_work *work, void *data) +{ + struct io_kiocb *req = container_of(work, struct io_kiocb, work); + + return req->ctx == data; +} + static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) { mutex_lock(&ctx->uring_lock); percpu_ref_kill(&ctx->refs); + /* if force is set, the ring is going away. always drop after that */ + ctx->cq_overflow_flushed = 1; if (ctx->rings) io_cqring_overflow_flush(ctx, true, NULL, NULL); mutex_unlock(&ctx->uring_lock); @@ -8655,7 +8667,7 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) io_poll_remove_all(ctx, NULL, NULL); if (ctx->io_wq) - io_wq_cancel_all(ctx->io_wq); + io_wq_cancel_cb(ctx->io_wq, io_cancel_ctx_cb, ctx, true); /* if we failed setting up the ctx, we might not have any rings */ io_iopoll_try_reap_events(ctx); @@ -8798,9 +8810,9 @@ static void __io_uring_cancel_task_requests(struct io_ring_ctx *ctx, ret |= io_poll_remove_all(ctx, task, NULL); ret |= io_kill_timeouts(ctx, task, NULL); + ret |= io_run_task_work(); if (!ret) break; - io_run_task_work(); cond_resched(); } } @@ -8849,10 +8861,9 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx, static int io_uring_add_task_file(struct io_ring_ctx *ctx, struct file *file) { struct io_uring_task *tctx = current->io_uring; + int ret; if (unlikely(!tctx)) { - int ret; - ret = io_uring_alloc_task_context(current); if (unlikely(ret)) return ret; @@ -8863,7 +8874,12 @@ static int io_uring_add_task_file(struct io_ring_ctx *ctx, struct file *file) if (!old) { get_file(file); - xa_store(&tctx->xa, (unsigned long)file, file, GFP_KERNEL); + ret = xa_err(xa_store(&tctx->xa, (unsigned long)file, + file, GFP_KERNEL)); + if (ret) { + fput(file); + return ret; + } } tctx->last = file; } @@ -8986,9 +9002,9 @@ void __io_uring_task_cancel(void) if (inflight != tctx_inflight(tctx)) continue; schedule(); + finish_wait(&tctx->wait, &wait); } while (1); - finish_wait(&tctx->wait, &wait); atomic_dec(&tctx->in_idle); } @@ -9156,10 +9172,13 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, */ ret = 0; if (ctx->flags & IORING_SETUP_SQPOLL) { - io_ring_submit_lock(ctx, (ctx->flags & IORING_SETUP_IOPOLL)); - if (!list_empty_careful(&ctx->cq_overflow_list)) + if (!list_empty_careful(&ctx->cq_overflow_list)) { + bool needs_lock = ctx->flags & IORING_SETUP_IOPOLL; + + io_ring_submit_lock(ctx, needs_lock); io_cqring_overflow_flush(ctx, false, NULL, NULL); - io_ring_submit_unlock(ctx, (ctx->flags & IORING_SETUP_IOPOLL)); + io_ring_submit_unlock(ctx, needs_lock); + } if (flags & IORING_ENTER_SQ_WAKEUP) wake_up(&ctx->sq_data->wait); if (flags & IORING_ENTER_SQ_WAIT) @@ -9369,55 +9388,52 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx, return 0; } +static int io_uring_install_fd(struct io_ring_ctx *ctx, struct file *file) +{ + int ret, fd; + + fd = get_unused_fd_flags(O_RDWR | O_CLOEXEC); + if (fd < 0) + return fd; + + ret = io_uring_add_task_file(ctx, file); + if (ret) { + put_unused_fd(fd); + return ret; + } + fd_install(fd, file); + return fd; +} + /* * Allocate an anonymous fd, this is what constitutes the application * visible backing of an io_uring instance. The application mmaps this * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled, * we have to tie this fd to a socket for file garbage collection purposes. */ -static int io_uring_get_fd(struct io_ring_ctx *ctx) +static struct file *io_uring_get_file(struct io_ring_ctx *ctx) { struct file *file; - int ret; - int fd; - #if defined(CONFIG_UNIX) + int ret; + ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP, &ctx->ring_sock); if (ret) - return ret; + return ERR_PTR(ret); #endif - ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC); - if (ret < 0) - goto err; - fd = ret; - file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx, O_RDWR | O_CLOEXEC); +#if defined(CONFIG_UNIX) if (IS_ERR(file)) { - put_unused_fd(fd); - ret = PTR_ERR(file); - goto err; + sock_release(ctx->ring_sock); + ctx->ring_sock = NULL; + } else { + ctx->ring_sock->file = file; } - -#if defined(CONFIG_UNIX) - ctx->ring_sock->file = file; #endif - ret = io_uring_add_task_file(ctx, file); - if (ret) { - fput(file); - put_unused_fd(fd); - goto err; - } - fd_install(fd, file); - return fd; -err: -#if defined(CONFIG_UNIX) - sock_release(ctx->ring_sock); - ctx->ring_sock = NULL; -#endif - return ret; + return file; } static int io_uring_create(unsigned entries, struct io_uring_params *p, @@ -9425,6 +9441,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, { struct user_struct *user = NULL; struct io_ring_ctx *ctx; + struct file *file; bool limit_mem; int ret; @@ -9572,13 +9589,22 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, goto err; } + file = io_uring_get_file(ctx); + if (IS_ERR(file)) { + ret = PTR_ERR(file); + goto err; + } + /* * Install ring fd as the very last thing, so we don't risk someone * having closed it before we finish setup */ - ret = io_uring_get_fd(ctx); - if (ret < 0) - goto err; + ret = io_uring_install_fd(ctx, file); + if (ret < 0) { + /* fput will clean it up */ + fput(file); + return ret; + } trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags); return ret;