ocfs2/dlm: do not purge lockres that is queued for assert master

When workqueue is delayed, it may occur that a lockres is purged while it
is still queued for master assert.  it may trigger BUG() as follows.

N1                                         N2
dlm_get_lockres()
->dlm_do_master_requery
                                  is the master of lockres,
                                  so queue assert_master work

                                  dlm_thread() start running
                                  and purge the lockres

                                  dlm_assert_master_worker()
                                  send assert master message
                                  to other nodes
receiving the assert_master
message, set master to N2

dlmlock_remote() send create_lock message to N2, but receive DLM_IVLOCKID,
if it is RECOVERY lockres, it triggers the BUG().

Another BUG() is triggered when N3 become the new master and send
assert_master to N1, N1 will trigger the BUG() because owner doesn't
match.  So we should not purge lockres when it is queued for assert
master.

Signed-off-by: joyce.xue <xuejiufei@huawei.com>
Reviewed-by: Mark Fasheh <mfasheh@suse.de>
Cc: Joel Becker <jlbec@evilplan.org>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
This commit is contained in:
Xue jiufei 2014-06-23 13:22:09 -07:00 committed by Linus Torvalds
parent b9aaac5a6b
commit ac4fef4d23
4 changed files with 55 additions and 6 deletions

View File

@ -331,6 +331,7 @@ struct dlm_lock_resource
u16 state; u16 state;
char lvb[DLM_LVB_LEN]; char lvb[DLM_LVB_LEN];
unsigned int inflight_locks; unsigned int inflight_locks;
unsigned int inflight_assert_workers;
unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
}; };
@ -910,6 +911,9 @@ void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res); struct dlm_lock_resource *res);
void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void __dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void __dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);

View File

@ -581,6 +581,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
atomic_set(&res->asts_reserved, 0); atomic_set(&res->asts_reserved, 0);
res->migration_pending = 0; res->migration_pending = 0;
res->inflight_locks = 0; res->inflight_locks = 0;
res->inflight_assert_workers = 0;
res->dlm = dlm; res->dlm = dlm;
@ -683,6 +684,43 @@ void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
wake_up(&res->wq); wake_up(&res->wq);
} }
void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
assert_spin_locked(&res->spinlock);
res->inflight_assert_workers++;
mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
dlm->name, res->lockname.len, res->lockname.name,
res->inflight_assert_workers);
}
static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
spin_lock(&res->spinlock);
__dlm_lockres_grab_inflight_worker(dlm, res);
spin_unlock(&res->spinlock);
}
static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
assert_spin_locked(&res->spinlock);
BUG_ON(res->inflight_assert_workers == 0);
res->inflight_assert_workers--;
mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
dlm->name, res->lockname.len, res->lockname.name,
res->inflight_assert_workers);
}
static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
spin_lock(&res->spinlock);
__dlm_lockres_drop_inflight_worker(dlm, res);
spin_unlock(&res->spinlock);
}
/* /*
* lookup a lock resource by name. * lookup a lock resource by name.
* may already exist in the hashtable. * may already exist in the hashtable.
@ -1603,7 +1641,8 @@ send_response:
mlog(ML_ERROR, "failed to dispatch assert master work\n"); mlog(ML_ERROR, "failed to dispatch assert master work\n");
response = DLM_MASTER_RESP_ERROR; response = DLM_MASTER_RESP_ERROR;
dlm_lockres_put(res); dlm_lockres_put(res);
} } else
dlm_lockres_grab_inflight_worker(dlm, res);
} else { } else {
if (res) if (res)
dlm_lockres_put(res); dlm_lockres_put(res);
@ -2118,6 +2157,8 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
dlm_lockres_release_ast(dlm, res); dlm_lockres_release_ast(dlm, res);
put: put:
dlm_lockres_drop_inflight_worker(dlm, res);
dlm_lockres_put(res); dlm_lockres_put(res);
mlog(0, "finished with dlm_assert_master_worker\n"); mlog(0, "finished with dlm_assert_master_worker\n");

View File

@ -1708,7 +1708,8 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
mlog_errno(-ENOMEM); mlog_errno(-ENOMEM);
/* retry!? */ /* retry!? */
BUG(); BUG();
} } else
__dlm_lockres_grab_inflight_worker(dlm, res);
} else /* put.. incase we are not the master */ } else /* put.. incase we are not the master */
dlm_lockres_put(res); dlm_lockres_put(res);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);

View File

@ -259,11 +259,14 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
* refs on it. */ * refs on it. */
unused = __dlm_lockres_unused(lockres); unused = __dlm_lockres_unused(lockres);
if (!unused || if (!unused ||
(lockres->state & DLM_LOCK_RES_MIGRATING)) { (lockres->state & DLM_LOCK_RES_MIGRATING) ||
(lockres->inflight_assert_workers != 0)) {
mlog(0, "%s: res %.*s is in use or being remastered, " mlog(0, "%s: res %.*s is in use or being remastered, "
"used %d, state %d\n", dlm->name, "used %d, state %d, assert master workers %u\n",
lockres->lockname.len, lockres->lockname.name, dlm->name, lockres->lockname.len,
!unused, lockres->state); lockres->lockname.name,
!unused, lockres->state,
lockres->inflight_assert_workers);
list_move_tail(&lockres->purge, &dlm->purge_list); list_move_tail(&lockres->purge, &dlm->purge_list);
spin_unlock(&lockres->spinlock); spin_unlock(&lockres->spinlock);
continue; continue;