diff options
Diffstat (limited to 'fs/dlm/recover.c')
-rw-r--r-- | fs/dlm/recover.c | 73 |
1 files changed, 42 insertions, 31 deletions
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index 34d5adf1f..7554e4d 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid) { struct dlm_lkb *lkb; - list_for_each_entry(lkb, queue, lkb_statequeue) - if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) + list_for_each_entry(lkb, queue, lkb_statequeue) { + if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) { lkb->lkb_nodeid = nodeid; + lkb->lkb_remid = 0; + } + } } static void set_master_lkbs(struct dlm_rsb *r) @@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r) /* * Propagate the new master nodeid to locks * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider. - * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which + * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which * rsb's to consider. */ static void set_new_master(struct dlm_rsb *r, int nodeid) { - lock_rsb(r); r->res_nodeid = nodeid; set_master_lkbs(r); rsb_set_flag(r, RSB_NEW_MASTER); rsb_set_flag(r, RSB_NEW_MASTER2); - unlock_rsb(r); } /* @@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid) static int recover_master(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); - - dir_nodeid = dlm_dir_nodeid(r); + int error, ret_nodeid; + int our_nodeid = dlm_our_nodeid(); + int dir_nodeid = dlm_dir_nodeid(r); if (dir_nodeid == our_nodeid) { error = dlm_dir_lookup(ls, our_nodeid, r->res_name, @@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r) if (ret_nodeid == our_nodeid) ret_nodeid = 0; + lock_rsb(r); set_new_master(r, ret_nodeid); + unlock_rsb(r); } else { recover_list_add(r); error = dlm_send_rcom_lookup(r, dir_nodeid); @@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r) } /* - * When not using a directory, most resource names will hash to a new static - * master nodeid and the resource will need to be remastered. + * All MSTCPY locks are purged and rebuilt, even if the master stayed the same. + * This is necessary because recovery can be started, aborted and restarted, + * causing the master nodeid to briefly change during the aborted recovery, and + * change back to the original value in the second recovery. The MSTCPY locks + * may or may not have been purged during the aborted recovery. Another node + * with an outstanding request in waiters list and a request reply saved in the + * requestqueue, cannot know whether it should ignore the reply and resend the + * request, or accept the reply and complete the request. It must do the + * former if the remote node purged MSTCPY locks, and it must do the later if + * the remote node did not. This is solved by always purging MSTCPY locks, in + * which case, the request reply would always be ignored and the request + * resent. */ static int recover_master_static(struct dlm_rsb *r) { - int master = dlm_dir_nodeid(r); + int dir_nodeid = dlm_dir_nodeid(r); + int new_master = dir_nodeid; - if (master == dlm_our_nodeid()) - master = 0; + if (dir_nodeid == dlm_our_nodeid()) + new_master = 0; - if (r->res_nodeid != master) { - if (is_master(r)) - dlm_purge_mstcpy_locks(r); - set_new_master(r, master); - return 1; - } - return 0; + lock_rsb(r); + dlm_purge_mstcpy_locks(r); + set_new_master(r, new_master); + unlock_rsb(r); + return 1; } /* @@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) if (nodeid == dlm_our_nodeid()) nodeid = 0; + lock_rsb(r); set_new_master(r, nodeid); + unlock_rsb(r); recover_list_del(r); if (recover_list_empty(ls)) @@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls) struct dlm_rsb *r; int error, count = 0; - log_debug(ls, "dlm_recover_locks"); - down_read(&ls->ls_root_sem); list_for_each_entry(r, &ls->ls_root_list, res_root_list) { if (is_master(r)) { @@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls) } up_read(&ls->ls_root_sem); - log_debug(ls, "dlm_recover_locks %d locks", count); + log_debug(ls, "dlm_recover_locks %d out", count); error = dlm_wait_function(ls, &recover_list_empty); out: @@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r) } /* We've become the new master for this rsb and waiting/converting locks may - need to be granted in dlm_grant_after_purge() due to locks that may have + need to be granted in dlm_recover_grant() due to locks that may have existed from a removed node. */ -static void set_locks_purged(struct dlm_rsb *r) +static void recover_grant(struct dlm_rsb *r) { if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) - rsb_set_flag(r, RSB_LOCKS_PURGED); + rsb_set_flag(r, RSB_RECOVER_GRANT); } void dlm_recover_rsbs(struct dlm_ls *ls) { struct dlm_rsb *r; - int count = 0; - - log_debug(ls, "dlm_recover_rsbs"); + unsigned int count = 0; down_read(&ls->ls_root_sem); list_for_each_entry(r, &ls->ls_root_list, res_root_list) { @@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls) if (rsb_flag(r, RSB_RECOVER_CONVERT)) recover_conversion(r); if (rsb_flag(r, RSB_NEW_MASTER2)) - set_locks_purged(r); + recover_grant(r); recover_lvb(r); count++; } @@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls) } up_read(&ls->ls_root_sem); - log_debug(ls, "dlm_recover_rsbs %d rsbs", count); + if (count) + log_debug(ls, "dlm_recover_rsbs %d done", count); } /* Create a single list of all root rsb's to be used during recovery */ |