diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c index d82efd722a48..e1e49715459e 100644 --- a/fs/btrfs/async-thread.c +++ b/fs/btrfs/async-thread.c @@ -23,6 +23,10 @@ # include #include "async-thread.h" +#define WORK_QUEUED_BIT 0 +#define WORK_DONE_BIT 1 +#define WORK_ORDER_DONE_BIT 2 + /* * container for the kthread task pointer and the list of pending work * One of these is allocated per thread. @@ -88,6 +92,47 @@ static void check_busy_worker(struct btrfs_worker_thread *worker) } } +static noinline int run_ordered_completions(struct btrfs_workers *workers, + struct btrfs_work *work) +{ + unsigned long flags; + + if (!workers->ordered) + return 0; + + set_bit(WORK_DONE_BIT, &work->flags); + + spin_lock_irqsave(&workers->lock, flags); + + while(!list_empty(&workers->order_list)) { + work = list_entry(workers->order_list.next, + struct btrfs_work, order_list); + + if (!test_bit(WORK_DONE_BIT, &work->flags)) + break; + + /* we are going to call the ordered done function, but + * we leave the work item on the list as a barrier so + * that later work items that are done don't have their + * functions called before this one returns + */ + if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags)) + break; + + spin_unlock_irqrestore(&workers->lock, flags); + + work->ordered_func(work); + + /* now take the lock again and call the freeing code */ + spin_lock_irqsave(&workers->lock, flags); + list_del(&work->order_list); + work->ordered_free(work); + } + + spin_unlock_irqrestore(&workers->lock, flags); + return 0; +} + /* * main loop for servicing work items */ @@ -102,7 +147,7 @@ static int worker_loop(void *arg) cur = worker->pending.next; work = list_entry(cur, struct btrfs_work, list); list_del(&work->list); - clear_bit(0, &work->flags); + clear_bit(WORK_QUEUED_BIT, &work->flags); work->worker = worker; spin_unlock_irq(&worker->lock); @@ -110,8 +155,15 @@ static int worker_loop(void *arg) work->func(work); atomic_dec(&worker->num_pending); + /* + * unless this is an ordered work queue, + * 'work' was probably freed by func above. + */ + run_ordered_completions(worker->workers, work); + spin_lock_irq(&worker->lock); check_idle_worker(worker); + } worker->working = 0; if (freezing(current)) { @@ -154,10 +206,12 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max) workers->num_workers = 0; INIT_LIST_HEAD(&workers->worker_list); INIT_LIST_HEAD(&workers->idle_list); + INIT_LIST_HEAD(&workers->order_list); spin_lock_init(&workers->lock); workers->max_workers = max; workers->idle_thresh = 32; workers->name = name; + workers->ordered = 0; } /* @@ -296,7 +350,7 @@ int btrfs_requeue_work(struct btrfs_work *work) struct btrfs_worker_thread *worker = work->worker; unsigned long flags; - if (test_and_set_bit(0, &work->flags)) + if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags)) goto out; spin_lock_irqsave(&worker->lock, flags); @@ -330,10 +384,17 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work) int wake = 0; /* don't requeue something already on a list */ - if (test_and_set_bit(0, &work->flags)) + if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags)) goto out; worker = find_worker(workers); + if (workers->ordered) { + spin_lock_irqsave(&workers->lock, flags); + list_add_tail(&work->order_list, &workers->order_list); + spin_unlock_irqrestore(&workers->lock, flags); + } else { + INIT_LIST_HEAD(&work->order_list); + } spin_lock_irqsave(&worker->lock, flags); atomic_inc(&worker->num_pending); diff --git a/fs/btrfs/async-thread.h b/fs/btrfs/async-thread.h index 4ec9a2ee0f9d..31be4ed8b63e 100644 --- a/fs/btrfs/async-thread.h +++ b/fs/btrfs/async-thread.h @@ -37,10 +37,16 @@ struct btrfs_worker_thread; */ struct btrfs_work { /* - * only func should be set to the function you want called + * func should be set to the function you want called * your work struct is passed as the only arg + * + * ordered_func must be set for work sent to an ordered work queue, + * and it is called to complete a given work item in the same + * order they were sent to the queue. */ void (*func)(struct btrfs_work *work); + void (*ordered_func)(struct btrfs_work *work); + void (*ordered_free)(struct btrfs_work *work); /* * flags should be set to zero. It is used to make sure the @@ -51,6 +57,7 @@ struct btrfs_work { /* don't touch these */ struct btrfs_worker_thread *worker; struct list_head list; + struct list_head order_list; }; struct btrfs_workers { @@ -63,6 +70,9 @@ struct btrfs_workers { /* once a worker has this many requests or fewer, it is idle */ int idle_thresh; + /* force completions in the order they were queued */ + int ordered; + /* list with all the work threads. The workers on the idle thread * may be actively servicing jobs, but they haven't yet hit the * idle thresh limit above. @@ -70,6 +80,12 @@ struct btrfs_workers { struct list_head worker_list; struct list_head idle_list; + /* + * when operating in ordered mode, this maintains the list + * of work items waiting for completion + */ + struct list_head order_list; + /* lock for finding the next worker thread to queue on */ spinlock_t lock; diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c index 94b4e50f6b2c..e0a28f705a64 100644 --- a/fs/btrfs/disk-io.c +++ b/fs/btrfs/disk-io.c @@ -80,7 +80,8 @@ struct async_submit_bio { struct inode *inode; struct bio *bio; struct list_head list; - extent_submit_bio_hook_t *submit_bio_hook; + extent_submit_bio_hook_t *submit_bio_start; + extent_submit_bio_hook_t *submit_bio_done; int rw; int mirror_num; unsigned long bio_flags; @@ -452,7 +453,18 @@ int btrfs_congested_async(struct btrfs_fs_info *info, int iodone) btrfs_async_submit_limit(info); } -static void run_one_async_submit(struct btrfs_work *work) +static void run_one_async_start(struct btrfs_work *work) +{ + struct btrfs_fs_info *fs_info; + struct async_submit_bio *async; + + async = container_of(work, struct async_submit_bio, work); + fs_info = BTRFS_I(async->inode)->root->fs_info; + async->submit_bio_start(async->inode, async->rw, async->bio, + async->mirror_num, async->bio_flags); +} + +static void run_one_async_done(struct btrfs_work *work) { struct btrfs_fs_info *fs_info; struct async_submit_bio *async; @@ -470,15 +482,23 @@ static void run_one_async_submit(struct btrfs_work *work) waitqueue_active(&fs_info->async_submit_wait)) wake_up(&fs_info->async_submit_wait); - async->submit_bio_hook(async->inode, async->rw, async->bio, + async->submit_bio_done(async->inode, async->rw, async->bio, async->mirror_num, async->bio_flags); +} + +static void run_one_async_free(struct btrfs_work *work) +{ + struct async_submit_bio *async; + + async = container_of(work, struct async_submit_bio, work); kfree(async); } int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode, int rw, struct bio *bio, int mirror_num, unsigned long bio_flags, - extent_submit_bio_hook_t *submit_bio_hook) + extent_submit_bio_hook_t *submit_bio_start, + extent_submit_bio_hook_t *submit_bio_done) { struct async_submit_bio *async; int limit = btrfs_async_submit_limit(fs_info); @@ -491,8 +511,13 @@ int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode, async->rw = rw; async->bio = bio; async->mirror_num = mirror_num; - async->submit_bio_hook = submit_bio_hook; - async->work.func = run_one_async_submit; + async->submit_bio_start = submit_bio_start; + async->submit_bio_done = submit_bio_done; + + async->work.func = run_one_async_start; + async->work.ordered_func = run_one_async_done; + async->work.ordered_free = run_one_async_free; + async->work.flags = 0; async->bio_flags = bio_flags; @@ -533,29 +558,25 @@ static int btree_csum_one_bio(struct bio *bio) return 0; } -static int __btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, - int mirror_num, unsigned long bio_flags) +static int __btree_submit_bio_start(struct inode *inode, int rw, + struct bio *bio, int mirror_num, + unsigned long bio_flags) { - struct btrfs_root *root = BTRFS_I(inode)->root; - int ret; - /* * when we're called for a write, we're already in the async * submission context. Just jump into btrfs_map_bio */ - if (rw & (1 << BIO_RW)) { - btree_csum_one_bio(bio); - return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, - mirror_num, 1); - } + btree_csum_one_bio(bio); + return 0; +} +static int __btree_submit_bio_done(struct inode *inode, int rw, struct bio *bio, + int mirror_num, unsigned long bio_flags) +{ /* - * called for a read, do the setup so that checksum validation - * can happen in the async kernel threads + * when we're called for a write, we're already in the async + * submission context. Just jump into btrfs_map_bio */ - ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1); - BUG_ON(ret); - return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1); } @@ -567,11 +588,22 @@ static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, * can happen in parallel across all CPUs */ if (!(rw & (1 << BIO_RW))) { - return __btree_submit_bio_hook(inode, rw, bio, mirror_num, 0); + int ret; + /* + * called for a read, do the setup so that checksum validation + * can happen in the async kernel threads + */ + ret = btrfs_bio_wq_end_io(BTRFS_I(inode)->root->fs_info, + bio, 1); + BUG_ON(ret); + + return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, + mirror_num, 1); } return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info, inode, rw, bio, mirror_num, 0, - __btree_submit_bio_hook); + __btree_submit_bio_start, + __btree_submit_bio_done); } static int btree_writepage(struct page *page, struct writeback_control *wbc) @@ -1534,7 +1566,8 @@ struct btrfs_root *open_ctree(struct super_block *sb, * were sent by the writeback daemons, improving overall locality * of the IO going down the pipe. */ - fs_info->workers.idle_thresh = 128; + fs_info->workers.idle_thresh = 8; + fs_info->workers.ordered = 1; btrfs_init_workers(&fs_info->fixup_workers, "fixup", 1); btrfs_init_workers(&fs_info->endio_workers, "endio", diff --git a/fs/btrfs/disk-io.h b/fs/btrfs/disk-io.h index 4eb1f1408d21..b8d5948fa279 100644 --- a/fs/btrfs/disk-io.h +++ b/fs/btrfs/disk-io.h @@ -72,7 +72,9 @@ int btrfs_bio_wq_end_io(struct btrfs_fs_info *info, struct bio *bio, int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode, int rw, struct bio *bio, int mirror_num, unsigned long bio_flags, - extent_submit_bio_hook_t *submit_bio_hook); + extent_submit_bio_hook_t *submit_bio_start, + extent_submit_bio_hook_t *submit_bio_done); + int btrfs_congested_async(struct btrfs_fs_info *info, int iodone); unsigned long btrfs_async_submit_limit(struct btrfs_fs_info *info); int btrfs_write_tree_block(struct extent_buffer *buf); diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c index 806708dd7e38..3df0ffad976e 100644 --- a/fs/btrfs/inode.c +++ b/fs/btrfs/inode.c @@ -881,7 +881,7 @@ int btrfs_merge_bio_hook(struct page *page, unsigned long offset, * At IO completion time the cums attached on the ordered extent record * are inserted into the btree */ -int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, +int __btrfs_submit_bio_start(struct inode *inode, int rw, struct bio *bio, int mirror_num, unsigned long bio_flags) { struct btrfs_root *root = BTRFS_I(inode)->root; @@ -889,7 +889,21 @@ int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, ret = btrfs_csum_one_bio(root, inode, bio); BUG_ON(ret); + return 0; +} +/* + * in order to insert checksums into the metadata in large chunks, + * we wait until bio submission time. All the pages in the bio are + * checksummed and sums are attached onto the ordered extent record. + * + * At IO completion time the cums attached on the ordered extent record + * are inserted into the btree + */ +int __btrfs_submit_bio_done(struct inode *inode, int rw, struct bio *bio, + int mirror_num, unsigned long bio_flags) +{ + struct btrfs_root *root = BTRFS_I(inode)->root; return btrfs_map_bio(root, rw, bio, mirror_num, 1); } @@ -922,7 +936,8 @@ int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, /* we're doing a write, do the async checksumming */ return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info, inode, rw, bio, mirror_num, - bio_flags, __btrfs_submit_bio_hook); + bio_flags, __btrfs_submit_bio_start, + __btrfs_submit_bio_done); } mapit: