diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index 5ea0b62f276b..e9da812352b4 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -37,12 +37,32 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from, *from = to; } -int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, - int status, uint32_t sbflags) +static void dlm_callback_work(struct work_struct *work) { - struct dlm_ls *ls = lkb->lkb_resource->res_ls; + struct dlm_callback *cb = container_of(work, struct dlm_callback, work); + + if (cb->flags & DLM_CB_BAST) { + trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name, + cb->res_length); + cb->bastfn(cb->astparam, cb->mode); + } else if (cb->flags & DLM_CB_CAST) { + trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status, + cb->sb_flags, cb->res_name, cb->res_length); + cb->lkb_lksb->sb_status = cb->sb_status; + cb->lkb_lksb->sb_flags = cb->sb_flags; + cb->astfn(cb->astparam); + } + + kref_put(&cb->ref, dlm_release_callback); +} + +int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb) +{ + struct dlm_rsb *rsb = lkb->lkb_resource; int rv = DLM_ENQUEUE_CALLBACK_SUCCESS; - struct dlm_callback *cb; + struct dlm_ls *ls = rsb->res_ls; int copy_lvb = 0; int prev_mode; @@ -88,57 +108,46 @@ int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, } } - cb = dlm_allocate_cb(); - if (!cb) { + *cb = dlm_allocate_cb(); + if (!*cb) { rv = DLM_ENQUEUE_CALLBACK_FAILURE; goto out; } - cb->flags = flags; - cb->mode = mode; - cb->sb_status = status; - cb->sb_flags = (sbflags & 0x000000FF); - cb->copy_lvb = copy_lvb; - kref_init(&cb->ref); - if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags)) - rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED; + /* for tracing */ + (*cb)->lkb_id = lkb->lkb_id; + (*cb)->ls_id = ls->ls_global_id; + memcpy((*cb)->res_name, rsb->res_name, rsb->res_length); + (*cb)->res_length = rsb->res_length; - list_add_tail(&cb->list, &lkb->lkb_callbacks); + (*cb)->flags = flags; + (*cb)->mode = mode; + (*cb)->sb_status = status; + (*cb)->sb_flags = (sbflags & 0x000000FF); + (*cb)->copy_lvb = copy_lvb; + (*cb)->lkb_lksb = lkb->lkb_lksb; + kref_init(&(*cb)->ref); if (flags & DLM_CB_BAST) { lkb->lkb_last_bast_time = ktime_get(); - lkb->lkb_last_bast_mode = cb->mode; + lkb->lkb_last_bast_mode = mode; } else if (flags & DLM_CB_CAST) { - dlm_callback_set_last_ptr(&lkb->lkb_last_cast, cb); + dlm_callback_set_last_ptr(&lkb->lkb_last_cast, *cb); lkb->lkb_last_cast_time = ktime_get(); } - dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb); + dlm_callback_set_last_ptr(&lkb->lkb_last_cb, *cb); + rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED; - out: +out: return rv; } -int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb) -{ - /* oldest undelivered cb is callbacks first entry */ - *cb = list_first_entry_or_null(&lkb->lkb_callbacks, - struct dlm_callback, list); - if (!*cb) - return DLM_DEQUEUE_CALLBACK_EMPTY; - - /* remove it from callbacks so shift others down */ - list_del(&(*cb)->list); - if (list_empty(&lkb->lkb_callbacks)) - return DLM_DEQUEUE_CALLBACK_LAST; - - return DLM_DEQUEUE_CALLBACK_SUCCESS; -} - void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, - uint32_t sbflags) + uint32_t sbflags) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; + struct dlm_callback *cb; int rv; if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) { @@ -146,18 +155,20 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, return; } - spin_lock(&lkb->lkb_cb_lock); - rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags); + rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, + &cb); switch (rv) { case DLM_ENQUEUE_CALLBACK_NEED_SCHED: - kref_get(&lkb->lkb_ref); + cb->astfn = lkb->lkb_astfn; + cb->bastfn = lkb->lkb_bastfn; + cb->astparam = lkb->lkb_astparam; + INIT_WORK(&cb->work, dlm_callback_work); spin_lock(&ls->ls_cb_lock); - if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) { - list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay); - } else { - queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work); - } + if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) + list_add(&cb->list, &ls->ls_cb_delay); + else + queue_work(ls->ls_callback_wq, &cb->work); spin_unlock(&ls->ls_cb_lock); break; case DLM_ENQUEUE_CALLBACK_SUCCESS: @@ -168,67 +179,12 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, WARN_ON_ONCE(1); break; } - spin_unlock(&lkb->lkb_cb_lock); -} - -void dlm_callback_work(struct work_struct *work) -{ - struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work); - struct dlm_ls *ls = lkb->lkb_resource->res_ls; - struct dlm_rsb *rsb = lkb->lkb_resource; - void (*castfn) (void *astparam); - void (*bastfn) (void *astparam, int mode); - struct dlm_callback *cb; - int rv; - - spin_lock(&lkb->lkb_cb_lock); - rv = dlm_dequeue_lkb_callback(lkb, &cb); - if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) { - clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); - spin_unlock(&lkb->lkb_cb_lock); - goto out; - } - spin_unlock(&lkb->lkb_cb_lock); - - for (;;) { - castfn = lkb->lkb_astfn; - bastfn = lkb->lkb_bastfn; - - if (cb->flags & DLM_CB_BAST) { - trace_dlm_bast(ls->ls_global_id, lkb->lkb_id, - cb->mode, rsb->res_name, - rsb->res_length); - bastfn(lkb->lkb_astparam, cb->mode); - } else if (cb->flags & DLM_CB_CAST) { - lkb->lkb_lksb->sb_status = cb->sb_status; - lkb->lkb_lksb->sb_flags = cb->sb_flags; - trace_dlm_ast(ls->ls_global_id, lkb->lkb_id, - cb->sb_flags, cb->sb_status, - rsb->res_name, rsb->res_length); - castfn(lkb->lkb_astparam); - } - - kref_put(&cb->ref, dlm_release_callback); - - spin_lock(&lkb->lkb_cb_lock); - rv = dlm_dequeue_lkb_callback(lkb, &cb); - if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) { - clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); - spin_unlock(&lkb->lkb_cb_lock); - break; - } - spin_unlock(&lkb->lkb_cb_lock); - } - -out: - /* undo kref_get from dlm_add_callback, may cause lkb to be freed */ - dlm_put_lkb(lkb); } int dlm_callback_start(struct dlm_ls *ls) { - ls->ls_callback_wq = alloc_workqueue("dlm_callback", - WQ_HIGHPRI | WQ_MEM_RECLAIM, 0); + ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback", + WQ_HIGHPRI | WQ_MEM_RECLAIM); if (!ls->ls_callback_wq) { log_print("can't start dlm_callback workqueue"); return -ENOMEM; @@ -257,7 +213,7 @@ void dlm_callback_suspend(struct dlm_ls *ls) void dlm_callback_resume(struct dlm_ls *ls) { - struct dlm_lkb *lkb, *safe; + struct dlm_callback *cb, *safe; int count = 0, sum = 0; bool empty; @@ -266,9 +222,9 @@ void dlm_callback_resume(struct dlm_ls *ls) more: spin_lock(&ls->ls_cb_lock); - list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) { - list_del_init(&lkb->lkb_cb_list); - queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work); + list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) { + list_del(&cb->list); + queue_work(ls->ls_callback_wq, &cb->work); count++; if (count == MAX_CB_QUEUE) break; diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h index ce007892dc2d..9bd12409e1ee 100644 --- a/fs/dlm/ast.h +++ b/fs/dlm/ast.h @@ -14,19 +14,15 @@ #define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1 #define DLM_ENQUEUE_CALLBACK_SUCCESS 0 #define DLM_ENQUEUE_CALLBACK_FAILURE -1 -int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, - int status, uint32_t sbflags); -#define DLM_DEQUEUE_CALLBACK_EMPTY 2 -#define DLM_DEQUEUE_CALLBACK_LAST 1 -#define DLM_DEQUEUE_CALLBACK_SUCCESS 0 -int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb); +int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb); void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, uint32_t sbflags); void dlm_callback_set_last_ptr(struct dlm_callback **from, struct dlm_callback *to); void dlm_release_callback(struct kref *ref); -void dlm_callback_work(struct work_struct *work); int dlm_callback_start(struct dlm_ls *ls); void dlm_callback_stop(struct dlm_ls *ls); void dlm_callback_suspend(struct dlm_ls *ls); diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index a9137c90f348..cda1526fd15b 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -16,6 +16,7 @@ * This is the main header file to be included in each DLM source file. */ +#include #include #include #include @@ -204,8 +205,7 @@ struct dlm_args { #define DLM_IFL_OVERLAP_CANCEL_BIT 20 #define DLM_IFL_ENDOFLIFE_BIT 21 #define DLM_IFL_DEADLOCK_CANCEL_BIT 24 -#define DLM_IFL_CB_PENDING_BIT 25 -#define __DLM_IFL_MAX_BIT DLM_IFL_CB_PENDING_BIT +#define __DLM_IFL_MAX_BIT DLM_IFL_DEADLOCK_CANCEL_BIT /* lkb_dflags */ @@ -217,12 +217,45 @@ struct dlm_args { #define DLM_CB_CAST 0x00000001 #define DLM_CB_BAST 0x00000002 +/* much of this is just saving user space pointers associated with the + * lock that we pass back to the user lib with an ast + */ + +struct dlm_user_args { + struct dlm_user_proc *proc; /* each process that opens the lockspace + * device has private data + * (dlm_user_proc) on the struct file, + * the process's locks point back to it + */ + struct dlm_lksb lksb; + struct dlm_lksb __user *user_lksb; + void __user *castparam; + void __user *castaddr; + void __user *bastparam; + void __user *bastaddr; + uint64_t xid; +}; + struct dlm_callback { uint32_t flags; /* DLM_CBF_ */ int sb_status; /* copy to lksb status */ uint8_t sb_flags; /* copy to lksb flags */ int8_t mode; /* rq mode of bast, gr mode of cast */ - int copy_lvb; + bool copy_lvb; + struct dlm_lksb *lkb_lksb; + unsigned char lvbptr[DLM_USER_LVB_LEN]; + + union { + void *astparam; /* caller's ast arg */ + struct dlm_user_args ua; + }; + struct work_struct work; + void (*bastfn)(void *astparam, int mode); + void (*astfn)(void *astparam); + char res_name[DLM_RESNAME_MAXLEN]; + size_t res_length; + uint32_t ls_id; + uint32_t lkb_id; struct list_head list; struct kref ref; @@ -256,10 +289,6 @@ struct dlm_lkb { struct list_head lkb_ownqueue; /* list of locks for a process */ ktime_t lkb_timestamp; - spinlock_t lkb_cb_lock; - struct work_struct lkb_cb_work; - struct list_head lkb_cb_list; /* for ls_cb_delay or proc->asts */ - struct list_head lkb_callbacks; struct dlm_callback *lkb_last_cast; struct dlm_callback *lkb_last_cb; int lkb_last_bast_mode; @@ -688,23 +717,6 @@ struct dlm_ls { #define LSFL_CB_DELAY 9 #define LSFL_NODIR 10 -/* much of this is just saving user space pointers associated with the - lock that we pass back to the user lib with an ast */ - -struct dlm_user_args { - struct dlm_user_proc *proc; /* each process that opens the lockspace - device has private data - (dlm_user_proc) on the struct file, - the process's locks point back to it*/ - struct dlm_lksb lksb; - struct dlm_lksb __user *user_lksb; - void __user *castparam; - void __user *castaddr; - void __user *bastparam; - void __user *bastaddr; - uint64_t xid; -}; - #define DLM_PROC_FLAGS_CLOSING 1 #define DLM_PROC_FLAGS_COMPAT 2 diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index fd752dd03896..198672446dcd 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1203,10 +1203,6 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, kref_init(&lkb->lkb_ref); INIT_LIST_HEAD(&lkb->lkb_ownqueue); INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); - INIT_LIST_HEAD(&lkb->lkb_cb_list); - INIT_LIST_HEAD(&lkb->lkb_callbacks); - spin_lock_init(&lkb->lkb_cb_lock); - INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); idr_preload(GFP_NOFS); spin_lock(&ls->ls_lkbidr_spin); @@ -6003,6 +5999,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) { + struct dlm_callback *cb, *cb_safe; struct dlm_lkb *lkb, *safe; dlm_lock_recovery(ls); @@ -6032,10 +6029,9 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) dlm_put_lkb(lkb); } - list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { - dlm_purge_lkb_callbacks(lkb); - list_del_init(&lkb->lkb_cb_list); - dlm_put_lkb(lkb); + list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) { + list_del(&cb->list); + kref_put(&cb->ref, dlm_release_callback); } spin_unlock(&ls->ls_clear_proc_locks); @@ -6044,6 +6040,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) { + struct dlm_callback *cb, *cb_safe; struct dlm_lkb *lkb, *safe; while (1) { @@ -6073,10 +6070,9 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) spin_unlock(&proc->locks_spin); spin_lock(&proc->asts_spin); - list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { - dlm_purge_lkb_callbacks(lkb); - list_del_init(&lkb->lkb_cb_list); - dlm_put_lkb(lkb); + list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) { + list_del(&cb->list); + kref_put(&cb->ref, dlm_release_callback); } spin_unlock(&proc->asts_spin); } diff --git a/fs/dlm/user.c b/fs/dlm/user.c index fa99b6074e5c..334a6d64d413 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -21,6 +21,7 @@ #include "dlm_internal.h" #include "lockspace.h" #include "lock.h" +#include "lvb_table.h" #include "user.h" #include "ast.h" #include "config.h" @@ -144,24 +145,6 @@ static void compat_output(struct dlm_lock_result *res, } #endif -/* should held proc->asts_spin lock */ -void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb) -{ - struct dlm_callback *cb, *safe; - - list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) { - list_del(&cb->list); - kref_put(&cb->ref, dlm_release_callback); - } - - clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); - - /* invalidate */ - dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL); - dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL); - lkb->lkb_last_bast_mode = -1; -} - /* Figure out if this lock is at the end of its life and no longer available for the application to use. The lkb still exists until the final ast is read. A lock becomes EOL in three situations: @@ -198,6 +181,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, struct dlm_ls *ls; struct dlm_user_args *ua; struct dlm_user_proc *proc; + struct dlm_callback *cb; int rv; if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) || @@ -229,11 +213,18 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, spin_lock(&proc->asts_spin); - rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags); + rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, &cb); switch (rv) { case DLM_ENQUEUE_CALLBACK_NEED_SCHED: - kref_get(&lkb->lkb_ref); - list_add_tail(&lkb->lkb_cb_list, &proc->asts); + cb->ua = *ua; + cb->lkb_lksb = &cb->ua.lksb; + if (cb->copy_lvb) { + memcpy(cb->lvbptr, ua->lksb.sb_lvbptr, + DLM_USER_LVB_LEN); + cb->lkb_lksb->sb_lvbptr = cb->lvbptr; + } + + list_add_tail(&cb->list, &proc->asts); wake_up_interruptible(&proc->wait); break; case DLM_ENQUEUE_CALLBACK_SUCCESS: @@ -801,10 +792,8 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, loff_t *ppos) { struct dlm_user_proc *proc = file->private_data; - struct dlm_lkb *lkb; DECLARE_WAITQUEUE(wait, current); struct dlm_callback *cb; - struct dlm_rsb *rsb; int rv, ret; if (count == sizeof(struct dlm_device_version)) { @@ -824,8 +813,6 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, #endif return -EINVAL; - try_another: - /* do we really need this? can a read happen after a close? */ if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags)) return -EINVAL; @@ -860,55 +847,24 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count, without removing lkb_cb_list; so empty lkb_cb_list is always consistent with empty lkb_callbacks */ - lkb = list_first_entry(&proc->asts, struct dlm_lkb, lkb_cb_list); - - rv = dlm_dequeue_lkb_callback(lkb, &cb); - switch (rv) { - case DLM_DEQUEUE_CALLBACK_EMPTY: - /* this shouldn't happen; lkb should have been removed from - * list when last item was dequeued - */ - log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id); - list_del_init(&lkb->lkb_cb_list); - spin_unlock(&proc->asts_spin); - /* removes ref for proc->asts, may cause lkb to be freed */ - dlm_put_lkb(lkb); - WARN_ON_ONCE(1); - goto try_another; - case DLM_DEQUEUE_CALLBACK_LAST: - list_del_init(&lkb->lkb_cb_list); - clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags); - break; - case DLM_DEQUEUE_CALLBACK_SUCCESS: - break; - default: - WARN_ON_ONCE(1); - break; - } + cb = list_first_entry(&proc->asts, struct dlm_callback, list); + list_del(&cb->list); spin_unlock(&proc->asts_spin); - rsb = lkb->lkb_resource; if (cb->flags & DLM_CB_BAST) { - trace_dlm_bast(rsb->res_ls->ls_global_id, lkb->lkb_id, - cb->mode, rsb->res_name, rsb->res_length); + trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name, + cb->res_length); } else if (cb->flags & DLM_CB_CAST) { - lkb->lkb_lksb->sb_status = cb->sb_status; - lkb->lkb_lksb->sb_flags = cb->sb_flags; - trace_dlm_ast(rsb->res_ls->ls_global_id, lkb->lkb_id, - cb->sb_flags, cb->sb_status, rsb->res_name, - rsb->res_length); + cb->lkb_lksb->sb_status = cb->sb_status; + cb->lkb_lksb->sb_flags = cb->sb_flags; + trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status, + cb->sb_flags, cb->res_name, cb->res_length); } - ret = copy_result_to_user(lkb->lkb_ua, + ret = copy_result_to_user(&cb->ua, test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags), cb->flags, cb->mode, cb->copy_lvb, buf, count); - kref_put(&cb->ref, dlm_release_callback); - - /* removes ref for proc->asts, may cause lkb to be freed */ - if (rv == DLM_DEQUEUE_CALLBACK_LAST) - dlm_put_lkb(lkb); - return ret; }