From 116978854427fd7abbeb328768e75b49a7298520 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 2 Aug 2024 13:26:38 -0400 Subject: [PATCH 01/11] dlm: cleanup memory allocation helpers This patch removes a unnecessary parameter from DLM memory allocation helpers and reduce some functions by just directly reply the pointer address of the allocated memory. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lock.c | 4 ++-- fs/dlm/memory.c | 19 +++++-------------- fs/dlm/memory.h | 4 ++-- 3 files changed, 9 insertions(+), 18 deletions(-) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 8bee4f444afd..6930d7c57216 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -600,7 +600,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, { struct dlm_rsb *r; - r = dlm_allocate_rsb(ls); + r = dlm_allocate_rsb(); if (!r) return -ENOMEM; @@ -1497,7 +1497,7 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, limit.max = end; limit.min = start; - lkb = dlm_allocate_lkb(ls); + lkb = dlm_allocate_lkb(); if (!lkb) return -ENOMEM; diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c index 8c44b954c166..442898cf7185 100644 --- a/fs/dlm/memory.c +++ b/fs/dlm/memory.c @@ -84,10 +84,7 @@ void dlm_memory_exit(void) char *dlm_allocate_lvb(struct dlm_ls *ls) { - char *p; - - p = kzalloc(ls->ls_lvblen, GFP_ATOMIC); - return p; + return kzalloc(ls->ls_lvblen, GFP_ATOMIC); } void dlm_free_lvb(char *p) @@ -95,12 +92,9 @@ void dlm_free_lvb(char *p) kfree(p); } -struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls) +struct dlm_rsb *dlm_allocate_rsb(void) { - struct dlm_rsb *r; - - r = kmem_cache_zalloc(rsb_cache, GFP_ATOMIC); - return r; + return kmem_cache_zalloc(rsb_cache, GFP_ATOMIC); } static void __free_rsb_rcu(struct rcu_head *rcu) @@ -116,12 +110,9 @@ void dlm_free_rsb(struct dlm_rsb *r) call_rcu(&r->rcu, __free_rsb_rcu); } -struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls) +struct dlm_lkb *dlm_allocate_lkb(void) { - struct dlm_lkb *lkb; - - lkb = kmem_cache_zalloc(lkb_cache, GFP_ATOMIC); - return lkb; + return kmem_cache_zalloc(lkb_cache, GFP_ATOMIC); } void dlm_free_lkb(struct dlm_lkb *lkb) diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h index 15198d46b42a..551b6b788489 100644 --- a/fs/dlm/memory.h +++ b/fs/dlm/memory.h @@ -14,9 +14,9 @@ int dlm_memory_init(void); void dlm_memory_exit(void); -struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls); +struct dlm_rsb *dlm_allocate_rsb(void); void dlm_free_rsb(struct dlm_rsb *r); -struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls); +struct dlm_lkb *dlm_allocate_lkb(void); void dlm_free_lkb(struct dlm_lkb *l); char *dlm_allocate_lvb(struct dlm_ls *ls); void dlm_free_lvb(char *l); From d3b3d2d8e1aa394fc5fde4c0d3e32f8697c2b42c Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 2 Aug 2024 13:26:39 -0400 Subject: [PATCH 02/11] dlm: remove unnecessary refcounts This patch removes unnecessary refcounts that are obviously not necessary because either when the pointer is passed as parameter or it is part of a list we should already hold a reference to it. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lock.c | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 6930d7c57216..720715ddaf48 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1442,18 +1442,6 @@ static void deactivate_rsb(struct kref *kref) } } -/* See comment for unhold_lkb */ - -static void unhold_rsb(struct dlm_rsb *r) -{ - int rv; - - /* inactive rsbs are not ref counted */ - WARN_ON(rsb_flag(r, RSB_INACTIVE)); - rv = kref_put(&r->res_ref, deactivate_rsb); - DLM_ASSERT(!rv, dlm_dump_rsb(r);); -} - void free_inactive_rsb(struct dlm_rsb *r) { WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE)); @@ -1675,10 +1663,8 @@ static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) { - hold_lkb(lkb); del_lkb(r, lkb); add_lkb(r, lkb, sts); - unhold_lkb(lkb); } static int msg_reply_type(int mstype) @@ -5409,7 +5395,6 @@ void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list) return; list_for_each_entry(r, root_list, res_root_list) { - hold_rsb(r); lock_rsb(r); if (is_master(r)) { purge_dead_list(ls, r, &r->res_grantqueue, @@ -5420,7 +5405,7 @@ void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list) nodeid_gone, &lkb_count); } unlock_rsb(r); - unhold_rsb(r); + cond_resched(); } From 90ad918e371fcb7ea4cb6c0a391acc6bba51662e Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 2 Aug 2024 13:26:40 -0400 Subject: [PATCH 03/11] dlm: never return invalid nodeid by dlm_our_nodeid() This patch will remote the return of an invalid nodeid value when local_comm is not set. This case should never happen as the DLM stack tries to compare valid nodeids with an invalid nodeid returned by dlm_our_nodeid(). Instead we let it crash to getting at least recognized if we running into such state. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/config.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 99952234799e..eac96f1c1d74 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -928,7 +928,7 @@ int dlm_comm_seq(int nodeid, uint32_t *seq) int dlm_our_nodeid(void) { - return local_comm ? local_comm->nodeid : 0; + return local_comm->nodeid; } /* num 0 is first addr, num 1 is second addr */ From d47b822974b8d4da6f22be5341afd4ce6bca6a9f Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 2 Aug 2024 13:26:41 -0400 Subject: [PATCH 04/11] dlm: warn about invalid nodeid comparsions This patch adds a warn on if is_master() and dlm_is_removed() checks on invalid nodeid states that are probably not what the caller wants to do here. The is_master() function checking on r->res_nodeid is invalid when it is set to -1, whereas the dlm_is_removed() has a different meaning as "nodeid member" and also 0 is invalid. We run into these cases and this patch changes those cases as we never will run into them. There should be no functional changes as the condition should return the same result. However this patch signals now on caller level that there might be an "extra" case to handle here. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lock.c | 6 +++--- fs/dlm/lock.h | 2 ++ fs/dlm/member.c | 2 ++ fs/dlm/recover.c | 9 +++++---- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 720715ddaf48..30aec123a483 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1151,7 +1151,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no r->res_dir_nodeid = our_nodeid; } - if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { + if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) { /* Recovery uses this function to set a new master when * the previous master failed. Setting NEW_MASTER will * force dlm_recover_masters to call recover_master on this @@ -5283,7 +5283,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) case DLM_MSG_LOOKUP: case DLM_MSG_REQUEST: _request_lock(r, lkb); - if (is_master(r)) + if (r->res_nodeid != -1 && is_master(r)) confirm_master(r, 0); break; case DLM_MSG_CONVERT: @@ -5396,7 +5396,7 @@ void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list) list_for_each_entry(r, root_list, res_root_list) { lock_rsb(r); - if (is_master(r)) { + if (r->res_nodeid != -1 && is_master(r)) { purge_dead_list(ls, r, &r->res_grantqueue, nodeid_gone, &lkb_count); purge_dead_list(ls, r, &r->res_convertqueue, diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index 4ed8d36f9c6d..b23d7b854ed4 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -66,6 +66,8 @@ int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id, static inline int is_master(struct dlm_rsb *r) { + WARN_ON_ONCE(r->res_nodeid == -1); + return !r->res_nodeid; } diff --git a/fs/dlm/member.c b/fs/dlm/member.c index a7ee7fd2b9d3..c9661906568a 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -366,6 +366,8 @@ int dlm_is_member(struct dlm_ls *ls, int nodeid) int dlm_is_removed(struct dlm_ls *ls, int nodeid) { + WARN_ON_ONCE(!nodeid || nodeid == -1); + if (find_memb(&ls->ls_nodes_gone, nodeid)) return 1; return 0; diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index c7afb428a2b4..2e1169c81c6e 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -452,10 +452,11 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq) int is_removed = 0; int error; - if (is_master(r)) + if (r->res_nodeid != -1 && is_master(r)) return 0; - is_removed = dlm_is_removed(ls, r->res_nodeid); + if (r->res_nodeid != -1) + is_removed = dlm_is_removed(ls, r->res_nodeid); if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER)) return 0; @@ -664,7 +665,7 @@ int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq, int error, count = 0; list_for_each_entry(r, root_list, res_root_list) { - if (is_master(r)) { + if (r->res_nodeid != -1 && is_master(r)) { rsb_clear_flag(r, RSB_NEW_MASTER); continue; } @@ -858,7 +859,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list) list_for_each_entry(r, root_list, res_root_list) { lock_rsb(r); - if (is_master(r)) { + if (r->res_nodeid != -1 && is_master(r)) { if (rsb_flag(r, RSB_RECOVER_CONVERT)) recover_conversion(r); From 8a4cf500f1dded74ababd8d33db35631e540e124 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 2 Aug 2024 13:26:42 -0400 Subject: [PATCH 05/11] dlm: drop kobject release callback handling This patch removes the releasing of the "struct dlm ls" resource out of the kobject handling. Instead we run kfree() after kobject_put() of the lockspace kobject structure that should always being the last put call. This prepares to split the releasing of all lockspace resources asynchronously in the background and just deregister everything in release_lockspace(). Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lockspace.c | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 1848cbbc96a9..bf14016d53e1 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -174,12 +174,6 @@ static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr, return a->store ? a->store(ls, buf, len) : len; } -static void lockspace_kobj_release(struct kobject *k) -{ - struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj); - kfree(ls); -} - static const struct sysfs_ops dlm_attr_ops = { .show = dlm_attr_show, .store = dlm_attr_store, @@ -188,7 +182,6 @@ static const struct sysfs_ops dlm_attr_ops = { static struct kobj_type dlm_ktype = { .default_groups = dlm_groups, .sysfs_ops = &dlm_attr_ops, - .release = lockspace_kobj_release, }; static struct kset *dlm_kset; @@ -328,7 +321,6 @@ static int new_lockspace(const char *name, const char *cluster, int *ops_result, dlm_lockspace_t **lockspace) { struct dlm_ls *ls; - int do_unreg = 0; int namelen = strlen(name); int error; @@ -530,9 +522,6 @@ static int new_lockspace(const char *name, const char *cluster, wait_event(ls->ls_recover_lock_wait, test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); - /* let kobject handle freeing of ls if there's an error */ - do_unreg = 1; - ls->ls_kobj.kset = dlm_kset; error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, "%s", ls->ls_name); @@ -580,10 +569,8 @@ static int new_lockspace(const char *name, const char *cluster, xa_destroy(&ls->ls_lkbxa); rhashtable_destroy(&ls->ls_rsbtbl); out_lsfree: - if (do_unreg) - kobject_put(&ls->ls_kobj); - else - kfree(ls); + kobject_put(&ls->ls_kobj); + kfree(ls); out: module_put(THIS_MODULE); return error; @@ -743,6 +730,8 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_delete_debug_file(ls); + kobject_put(&ls->ls_kobj); + xa_destroy(&ls->ls_recover_xa); kfree(ls->ls_recover_buf); @@ -769,8 +758,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_clear_members_gone(ls); kfree(ls->ls_node_array); log_rinfo(ls, "release_lockspace final free"); - kobject_put(&ls->ls_kobj); - /* The ls structure will be freed when the kobject is done with */ + kfree(ls); module_put(THIS_MODULE); return 0; From 94e180d6255f5a765bb723e6e8b67f1438ce574b Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 2 Aug 2024 13:26:43 -0400 Subject: [PATCH 06/11] dlm: async freeing of lockspace resources This patch handles freeing of lockspace resources asynchronously besides the release_lockspace() context. The release_lockspace() context is sometimes called in a time critical context, e.g. umount syscall. Most every user space init system will timeout if it takes too long. To reduce the potential waiting time we deregister in release_lockspace() the lockspace from the DLM subsystem and do the actual releasing of lockspace resource in a worker of a workqueue following recommendation of: https://lore.kernel.org/all/49925af7-78a8-a3dd-bce6-cfc02e1a9236@I-love.SAKURA.ne.jp/T/#u as flushing of system workqueues are not allowed. The most time to release the DLM resources are spent to release the data structures "ls->ls_lkbxa" and "ls->ls_rsbtbl" as they iterate over each entries and those data structures can contain millions of entries. This patch handles for now only freeing of those data structures as those operations are the most reason why release_lockspace() blocking of being returned. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 4 +++ fs/dlm/lockspace.c | 77 ++++++++++++++++++++++++------------------- fs/dlm/main.c | 10 ++++++ 3 files changed, 58 insertions(+), 33 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 32d98e63d25e..0562099e60eb 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -660,6 +660,8 @@ struct dlm_ls { const struct dlm_lockspace_ops *ls_ops; void *ls_ops_arg; + struct work_struct ls_free_work; + int ls_namelen; char ls_name[DLM_LOCKSPACE_LEN + 1]; }; @@ -803,6 +805,8 @@ static inline void dlm_set_sbflags_val(struct dlm_lkb *lkb, uint32_t val) __DLM_SBF_MAX_BIT); } +extern struct workqueue_struct *dlm_wq; + int dlm_plock_init(void); void dlm_plock_exit(void); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index bf14016d53e1..8afac6e2dff0 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -315,6 +315,44 @@ static int threads_start(void) return error; } +static int lkb_idr_free(struct dlm_lkb *lkb) +{ + if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) + dlm_free_lvb(lkb->lkb_lvbptr); + + dlm_free_lkb(lkb); + return 0; +} + +static void rhash_free_rsb(void *ptr, void *arg) +{ + struct dlm_rsb *rsb = ptr; + + dlm_free_rsb(rsb); +} + +static void free_lockspace(struct work_struct *work) +{ + struct dlm_ls *ls = container_of(work, struct dlm_ls, ls_free_work); + struct dlm_lkb *lkb; + unsigned long id; + + /* + * Free all lkb's in xa + */ + xa_for_each(&ls->ls_lkbxa, id, lkb) { + lkb_idr_free(lkb); + } + xa_destroy(&ls->ls_lkbxa); + + /* + * Free all rsb's on rsbtbl + */ + rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); + + kfree(ls); +} + static int new_lockspace(const char *name, const char *cluster, uint32_t flags, int lvblen, const struct dlm_lockspace_ops *ops, void *ops_arg, @@ -445,6 +483,8 @@ static int new_lockspace(const char *name, const char *cluster, spin_lock_init(&ls->ls_cb_lock); INIT_LIST_HEAD(&ls->ls_cb_delay); + INIT_WORK(&ls->ls_free_work, free_lockspace); + ls->ls_recoverd_task = NULL; mutex_init(&ls->ls_recoverd_active); spin_lock_init(&ls->ls_recover_lock); @@ -627,15 +667,6 @@ int dlm_new_user_lockspace(const char *name, const char *cluster, ops_arg, ops_result, lockspace); } -static int lkb_idr_free(struct dlm_lkb *lkb) -{ - if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) - dlm_free_lvb(lkb->lkb_lvbptr); - - dlm_free_lkb(lkb); - return 0; -} - /* NOTE: We check the lkbxa here rather than the resource table. This is because there may be LKBs queued as ASTs that have been unlinked from their RSBs and are pending deletion once the AST has been delivered */ @@ -667,17 +698,8 @@ static int lockspace_busy(struct dlm_ls *ls, int force) return rv; } -static void rhash_free_rsb(void *ptr, void *arg) -{ - struct dlm_rsb *rsb = ptr; - - dlm_free_rsb(rsb); -} - static int release_lockspace(struct dlm_ls *ls, int force) { - struct dlm_lkb *lkb; - unsigned long id; int busy, rv; busy = lockspace_busy(ls, force); @@ -735,19 +757,6 @@ static int release_lockspace(struct dlm_ls *ls, int force) xa_destroy(&ls->ls_recover_xa); kfree(ls->ls_recover_buf); - /* - * Free all lkb's in xa - */ - xa_for_each(&ls->ls_lkbxa, id, lkb) { - lkb_idr_free(lkb); - } - xa_destroy(&ls->ls_lkbxa); - - /* - * Free all rsb's on rsbtbl - */ - rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); - /* * Free structures on any other lists */ @@ -757,9 +766,11 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_clear_members(ls); dlm_clear_members_gone(ls); kfree(ls->ls_node_array); - log_rinfo(ls, "release_lockspace final free"); - kfree(ls); + log_rinfo(ls, "%s final free", __func__); + + /* delayed free of data structures see free_lockspace() */ + queue_work(dlm_wq, &ls->ls_free_work); module_put(THIS_MODULE); return 0; } diff --git a/fs/dlm/main.c b/fs/dlm/main.c index 6ca28299c9db..cb15db8ba9bf 100644 --- a/fs/dlm/main.c +++ b/fs/dlm/main.c @@ -22,6 +22,8 @@ #define CREATE_TRACE_POINTS #include +struct workqueue_struct *dlm_wq; + static int __init init_dlm(void) { int error; @@ -50,10 +52,16 @@ static int __init init_dlm(void) if (error) goto out_user; + dlm_wq = alloc_workqueue("dlm_wq", 0, 0); + if (!dlm_wq) + goto out_plock; + printk("DLM installed\n"); return 0; + out_plock: + dlm_plock_exit(); out_user: dlm_user_exit(); out_debug: @@ -70,6 +78,8 @@ static int __init init_dlm(void) static void __exit exit_dlm(void) { + /* be sure every pending work e.g. freeing is done */ + destroy_workqueue(dlm_wq); dlm_plock_exit(); dlm_user_exit(); dlm_config_exit(); From 98ff7d95d91b56d8f2fdd0c4d0421f7fbb538cba Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 2 Aug 2024 13:26:44 -0400 Subject: [PATCH 07/11] dlm: use RSB_HASHED to avoid lookup twice Since commit 01fdeca1cc2d ("dlm: use rcu to avoid an extra rsb struct lookup") _dlm_master_lookup() is called under rcu lock that prevents that the rsb structure is being freed. There was a missing change to avoid an additional lookup and just check that the rsb is still part of the ls_rsbtbl structure. This patch is doing such check instead of lookup the rsb structure again. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lock.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 30aec123a483..8bf3654f4827 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1308,11 +1308,11 @@ static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *na } do_inactive: - /* unlikely path - relookup under write */ + /* unlikely path - check if still part of ls_rsbtbl */ write_lock_bh(&ls->ls_rsbtbl_lock); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (!error) { + /* see comment in find_rsb_dir */ + if (rsb_flag(r, RSB_HASHED)) { if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); /* something as changed, very unlikely but From 5be323b0c64dbecdc33b43012f927e6af82d62d3 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 2 Aug 2024 13:26:45 -0400 Subject: [PATCH 08/11] dlm: move dlm_search_rsb_tree() out of lock The rhashtable structure is lockless for readers such as rhashtable_lookup_fast(). It should be save to call this lookup functionality out of holding ls_rsbtbl_lock to get the rsb pointer out of the hash. This reduce the contention time of ls_rsbtbl_lock in some cases. We still need to check if the rsb is part of the check as this state can be changed while ls_rsbtbl_lock is not held. If its part of the rhashtable data structure we take a reference to be sure it will not be freed after we drop the ls_rsbtbl_lock read lock. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lock.c | 83 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 33 deletions(-) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 8bf3654f4827..9d3ec359d5e3 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -733,11 +733,13 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, } retry: + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (error) + goto do_new; /* check if the rsb is active under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (error) { + if (!rsb_flag(r, RSB_HASHED)) { read_unlock_bh(&ls->ls_rsbtbl_lock); goto do_new; } @@ -918,11 +920,13 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, int error; retry: + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (error) + goto do_new; /* check if the rsb is in active state under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (error) { + if (!rsb_flag(r, RSB_HASHED)) { read_unlock_bh(&ls->ls_rsbtbl_lock); goto do_new; } @@ -1276,37 +1280,39 @@ static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *na } retry: + error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); + if (error) + goto not_found; /* check if the rsb is active under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (!error) { - if (rsb_flag(r, RSB_INACTIVE)) { - read_unlock_bh(&ls->ls_rsbtbl_lock); - goto do_inactive; - } - - /* because the rsb is active, we need to lock_rsb before - * checking/changing re_master_nodeid - */ - - hold_rsb(r); - read_unlock_bh(&ls->ls_rsbtbl_lock); - lock_rsb(r); - - __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, - flags, r_nodeid, result); - - /* the rsb was active */ - unlock_rsb(r); - put_rsb(r); - - return 0; - } else { + if (!rsb_flag(r, RSB_HASHED)) { read_unlock_bh(&ls->ls_rsbtbl_lock); goto not_found; } + if (rsb_flag(r, RSB_INACTIVE)) { + read_unlock_bh(&ls->ls_rsbtbl_lock); + goto do_inactive; + } + + /* because the rsb is active, we need to lock_rsb before + * checking/changing re_master_nodeid + */ + + hold_rsb(r); + read_unlock_bh(&ls->ls_rsbtbl_lock); + lock_rsb(r); + + __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, + flags, r_nodeid, result); + + /* the rsb was active */ + unlock_rsb(r); + put_rsb(r); + + return 0; + do_inactive: /* unlikely path - check if still part of ls_rsbtbl */ write_lock_bh(&ls->ls_rsbtbl_lock); @@ -1403,14 +1409,14 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) struct dlm_rsb *r = NULL; int error; - read_lock_bh(&ls->ls_rsbtbl_lock); + rcu_read_lock(); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) goto out; dlm_dump_rsb(r); out: - read_unlock_bh(&ls->ls_rsbtbl_lock); + rcu_read_unlock(); } static void deactivate_rsb(struct kref *kref) @@ -4309,17 +4315,28 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) memset(name, 0, sizeof(name)); memcpy(name, ms->m_extra, len); - write_lock_bh(&ls->ls_rsbtbl_lock); - + rcu_read_lock(); rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (rv) { + rcu_read_unlock(); /* should not happen */ log_error(ls, "%s from %d not found %s", __func__, from_nodeid, name); - write_unlock_bh(&ls->ls_rsbtbl_lock); return; } + write_lock_bh(&ls->ls_rsbtbl_lock); + if (!rsb_flag(r, RSB_HASHED)) { + rcu_read_unlock(); + write_unlock_bh(&ls->ls_rsbtbl_lock); + /* should not happen */ + log_error(ls, "%s from %d got removed during removal %s", + __func__, from_nodeid, name); + return; + } + /* at this stage the rsb can only being freed here */ + rcu_read_unlock(); + if (!rsb_flag(r, RSB_INACTIVE)) { if (r->res_master_nodeid != from_nodeid) { /* should not happen */ From c846f732b97aa30ab91c03b0337cc0c8e27b24df Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 2 Aug 2024 13:26:46 -0400 Subject: [PATCH 09/11] dlm: move lkb xarray lookup out of lock This patch moves the xarray lookup functionality for the lkb out of the ls_lkbxa_lock read lock handling. We can do that as the xarray should be possible to access lockless in case of reader like xa_load(). We confirm under ls_lkbxa_lock that the lkb is still part of the data structure and take a reference when its still part of ls_lkbxa to avoid being freed after doing the lookup. To do a check if the lkb is still part of the ls_lkbxa data structure we use a kref_read() as the last put will remove it from the ls_lkbxa data structure and any reference taken means it is still part of ls_lkbxa. A similar approach was done with the DLM rsb rhashtable just with a flag instead of the refcounter because the refcounter has a slightly different meaning. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dlm_internal.h | 1 + fs/dlm/lock.c | 18 ++++++++++++++---- fs/dlm/memory.c | 9 ++++++++- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 0562099e60eb..d534a4bc162b 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -295,6 +295,7 @@ struct dlm_lkb { void *lkb_astparam; /* caller's ast arg */ struct dlm_user_args *lkb_ua; }; + struct rcu_head rcu; }; /* diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 9d3ec359d5e3..865dc70a9dfc 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1527,11 +1527,21 @@ static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - read_lock_bh(&ls->ls_lkbxa_lock); + rcu_read_lock(); lkb = xa_load(&ls->ls_lkbxa, lkid); - if (lkb) - kref_get(&lkb->lkb_ref); - read_unlock_bh(&ls->ls_lkbxa_lock); + if (lkb) { + /* check if lkb is still part of lkbxa under lkbxa_lock as + * the lkb_ref is tight to the lkbxa data structure, see + * __put_lkb(). + */ + read_lock_bh(&ls->ls_lkbxa_lock); + if (kref_read(&lkb->lkb_ref)) + kref_get(&lkb->lkb_ref); + else + lkb = NULL; + read_unlock_bh(&ls->ls_lkbxa_lock); + } + rcu_read_unlock(); *lkb_ret = lkb; return lkb ? 0 : -ENOENT; diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c index 442898cf7185..5c35cc67aca4 100644 --- a/fs/dlm/memory.c +++ b/fs/dlm/memory.c @@ -115,8 +115,10 @@ struct dlm_lkb *dlm_allocate_lkb(void) return kmem_cache_zalloc(lkb_cache, GFP_ATOMIC); } -void dlm_free_lkb(struct dlm_lkb *lkb) +static void __free_lkb_rcu(struct rcu_head *rcu) { + struct dlm_lkb *lkb = container_of(rcu, struct dlm_lkb, rcu); + if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) { struct dlm_user_args *ua; ua = lkb->lkb_ua; @@ -129,6 +131,11 @@ void dlm_free_lkb(struct dlm_lkb *lkb) kmem_cache_free(lkb_cache, lkb); } +void dlm_free_lkb(struct dlm_lkb *lkb) +{ + call_rcu(&lkb->rcu, __free_lkb_rcu); +} + struct dlm_mhandle *dlm_allocate_mhandle(void) { return kmem_cache_alloc(mhandle_cache, GFP_ATOMIC); From fb1911ef6f4899eaba082bb81f301987e2e3bb86 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Fri, 2 Aug 2024 13:26:47 -0400 Subject: [PATCH 10/11] dlm: do synchronized socket connect call To avoid -EINPROGRESS cases on connect that just ends in a retry we just call connect in a synchronized way to wait until its done. Since commit dbb751ffab0b ("fs: dlm: parallelize lowcomms socket handling") we have a non ordered workqueue running for serving the DLM sockets that allows us to call send/recv for each DLM socket connection in parallel. Before each worker needed to wait until the previous worker was done and probably the reason why connect() was called in an asynchronous way to not block other workers. This is however not necessary anymore as other socket handling workers don't need to wait. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 36 +----------------------------------- 1 file changed, 1 insertion(+), 35 deletions(-) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 2e3e269d820e..cb3a10b041c2 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -161,8 +161,6 @@ struct dlm_proto_ops { const char *name; int proto; - int (*connect)(struct connection *con, struct socket *sock, - struct sockaddr *addr, int addr_len); void (*sockopts)(struct socket *sock); int (*bind)(struct socket *sock); int (*listen_validate)(void); @@ -1599,8 +1597,7 @@ static int dlm_connect(struct connection *con) log_print_ratelimited("connecting to %d", con->nodeid); make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); - result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, - addr_len); + result = kernel_connect(sock, (struct sockaddr *)&addr, addr_len, 0); switch (result) { case -EINPROGRESS: /* not an error */ @@ -1634,13 +1631,6 @@ static void process_send_sockets(struct work_struct *work) switch (ret) { case 0: break; - case -EINPROGRESS: - /* avoid spamming resched on connection - * we might can switch to a state_change - * event based mechanism if established - */ - msleep(100); - break; default: /* CF_SEND_PENDING not cleared */ up_write(&con->sock_lock); @@ -1831,12 +1821,6 @@ static int dlm_tcp_bind(struct socket *sock) return 0; } -static int dlm_tcp_connect(struct connection *con, struct socket *sock, - struct sockaddr *addr, int addr_len) -{ - return kernel_connect(sock, addr, addr_len, O_NONBLOCK); -} - static int dlm_tcp_listen_validate(void) { /* We don't support multi-homed hosts */ @@ -1873,7 +1857,6 @@ static int dlm_tcp_listen_bind(struct socket *sock) static const struct dlm_proto_ops dlm_tcp_ops = { .name = "TCP", .proto = IPPROTO_TCP, - .connect = dlm_tcp_connect, .sockopts = dlm_tcp_sockopts, .bind = dlm_tcp_bind, .listen_validate = dlm_tcp_listen_validate, @@ -1886,22 +1869,6 @@ static int dlm_sctp_bind(struct socket *sock) return sctp_bind_addrs(sock, 0); } -static int dlm_sctp_connect(struct connection *con, struct socket *sock, - struct sockaddr *addr, int addr_len) -{ - int ret; - - /* - * Make kernel_connect() function return in specified time, - * since O_NONBLOCK argument in connect() function does not work here, - * then, we should restore the default value of this attribute. - */ - sock_set_sndtimeo(sock->sk, 5); - ret = kernel_connect(sock, addr, addr_len, 0); - sock_set_sndtimeo(sock->sk, 0); - return ret; -} - static int dlm_sctp_listen_validate(void) { if (!IS_ENABLED(CONFIG_IP_SCTP)) { @@ -1929,7 +1896,6 @@ static const struct dlm_proto_ops dlm_sctp_ops = { .name = "SCTP", .proto = IPPROTO_SCTP, .try_new_addr = true, - .connect = dlm_sctp_connect, .sockopts = dlm_sctp_sockopts, .bind = dlm_sctp_bind, .listen_validate = dlm_sctp_listen_validate, From 652b0ae675fede81420758e3af7c5174cdaa8404 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Mon, 12 Aug 2024 16:14:24 -0400 Subject: [PATCH 11/11] dlm: add missing -ENOMEM if alloc_workqueue() fails This patch sets an missing -ENOMEM as error return value when the allocation of the dlm workqueue fails. Fixes: 94e180d6255f ("dlm: async freeing of lockspace resources") Reported-by: kernel test robot Reported-by: Dan Carpenter Closes: https://lore.kernel.org/r/202408110800.OsoP8TB9-lkp@intel.com/ Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/main.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fs/dlm/main.c b/fs/dlm/main.c index cb15db8ba9bf..4887c8a05318 100644 --- a/fs/dlm/main.c +++ b/fs/dlm/main.c @@ -53,8 +53,10 @@ static int __init init_dlm(void) goto out_user; dlm_wq = alloc_workqueue("dlm_wq", 0, 0); - if (!dlm_wq) + if (!dlm_wq) { + error = -ENOMEM; goto out_plock; + } printk("DLM installed\n");