2
0
mirror of https://github.com/edk2-porting/linux-next.git synced 2024-12-25 05:34:00 +08:00
linux-next/fs/btrfs/async-thread.c
Filipe Manana f0cc2cd701 Btrfs: fix crash during unmount due to race with delayed inode workers
During unmount we can have a job from the delayed inode items work queue
still running, that can lead to at least two bad things:

1) A crash, because the worker can try to create a transaction just
   after the fs roots were freed;

2) A transaction leak, because the worker can create a transaction
   before the fs roots are freed and just after we committed the last
   transaction and after we stopped the transaction kthread.

A stack trace example of the crash:

 [79011.691214] kernel BUG at lib/radix-tree.c:982!
 [79011.692056] invalid opcode: 0000 [#1] PREEMPT SMP DEBUG_PAGEALLOC PTI
 [79011.693180] CPU: 3 PID: 1394 Comm: kworker/u8:2 Tainted: G        W         5.6.0-rc2-btrfs-next-54 #2
 (...)
 [79011.696789] Workqueue: btrfs-delayed-meta btrfs_work_helper [btrfs]
 [79011.697904] RIP: 0010:radix_tree_tag_set+0xe7/0x170
 (...)
 [79011.702014] RSP: 0018:ffffb3c84a317ca0 EFLAGS: 00010293
 [79011.702949] RAX: 0000000000000000 RBX: 0000000000000000 RCX: 0000000000000000
 [79011.704202] RDX: ffffb3c84a317cb0 RSI: ffffb3c84a317ca8 RDI: ffff8db3931340a0
 [79011.705463] RBP: 0000000000000005 R08: 0000000000000005 R09: ffffffff974629d0
 [79011.706756] R10: ffffb3c84a317bc0 R11: 0000000000000001 R12: ffff8db393134000
 [79011.708010] R13: ffff8db3931340a0 R14: ffff8db393134068 R15: 0000000000000001
 [79011.709270] FS:  0000000000000000(0000) GS:ffff8db3b6a00000(0000) knlGS:0000000000000000
 [79011.710699] CS:  0010 DS: 0000 ES: 0000 CR0: 0000000080050033
 [79011.711710] CR2: 00007f22c2a0a000 CR3: 0000000232ad4005 CR4: 00000000003606e0
 [79011.712958] DR0: 0000000000000000 DR1: 0000000000000000 DR2: 0000000000000000
 [79011.714205] DR3: 0000000000000000 DR6: 00000000fffe0ff0 DR7: 0000000000000400
 [79011.715448] Call Trace:
 [79011.715925]  record_root_in_trans+0x72/0xf0 [btrfs]
 [79011.716819]  btrfs_record_root_in_trans+0x4b/0x70 [btrfs]
 [79011.717925]  start_transaction+0xdd/0x5c0 [btrfs]
 [79011.718829]  btrfs_async_run_delayed_root+0x17e/0x2b0 [btrfs]
 [79011.719915]  btrfs_work_helper+0xaa/0x720 [btrfs]
 [79011.720773]  process_one_work+0x26d/0x6a0
 [79011.721497]  worker_thread+0x4f/0x3e0
 [79011.722153]  ? process_one_work+0x6a0/0x6a0
 [79011.722901]  kthread+0x103/0x140
 [79011.723481]  ? kthread_create_worker_on_cpu+0x70/0x70
 [79011.724379]  ret_from_fork+0x3a/0x50
 (...)

The following diagram shows a sequence of steps that lead to the crash
during unmount of the filesystem:

        CPU 1                                             CPU 2                                CPU 3

 btrfs_punch_hole()
   btrfs_btree_balance_dirty()
     btrfs_balance_delayed_items()
       --> sees
           fs_info->delayed_root->items
           with value 200, which is greater
           than
           BTRFS_DELAYED_BACKGROUND (128)
           and smaller than
           BTRFS_DELAYED_WRITEBACK (512)
       btrfs_wq_run_delayed_node()
         --> queues a job for
             fs_info->delayed_workers to run
             btrfs_async_run_delayed_root()

                                                                                            btrfs_async_run_delayed_root()
                                                                                              --> job queued by CPU 1

                                                                                              --> starts picking and running
                                                                                                  delayed nodes from the
                                                                                                  prepare_list list

                                                 close_ctree()

                                                   btrfs_delete_unused_bgs()

                                                   btrfs_commit_super()

                                                     btrfs_join_transaction()
                                                       --> gets transaction N

                                                     btrfs_commit_transaction(N)
                                                       --> set transaction state
                                                        to TRANS_STATE_COMMIT_START

                                                                                             btrfs_first_prepared_delayed_node()
                                                                                               --> picks delayed node X through
                                                                                                   the prepared_list list

                                                       btrfs_run_delayed_items()

                                                         btrfs_first_delayed_node()
                                                           --> also picks delayed node X
                                                               but through the node_list
                                                               list

                                                         __btrfs_commit_inode_delayed_items()
                                                            --> runs all delayed items from
                                                                this node and drops the
                                                                node's item count to 0
                                                                through call to
                                                                btrfs_release_delayed_inode()

                                                         --> finishes running any remaining
                                                             delayed nodes

                                                       --> finishes transaction commit

                                                   --> stops cleaner and transaction threads

                                                   btrfs_free_fs_roots()
                                                     --> frees all roots and removes them
                                                         from the radix tree
                                                         fs_info->fs_roots_radix

                                                                                             btrfs_join_transaction()
                                                                                               start_transaction()
                                                                                                 btrfs_record_root_in_trans()
                                                                                                   record_root_in_trans()
                                                                                                     radix_tree_tag_set()
                                                                                                       --> crashes because
                                                                                                           the root is not in
                                                                                                           the radix tree
                                                                                                           anymore

If the worker is able to call btrfs_join_transaction() before the unmount
task frees the fs roots, we end up leaking a transaction and all its
resources, since after the call to btrfs_commit_super() and stopping the
transaction kthread, we don't expect to have any transaction open anymore.

When this situation happens the worker has a delayed node that has no
more items to run, since the task calling btrfs_run_delayed_items(),
which is doing a transaction commit, picks the same node and runs all
its items first.

We can not wait for the worker to complete when running delayed items
through btrfs_run_delayed_items(), because we call that function in
several phases of a transaction commit, and that could cause a deadlock
because the worker calls btrfs_join_transaction() and the task doing the
transaction commit may have already set the transaction state to
TRANS_STATE_COMMIT_DOING.

Also it's not possible to get into a situation where only some of the
items of a delayed node are added to the fs/subvolume tree in the current
transaction and the remaining ones in the next transaction, because when
running the items of a delayed inode we lock its mutex, effectively
waiting for the worker if the worker is running the items of the delayed
node already.

Since this can only cause issues when unmounting a filesystem, fix it in
a simple way by waiting for any jobs on the delayed workers queue before
calling btrfs_commit_super() at close_ctree(). This works because at this
point no one can call btrfs_btree_balance_dirty() or
btrfs_balance_delayed_items(), and if we end up waiting for any worker to
complete, btrfs_commit_super() will commit the transaction created by the
worker.

CC: stable@vger.kernel.org # 4.4+
Signed-off-by: Filipe Manana <fdmanana@suse.com>
Reviewed-by: David Sterba <dsterba@suse.com>
Signed-off-by: David Sterba <dsterba@suse.com>
2020-03-23 17:01:51 +01:00

406 lines
10 KiB
C

// SPDX-License-Identifier: GPL-2.0
/*
* Copyright (C) 2007 Oracle. All rights reserved.
* Copyright (C) 2014 Fujitsu. All rights reserved.
*/
#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"
#include "ctree.h"
/* Bit numbers used with the bit operations on btrfs_work::flags. */
enum {
/* Set by btrfs_work_helper() after work->func() returned (ordered work only) */
WORK_DONE_BIT,
/* Set by run_ordered_work() to claim the call to work->ordered_func() */
WORK_ORDER_DONE_BIT,
/*
 * Set by btrfs_set_work_high_priority(); btrfs_queue_work() then picks the
 * high priority queue, if the workqueue has one.
 */
WORK_HIGH_PRIO_BIT,
};
/* Value of __btrfs_workqueue::thresh meaning that thresholding is disabled */
#define NO_THRESHOLD (-1)
/*
 * Threshold used when the caller asks for 0; requested thresholds below this
 * value disable thresholding (see __btrfs_alloc_workqueue()).
 */
#define DFT_THRESHOLD (32)
/*
 * One kernel workqueue together with the btrfs state layered on top of it:
 * the list used for ordered completion and the counters used to adjust the
 * workqueue's max_active according to the number of pending work items.
 */
struct __btrfs_workqueue {
/* Kernel workqueue the work items are queued on (normal or high priority) */
struct workqueue_struct *normal_wq;
/* File system this workqueue services */
struct btrfs_fs_info *fs_info;
/* List head pointing to ordered work list */
struct list_head ordered_list;
/* Spinlock for ordered_list */
spinlock_t list_lock;
/*
 * Thresholding related variables.
 *
 * pending counts work items that were queued but have not started running
 * yet; it is only maintained when thresh != NO_THRESHOLD.
 */
atomic_t pending;
/* Up limit of concurrency workers */
int limit_active;
/* Current number of concurrency workers */
int current_active;
/* Threshold to change current_active, or NO_THRESHOLD when disabled */
int thresh;
/*
 * Rolling counter, kept modulo thresh / 4 by thresh_exec_hook(), that decides
 * on which executions current_active is re-evaluated.
 */
unsigned int count;
/* Taken by thresh_exec_hook() while updating count and current_active */
spinlock_t thres_lock;
};
/*
 * Workqueue handle used by callers: a normal priority queue plus an optional
 * high priority one.
 */
struct btrfs_workqueue {
/* Always allocated by btrfs_alloc_workqueue() */
struct __btrfs_workqueue *normal;
/* Only allocated when WQ_HIGHPRI was requested, NULL otherwise */
struct __btrfs_workqueue *high;
};
/* Return the filesystem that the internal workqueue @wq was allocated for. */
struct btrfs_fs_info * __pure
btrfs_workqueue_owner(const struct __btrfs_workqueue *wq)
{
	return wq->fs_info;
}
/*
 * Return the filesystem owning the workqueue that @work was last queued on.
 *
 * work->wq is only assigned when the work is queued, so this is meant for
 * work items that have already been queued.
 */
struct btrfs_fs_info * __pure btrfs_work_owner(const struct btrfs_work *work)
{
	const struct __btrfs_workqueue *queue = work->wq;

	return queue->fs_info;
}
bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
{
/*
* We could compare wq->normal->pending with num_online_cpus()
* to support "thresh == NO_THRESHOLD" case, but it requires
* moving up atomic_inc/dec in thresh_queue/exec_hook. Let's
* postpone it until someone needs the support of that case.
*/
if (wq->normal->thresh == NO_THRESHOLD)
return false;
return atomic_read(&wq->normal->pending) > wq->normal->thresh * 2;
}
/*
 * Allocate and initialize one internal workqueue.
 *
 * @fs_info:      filesystem the workqueue belongs to
 * @name:         name fragment, expanded to "btrfs-<name>" or, when
 *                WQ_HIGHPRI is set in @flags, "btrfs-<name>-high"
 * @flags:        flags handed to alloc_workqueue()
 * @limit_active: upper bound for the number of concurrent workers
 * @thresh:       pending-work threshold; 0 selects DFT_THRESHOLD and any
 *                value below DFT_THRESHOLD disables thresholding
 *
 * Returns the new workqueue, or NULL if an allocation failed.
 */
static struct __btrfs_workqueue *
__btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info, const char *name,
			unsigned int flags, int limit_active, int thresh)
{
	struct __btrfs_workqueue *wq;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->fs_info = fs_info;
	wq->limit_active = limit_active;
	atomic_set(&wq->pending, 0);

	if (!thresh)
		thresh = DFT_THRESHOLD;

	if (thresh >= DFT_THRESHOLD) {
		/*
		 * Threshold-able queue: start with the minimal max_active to
		 * keep resource usage low and let concurrency grow on demand.
		 */
		wq->current_active = 1;
		wq->thresh = thresh;
	} else {
		/* For a low threshold, no thresholding at all works better. */
		wq->current_active = limit_active;
		wq->thresh = NO_THRESHOLD;
	}

	if (flags & WQ_HIGHPRI)
		wq->normal_wq = alloc_workqueue("btrfs-%s-high", flags,
						wq->current_active, name);
	else
		wq->normal_wq = alloc_workqueue("btrfs-%s", flags,
						wq->current_active, name);
	if (!wq->normal_wq) {
		kfree(wq);
		return NULL;
	}

	INIT_LIST_HEAD(&wq->ordered_list);
	spin_lock_init(&wq->list_lock);
	spin_lock_init(&wq->thres_lock);
	trace_btrfs_workqueue_alloc(wq, name, flags & WQ_HIGHPRI);

	return wq;
}
/*
 * Forward declaration: the definition comes later in this file, but
 * btrfs_alloc_workqueue() needs it for its error path.
 */
static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq);
/*
 * Allocate a btrfs workqueue.
 *
 * A normal priority queue is always created (WQ_HIGHPRI is masked out of
 * @flags for it). If @flags contains WQ_HIGHPRI, a second, high priority
 * queue is created as well, using the same @limit_active and @thresh.
 *
 * Returns the new workqueue, or NULL if any allocation failed; nothing is
 * left allocated in that case.
 */
struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
					      const char *name,
					      unsigned int flags,
					      int limit_active,
					      int thresh)
{
	struct btrfs_workqueue *pair;

	pair = kzalloc(sizeof(*pair), GFP_KERNEL);
	if (!pair)
		return NULL;

	pair->normal = __btrfs_alloc_workqueue(fs_info, name,
					       flags & ~WQ_HIGHPRI,
					       limit_active, thresh);
	if (!pair->normal)
		goto free_pair;

	/* No high priority queue requested: pair->high stays NULL. */
	if (!(flags & WQ_HIGHPRI))
		return pair;

	pair->high = __btrfs_alloc_workqueue(fs_info, name, flags,
					     limit_active, thresh);
	if (pair->high)
		return pair;

	__btrfs_destroy_workqueue(pair->normal);
free_pair:
	kfree(pair);
	return NULL;
}
/*
 * Threshold hook invoked from the queueing path (__btrfs_queue_work()).
 *
 * It accounts one more pending work item, unless thresholding is disabled
 * for @wq. This hook WILL be called in IRQ handler context, so
 * workqueue_set_max_active MUST NOT be called from here.
 */
static inline void thresh_queue_hook(struct __btrfs_workqueue *wq)
{
	if (wq->thresh != NO_THRESHOLD)
		atomic_inc(&wq->pending);
}
/*
 * Threshold hook invoked right before a work item is executed.
 *
 * It drops the pending count and may re-evaluate how many workers @wq is
 * allowed to run concurrently. This hook is called in kthread context, so
 * this is where workqueue_set_max_active is called.
 */
static inline void thresh_exec_hook(struct __btrfs_workqueue *wq)
{
	int target_active;
	long backlog;
	bool resize = false;

	/* Nothing to track for queues without a threshold. */
	if (wq->thresh == NO_THRESHOLD)
		return;

	atomic_dec(&wq->pending);

	spin_lock(&wq->thres_lock);
	/*
	 * wq->count is meant to limit how often workqueue_set_max_active is
	 * called. It is kept modulo thresh / 4 and the evaluation below is
	 * skipped on the call where it wraps back to 0.
	 *
	 * NOTE(review): that means the evaluation runs on all the other
	 * calls; confirm this polarity is the intended rate limiting.
	 */
	wq->count = (wq->count + 1) % (wq->thresh / 4);
	if (wq->count) {
		target_active = wq->current_active;

		/*
		 * The pending count may change right after it is read, but
		 * an approximate value is good enough to pick the new
		 * concurrency level.
		 */
		backlog = atomic_read(&wq->pending);
		if (backlog > wq->thresh)
			target_active++;
		if (backlog < wq->thresh / 2)
			target_active--;
		target_active = clamp_val(target_active, 1, wq->limit_active);

		if (target_active != wq->current_active) {
			wq->current_active = target_active;
			resize = true;
		}
	}
	spin_unlock(&wq->thres_lock);

	/* The workqueue itself is resized outside of thres_lock. */
	if (resize)
		workqueue_set_max_active(wq->normal_wq, wq->current_active);
}
/*
 * Run the ordered completion callbacks for the finished work items at the
 * head of wq->ordered_list.
 *
 * Starting at the head of the list, every work item that already has
 * WORK_DONE_BIT set gets its ordered_func() called, is then removed from the
 * list and finally released through ordered_free(). The loop stops when the
 * list is empty, when the head item is not done yet, or when the head item
 * already has WORK_ORDER_DONE_BIT set (presumably because another worker
 * running this function claimed it first).
 *
 * @wq:   workqueue whose ordered list is processed
 * @self: work item the calling worker is currently executing; if it is
 *        reached in the list, its ordered_free() is deferred until the loop
 *        has finished
 */
static void run_ordered_work(struct __btrfs_workqueue *wq,
struct btrfs_work *self)
{
struct list_head *list = &wq->ordered_list;
struct btrfs_work *work;
spinlock_t *lock = &wq->list_lock;
unsigned long flags;
bool free_self = false;
while (1) {
spin_lock_irqsave(lock, flags);
if (list_empty(list))
break;
work = list_entry(list->next, struct btrfs_work,
ordered_list);
if (!test_bit(WORK_DONE_BIT, &work->flags))
break;
/*
* we are going to call the ordered done function, but
* we leave the work item on the list as a barrier so
* that later work items that are done don't have their
* functions called before this one returns
*/
if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
break;
trace_btrfs_ordered_sched(work);
spin_unlock_irqrestore(lock, flags);
work->ordered_func(work);
/* now take the lock again and drop our item from the list */
spin_lock_irqsave(lock, flags);
list_del(&work->ordered_list);
spin_unlock_irqrestore(lock, flags);
if (work == self) {
/*
* This is the work item that the worker is currently
* executing.
*
* The kernel workqueue code guarantees non-reentrancy
* of work items. I.e., if a work item with the same
* address and work function is queued twice, the second
* execution is blocked until the first one finishes. A
* work item may be freed and recycled with the same
* work function; the workqueue code assumes that the
* original work item cannot depend on the recycled work
* item in that case (see find_worker_executing_work()).
*
* Note that different types of Btrfs work can depend on
* each other, and one type of work on one Btrfs
* filesystem may even depend on the same type of work
* on another Btrfs filesystem via, e.g., a loop device.
* Therefore, we must not allow the current work item to
* be recycled until we are really done, otherwise we
* break the above assumption and can deadlock.
*/
free_self = true;
} else {
/*
* We don't want to call the ordered free functions with
* the lock held.
*/
work->ordered_free(work);
/* NB: work must not be dereferenced past this point. */
trace_btrfs_all_work_done(wq->fs_info, work);
}
}
/* Every break above leaves the loop with the list lock still held. */
spin_unlock_irqrestore(lock, flags);
if (free_self) {
self->ordered_free(self);
/* NB: self must not be dereferenced past this point. */
trace_btrfs_all_work_done(wq->fs_info, self);
}
}
/*
 * Work function installed on every btrfs_work by btrfs_init_work().
 *
 * It runs the threshold hook and work->func(), and for work items that have
 * an ordered_func it then marks the work as done and processes the ordered
 * list of the workqueue.
 */
static void btrfs_work_helper(struct work_struct *normal_work)
{
	struct btrfs_work *work = container_of(normal_work, struct btrfs_work,
					       normal_work);
	/*
	 * Snapshot everything needed from @work up front, because it must
	 * not be touched:
	 * 1) after work->func() if it has no ordered_free, since the struct
	 *    is freed in work->func();
	 * 2) after WORK_DONE_BIT is set, since other threads may free the
	 *    work almost instantly.
	 */
	struct __btrfs_workqueue *wq = work->wq;
	const bool ordered = work->ordered_func != NULL;

	trace_btrfs_work_sched(work);
	thresh_exec_hook(wq);
	work->func(work);

	if (!ordered) {
		/* NB: work must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, work);
		return;
	}

	set_bit(WORK_DONE_BIT, &work->flags);
	run_ordered_work(wq, work);
}
/*
 * Prepare @work for queueing.
 *
 * @work:         work item to initialize
 * @func:         main function of the work
 * @ordered_func: ordered completion callback; when non-NULL the work is
 *                treated as ordered work by the queueing and helper code
 * @ordered_free: callback used to release ordered work once its
 *                ordered_func has run
 */
void btrfs_init_work(struct btrfs_work *work, btrfs_func_t func,
		     btrfs_func_t ordered_func, btrfs_func_t ordered_free)
{
	work->flags = 0;

	work->func = func;
	work->ordered_func = ordered_func;
	work->ordered_free = ordered_free;

	INIT_LIST_HEAD(&work->ordered_list);
	INIT_WORK(&work->normal_work, btrfs_work_helper);
}
/*
 * Queue @work on the internal workqueue @wq.
 *
 * The work is bound to @wq and accounted by the threshold hook. Ordered work
 * (work with an ordered_func) is appended to the ordered list of @wq before
 * the work is handed to the kernel workqueue.
 */
static inline void __btrfs_queue_work(struct __btrfs_workqueue *wq,
				      struct btrfs_work *work)
{
	work->wq = wq;
	thresh_queue_hook(wq);

	if (work->ordered_func) {
		unsigned long irq_flags;

		spin_lock_irqsave(&wq->list_lock, irq_flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, irq_flags);
	}

	trace_btrfs_work_queued(work);
	queue_work(wq->normal_wq, &work->normal_work);
}
/*
 * Queue @work on @wq.
 *
 * Work flagged as high priority goes to the high priority queue when @wq has
 * one; everything else goes to the normal queue.
 */
void btrfs_queue_work(struct btrfs_workqueue *wq,
		      struct btrfs_work *work)
{
	struct __btrfs_workqueue *target = wq->normal;

	if (wq->high && test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		target = wq->high;

	__btrfs_queue_work(target, work);
}
/*
 * Tear down one internal workqueue: destroy the underlying kernel workqueue,
 * emit the destroy tracepoint and free @wq itself.
 */
static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq)
{
	struct workqueue_struct *kernel_wq = wq->normal_wq;

	destroy_workqueue(kernel_wq);
	trace_btrfs_workqueue_destroy(wq);
	kfree(wq);
}
/*
 * Destroy a workqueue created by btrfs_alloc_workqueue(), including its
 * optional high priority queue. A NULL @wq is accepted and ignored.
 */
void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
{
	struct __btrfs_workqueue *high;

	if (!wq)
		return;

	/* The high priority queue goes first, then the normal one. */
	high = wq->high;
	if (high)
		__btrfs_destroy_workqueue(high);
	__btrfs_destroy_workqueue(wq->normal);

	kfree(wq);
}
void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
{
if (!wq)
return;
wq->normal->limit_active = limit_active;
if (wq->high)
wq->high->limit_active = limit_active;
}
/*
 * Flag @work as high priority.
 *
 * The flag is evaluated by btrfs_queue_work(), which sends the work to the
 * high priority queue of the target workqueue when there is one.
 */
void btrfs_set_work_high_priority(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}
void btrfs_flush_workqueue(struct btrfs_workqueue *wq)
{
if (wq->high)
flush_workqueue(wq->high->normal_wq);
flush_workqueue(wq->normal->normal_wq);
}