diff options
author | Xue jiufei <xuejiufei@huawei.com> | 2014-06-23 13:22:09 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2014-06-23 16:47:45 -0700 |
commit | ac4fef4d23ed879a7fd11ab24ccd2e1464277e9a (patch) | |
tree | 6f5139c8f615683b688d27f3d5f628cc140465bc /fs | |
parent | b9aaac5a6b7d228435fcb80963d41c274406011b (diff) | |
download | op-kernel-dev-ac4fef4d23ed879a7fd11ab24ccd2e1464277e9a.zip op-kernel-dev-ac4fef4d23ed879a7fd11ab24ccd2e1464277e9a.tar.gz |
ocfs2/dlm: do not purge lockres that is queued for assert master
When workqueue is delayed, it may occur that a lockres is purged while it
is still queued for master assert. it may trigger BUG() as follows.
N1 N2
dlm_get_lockres()
->dlm_do_master_requery
is the master of lockres,
so queue assert_master work
dlm_thread() start running
and purge the lockres
dlm_assert_master_worker()
send assert master message
to other nodes
receiving the assert_master
message, set master to N2
dlmlock_remote() send create_lock message to N2, but receive DLM_IVLOCKID,
if it is RECOVERY lockres, it triggers the BUG().
Another BUG() is triggered when N3 become the new master and send
assert_master to N1, N1 will trigger the BUG() because owner doesn't
match. So we should not purge lockres when it is queued for assert
master.
Signed-off-by: joyce.xue <xuejiufei@huawei.com>
Reviewed-by: Mark Fasheh <mfasheh@suse.de>
Cc: Joel Becker <jlbec@evilplan.org>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
Diffstat (limited to 'fs')
-rw-r--r-- | fs/ocfs2/dlm/dlmcommon.h | 4 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmmaster.c | 43 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmrecovery.c | 3 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmthread.c | 11 |
4 files changed, 55 insertions, 6 deletions
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index a106b3f..fae17c6 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -331,6 +331,7 @@ struct dlm_lock_resource u16 state; char lvb[DLM_LVB_LEN]; unsigned int inflight_locks; + unsigned int inflight_assert_workers; unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; }; @@ -910,6 +911,9 @@ void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); +void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res); + void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void __dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 4f4b00e..82abf0c 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -581,6 +581,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, atomic_set(&res->asts_reserved, 0); res->migration_pending = 0; res->inflight_locks = 0; + res->inflight_assert_workers = 0; res->dlm = dlm; @@ -683,6 +684,43 @@ void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, wake_up(&res->wq); } +void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + assert_spin_locked(&res->spinlock); + res->inflight_assert_workers++; + mlog(0, "%s:%.*s: inflight assert worker++: now %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->inflight_assert_workers); +} + +static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + spin_lock(&res->spinlock); + __dlm_lockres_grab_inflight_worker(dlm, res); + spin_unlock(&res->spinlock); +} + +static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + assert_spin_locked(&res->spinlock); + BUG_ON(res->inflight_assert_workers == 0); + res->inflight_assert_workers--; + mlog(0, "%s:%.*s: inflight assert worker--: now %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->inflight_assert_workers); +} + +static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + spin_lock(&res->spinlock); + __dlm_lockres_drop_inflight_worker(dlm, res); + spin_unlock(&res->spinlock); +} + /* * lookup a lock resource by name. * may already exist in the hashtable. @@ -1603,7 +1641,8 @@ send_response: mlog(ML_ERROR, "failed to dispatch assert master work\n"); response = DLM_MASTER_RESP_ERROR; dlm_lockres_put(res); - } + } else + dlm_lockres_grab_inflight_worker(dlm, res); } else { if (res) dlm_lockres_put(res); @@ -2118,6 +2157,8 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) dlm_lockres_release_ast(dlm, res); put: + dlm_lockres_drop_inflight_worker(dlm, res); + dlm_lockres_put(res); mlog(0, "finished with dlm_assert_master_worker\n"); diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 5de0194..45067fa 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1708,7 +1708,8 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data, mlog_errno(-ENOMEM); /* retry!? */ BUG(); - } + } else + __dlm_lockres_grab_inflight_worker(dlm, res); } else /* put.. incase we are not the master */ dlm_lockres_put(res); spin_unlock(&res->spinlock); diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index cf53a82..69aac6f 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -259,11 +259,14 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm, * refs on it. */ unused = __dlm_lockres_unused(lockres); if (!unused || - (lockres->state & DLM_LOCK_RES_MIGRATING)) { + (lockres->state & DLM_LOCK_RES_MIGRATING) || + (lockres->inflight_assert_workers != 0)) { mlog(0, "%s: res %.*s is in use or being remastered, " - "used %d, state %d\n", dlm->name, - lockres->lockname.len, lockres->lockname.name, - !unused, lockres->state); + "used %d, state %d, assert master workers %u\n", + dlm->name, lockres->lockname.len, + lockres->lockname.name, + !unused, lockres->state, + lockres->inflight_assert_workers); list_move_tail(&lockres->purge, &dlm->purge_list); spin_unlock(&lockres->spinlock); continue; |