diff options
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r-- | fs/ceph/mds_client.c | 428 |
1 files changed, 263 insertions, 165 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 24561a5..1766947 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -40,7 +40,7 @@ static void __wake_requests(struct ceph_mds_client *mdsc, struct list_head *head); -const static struct ceph_connection_operations mds_con_ops; +static const struct ceph_connection_operations mds_con_ops; /* @@ -665,10 +665,10 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq) struct ceph_msg *msg; struct ceph_mds_session_head *h; - msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL); - if (IS_ERR(msg)) { + msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS); + if (!msg) { pr_err("create_session_msg ENOMEM creating msg\n"); - return ERR_PTR(PTR_ERR(msg)); + return NULL; } h = msg->front.iov_base; h->op = cpu_to_le32(op); @@ -687,7 +687,6 @@ static int __open_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg; int mstate; int mds = session->s_mds; - int err = 0; /* wait for mds to go active? */ mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); @@ -698,13 +697,9 @@ static int __open_session(struct ceph_mds_client *mdsc, /* send connect message */ msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); - if (IS_ERR(msg)) { - err = PTR_ERR(msg); - goto out; - } + if (!msg) + return -ENOMEM; ceph_con_send(&session->s_con, msg); - -out: return 0; } @@ -804,12 +799,49 @@ out: } static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, - void *arg) + void *arg) { struct ceph_inode_info *ci = ceph_inode(inode); + int drop = 0; + dout("removing cap %p, ci is %p, inode is %p\n", cap, ci, &ci->vfs_inode); - ceph_remove_cap(cap); + spin_lock(&inode->i_lock); + __ceph_remove_cap(cap); + if (!__ceph_is_any_real_caps(ci)) { + struct ceph_mds_client *mdsc = + &ceph_sb_to_client(inode->i_sb)->mdsc; + + spin_lock(&mdsc->cap_dirty_lock); + if (!list_empty(&ci->i_dirty_item)) { + pr_info(" dropping dirty %s state for %p %lld\n", + ceph_cap_string(ci->i_dirty_caps), + inode, ceph_ino(inode)); + ci->i_dirty_caps = 0; + list_del_init(&ci->i_dirty_item); + drop = 1; + } + if (!list_empty(&ci->i_flushing_item)) { + pr_info(" dropping dirty+flushing %s state for %p %lld\n", + ceph_cap_string(ci->i_flushing_caps), + inode, ceph_ino(inode)); + ci->i_flushing_caps = 0; + list_del_init(&ci->i_flushing_item); + mdsc->num_cap_flushing--; + drop = 1; + } + if (drop && ci->i_wrbuffer_ref) { + pr_info(" dropping dirty data for %p %lld\n", + inode, ceph_ino(inode)); + ci->i_wrbuffer_ref = 0; + ci->i_wrbuffer_ref_head = 0; + drop++; + } + spin_unlock(&mdsc->cap_dirty_lock); + } + spin_unlock(&inode->i_lock); + while (drop--) + iput(inode); return 0; } @@ -821,6 +853,7 @@ static void remove_session_caps(struct ceph_mds_session *session) dout("remove_session_caps on %p\n", session); iterate_session_caps(session, remove_session_caps_cb, NULL); BUG_ON(session->s_nr_caps > 0); + BUG_ON(!list_empty(&session->s_cap_flushing)); cleanup_cap_releases(session); } @@ -883,8 +916,8 @@ static int send_renew_caps(struct ceph_mds_client *mdsc, ceph_mds_state_name(state)); msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, ++session->s_renew_seq); - if (IS_ERR(msg)) - return PTR_ERR(msg); + if (!msg) + return -ENOMEM; ceph_con_send(&session->s_con, msg); return 0; } @@ -931,17 +964,15 @@ static int request_close_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_msg *msg; - int err = 0; dout("request_close_session mds%d state %s seq %lld\n", session->s_mds, session_state_name(session->s_state), session->s_seq); msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); - if (IS_ERR(msg)) - err = PTR_ERR(msg); - else - ceph_con_send(&session->s_con, msg); - return err; + if (!msg) + return -ENOMEM; + ceph_con_send(&session->s_con, msg); + return 0; } /* @@ -1035,9 +1066,9 @@ static int trim_caps(struct ceph_mds_client *mdsc, * * Called under s_mutex. */ -static int add_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session, - int extra) +int ceph_add_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + int extra) { struct ceph_msg *msg; struct ceph_mds_cap_release *head; @@ -1059,7 +1090,7 @@ static int add_cap_releases(struct ceph_mds_client *mdsc, while (session->s_num_cap_releases < session->s_nr_caps + extra) { spin_unlock(&session->s_cap_lock); msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, - 0, 0, NULL); + GFP_NOFS); if (!msg) goto out_unlocked; dout("add_cap_releases %p msg %p now %d\n", session, msg, @@ -1145,16 +1176,14 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) /* * called under s_mutex */ -static void send_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) +void ceph_send_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) { struct ceph_msg *msg; dout("send_cap_releases mds%d\n", session->s_mds); - while (1) { - spin_lock(&session->s_cap_lock); - if (list_empty(&session->s_cap_releases_done)) - break; + spin_lock(&session->s_cap_lock); + while (!list_empty(&session->s_cap_releases_done)) { msg = list_first_entry(&session->s_cap_releases_done, struct ceph_msg, list_head); list_del_init(&msg->list_head); @@ -1162,7 +1191,46 @@ static void send_cap_releases(struct ceph_mds_client *mdsc, msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); dout("send_cap_releases mds%d %p\n", session->s_mds, msg); ceph_con_send(&session->s_con, msg); + spin_lock(&session->s_cap_lock); + } + spin_unlock(&session->s_cap_lock); +} + +static void discard_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + struct ceph_msg *msg; + struct ceph_mds_cap_release *head; + unsigned num; + + dout("discard_cap_releases mds%d\n", session->s_mds); + spin_lock(&session->s_cap_lock); + + /* zero out the in-progress message */ + msg = list_first_entry(&session->s_cap_releases, + struct ceph_msg, list_head); + head = msg->front.iov_base; + num = le32_to_cpu(head->num); + dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); + head->num = cpu_to_le32(0); + session->s_num_cap_releases += num; + + /* requeue completed messages */ + while (!list_empty(&session->s_cap_releases_done)) { + msg = list_first_entry(&session->s_cap_releases_done, + struct ceph_msg, list_head); + list_del_init(&msg->list_head); + + head = msg->front.iov_base; + num = le32_to_cpu(head->num); + dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, + num); + session->s_num_cap_releases += num; + head->num = cpu_to_le32(0); + msg->front.iov_len = sizeof(*head); + list_add(&msg->list_head, &session->s_cap_releases); } + spin_unlock(&session->s_cap_lock); } @@ -1181,6 +1249,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) if (!req) return ERR_PTR(-ENOMEM); + mutex_init(&req->r_fill_mutex); req->r_started = jiffies; req->r_resend_mds = -1; INIT_LIST_HEAD(&req->r_unsafe_dir_item); @@ -1251,7 +1320,7 @@ retry: len += 1 + temp->d_name.len; temp = temp->d_parent; if (temp == NULL) { - pr_err("build_path_dentry corrupt dentry %p\n", dentry); + pr_err("build_path corrupt dentry %p\n", dentry); return ERR_PTR(-EINVAL); } } @@ -1267,7 +1336,7 @@ retry: struct inode *inode = temp->d_inode; if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { - dout("build_path_dentry path+%d: %p SNAPDIR\n", + dout("build_path path+%d: %p SNAPDIR\n", pos, temp); } else if (stop_on_nosnap && inode && ceph_snap(inode) == CEPH_NOSNAP) { @@ -1278,20 +1347,18 @@ retry: break; strncpy(path + pos, temp->d_name.name, temp->d_name.len); - dout("build_path_dentry path+%d: %p '%.*s'\n", - pos, temp, temp->d_name.len, path + pos); } if (pos) path[--pos] = '/'; temp = temp->d_parent; if (temp == NULL) { - pr_err("build_path_dentry corrupt dentry\n"); + pr_err("build_path corrupt dentry\n"); kfree(path); return ERR_PTR(-EINVAL); } } if (pos != 0) { - pr_err("build_path_dentry did not end path lookup where " + pr_err("build_path did not end path lookup where " "expected, namelen is %d, pos is %d\n", len, pos); /* presumably this is only possible if racing with a rename of one of the parent directories (we can not @@ -1303,7 +1370,7 @@ retry: *base = ceph_ino(temp->d_inode); *plen = len; - dout("build_path_dentry on %p %d built %llx '%.*s'\n", + dout("build_path on %p %d built %llx '%.*s'\n", dentry, atomic_read(&dentry->d_count), *base, len, path); return path; } @@ -1426,9 +1493,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, if (req->r_old_dentry_drop) len += req->r_old_dentry->d_name.len; - msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL); - if (IS_ERR(msg)) + msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS); + if (!msg) { + msg = ERR_PTR(-ENOMEM); goto out_free2; + } msg->hdr.tid = cpu_to_le64(req->r_tid); @@ -1517,9 +1586,9 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, } msg = create_request_message(mdsc, req, mds); if (IS_ERR(msg)) { - req->r_reply = ERR_PTR(PTR_ERR(msg)); + req->r_err = PTR_ERR(msg); complete_request(mdsc, req); - return -PTR_ERR(msg); + return PTR_ERR(msg); } req->r_request = msg; @@ -1552,7 +1621,7 @@ static int __do_request(struct ceph_mds_client *mdsc, int mds = -1; int err = -EAGAIN; - if (req->r_reply) + if (req->r_err || req->r_got_result) goto out; if (req->r_timeout && @@ -1609,7 +1678,7 @@ out: return err; finish: - req->r_reply = ERR_PTR(err); + req->r_err = err; complete_request(mdsc, req); goto out; } @@ -1630,10 +1699,9 @@ static void __wake_requests(struct ceph_mds_client *mdsc, /* * Wake up threads with requests pending for @mds, so that they can - * resubmit their requests to a possibly different mds. If @all is set, - * wake up if their requests has been forwarded to @mds, too. + * resubmit their requests to a possibly different mds. */ -static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all) +static void kick_requests(struct ceph_mds_client *mdsc, int mds) { struct ceph_mds_request *req; struct rb_node *p; @@ -1689,64 +1757,78 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, __register_request(mdsc, req, dir); __do_request(mdsc, req); - /* wait */ - if (!req->r_reply) { - mutex_unlock(&mdsc->mutex); - if (req->r_timeout) { - err = (long)wait_for_completion_interruptible_timeout( - &req->r_completion, req->r_timeout); - if (err == 0) - req->r_reply = ERR_PTR(-EIO); - else if (err < 0) - req->r_reply = ERR_PTR(err); - } else { - err = wait_for_completion_interruptible( - &req->r_completion); - if (err) - req->r_reply = ERR_PTR(err); - } - mutex_lock(&mdsc->mutex); + if (req->r_err) { + err = req->r_err; + __unregister_request(mdsc, req); + dout("do_request early error %d\n", err); + goto out; } - if (IS_ERR(req->r_reply)) { - err = PTR_ERR(req->r_reply); - req->r_reply = NULL; + /* wait */ + mutex_unlock(&mdsc->mutex); + dout("do_request waiting\n"); + if (req->r_timeout) { + err = (long)wait_for_completion_killable_timeout( + &req->r_completion, req->r_timeout); + if (err == 0) + err = -EIO; + } else { + err = wait_for_completion_killable(&req->r_completion); + } + dout("do_request waited, got %d\n", err); + mutex_lock(&mdsc->mutex); - if (err == -ERESTARTSYS) { - /* aborted */ - req->r_aborted = true; + /* only abort if we didn't race with a real reply */ + if (req->r_got_result) { + err = le32_to_cpu(req->r_reply_info.head->result); + } else if (err < 0) { + dout("aborted request %lld with %d\n", req->r_tid, err); - if (req->r_locked_dir && - (req->r_op & CEPH_MDS_OP_WRITE)) { - struct ceph_inode_info *ci = - ceph_inode(req->r_locked_dir); + /* + * ensure we aren't running concurrently with + * ceph_fill_trace or ceph_readdir_prepopulate, which + * rely on locks (dir mutex) held by our caller. + */ + mutex_lock(&req->r_fill_mutex); + req->r_err = err; + req->r_aborted = true; + mutex_unlock(&req->r_fill_mutex); - dout("aborted, clearing I_COMPLETE on %p\n", - req->r_locked_dir); - spin_lock(&req->r_locked_dir->i_lock); - ci->i_ceph_flags &= ~CEPH_I_COMPLETE; - ci->i_release_count++; - spin_unlock(&req->r_locked_dir->i_lock); - } - } else { - /* clean up this request */ - __unregister_request(mdsc, req); - if (!list_empty(&req->r_unsafe_item)) - list_del_init(&req->r_unsafe_item); - complete(&req->r_safe_completion); - } - } else if (req->r_err) { - err = req->r_err; + if (req->r_locked_dir && + (req->r_op & CEPH_MDS_OP_WRITE)) + ceph_invalidate_dir_request(req); } else { - err = le32_to_cpu(req->r_reply_info.head->result); + err = req->r_err; } - mutex_unlock(&mdsc->mutex); +out: + mutex_unlock(&mdsc->mutex); dout("do_request %p done, result %d\n", req, err); return err; } /* + * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS + * namespace request. + */ +void ceph_invalidate_dir_request(struct ceph_mds_request *req) +{ + struct inode *inode = req->r_locked_dir; + struct ceph_inode_info *ci = ceph_inode(inode); + + dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode); + spin_lock(&inode->i_lock); + ci->i_ceph_flags &= ~CEPH_I_COMPLETE; + ci->i_release_count++; + spin_unlock(&inode->i_lock); + + if (req->r_dentry) + ceph_invalidate_dentry_lease(req->r_dentry); + if (req->r_old_dentry) + ceph_invalidate_dentry_lease(req->r_old_dentry); +} + +/* * Handle mds reply. * * We take the session mutex and parse and process the reply immediately. @@ -1797,6 +1879,12 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) mutex_unlock(&mdsc->mutex); goto out; } + if (req->r_got_safe && !head->safe) { + pr_warning("got unsafe after safe on %llu from mds%d\n", + tid, mds); + mutex_unlock(&mdsc->mutex); + goto out; + } result = le32_to_cpu(head->result); @@ -1838,11 +1926,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) mutex_unlock(&mdsc->mutex); goto out; } - } - - BUG_ON(req->r_reply); - - if (!head->safe) { + } else { req->r_got_unsafe = true; list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); } @@ -1871,23 +1955,32 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) } /* insert trace into our cache */ + mutex_lock(&req->r_fill_mutex); err = ceph_fill_trace(mdsc->client->sb, req, req->r_session); if (err == 0) { if (result == 0 && rinfo->dir_nr) ceph_readdir_prepopulate(req, req->r_session); ceph_unreserve_caps(&req->r_caps_reservation); } + mutex_unlock(&req->r_fill_mutex); up_read(&mdsc->snap_rwsem); out_err: - if (err) { - req->r_err = err; + mutex_lock(&mdsc->mutex); + if (!req->r_aborted) { + if (err) { + req->r_err = err; + } else { + req->r_reply = msg; + ceph_msg_get(msg); + req->r_got_result = true; + } } else { - req->r_reply = msg; - ceph_msg_get(msg); + dout("reply arrived after request %lld was aborted\n", tid); } + mutex_unlock(&mdsc->mutex); - add_cap_releases(mdsc, req->r_session, -1); + ceph_add_cap_releases(mdsc, req->r_session, -1); mutex_unlock(&session->s_mutex); /* kick calling process */ @@ -1921,16 +2014,21 @@ static void handle_forward(struct ceph_mds_client *mdsc, mutex_lock(&mdsc->mutex); req = __lookup_request(mdsc, tid); if (!req) { - dout("forward %llu to mds%d - req dne\n", tid, next_mds); + dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); goto out; /* dup reply? */ } - if (fwd_seq <= req->r_num_fwd) { - dout("forward %llu to mds%d - old seq %d <= %d\n", + if (req->r_aborted) { + dout("forward tid %llu aborted, unregistering\n", tid); + __unregister_request(mdsc, req); + } else if (fwd_seq <= req->r_num_fwd) { + dout("forward tid %llu to mds%d - old seq %d <= %d\n", tid, next_mds, req->r_num_fwd, fwd_seq); } else { /* resend. forward race not possible; mds would drop */ - dout("forward %llu to mds%d (we resend)\n", tid, next_mds); + dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); + BUG_ON(req->r_err); + BUG_ON(req->r_got_result); req->r_num_fwd = fwd_seq; req->r_resend_mds = next_mds; put_request_session(req); @@ -1984,6 +2082,8 @@ static void handle_session(struct ceph_mds_session *session, switch (op) { case CEPH_SESSION_OPEN: + if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) + pr_info("mds%d reconnect success\n", session->s_mds); session->s_state = CEPH_MDS_SESSION_OPEN; renewed_caps(mdsc, session, 0); wake = 1; @@ -1997,10 +2097,12 @@ static void handle_session(struct ceph_mds_session *session, break; case CEPH_SESSION_CLOSE: + if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) + pr_info("mds%d reconnect denied\n", session->s_mds); remove_session_caps(session); wake = 1; /* for good measure */ complete(&mdsc->session_close_waiters); - kick_requests(mdsc, mds, 0); /* cur only */ + kick_requests(mdsc, mds); break; case CEPH_SESSION_STALE: @@ -2132,54 +2234,44 @@ out: * * called with mdsc->mutex held. */ -static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) +static void send_mds_reconnect(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) { - struct ceph_mds_session *session = NULL; struct ceph_msg *reply; struct rb_node *p; + int mds = session->s_mds; int err = -ENOMEM; struct ceph_pagelist *pagelist; - pr_info("reconnect to recovering mds%d\n", mds); + pr_info("mds%d reconnect start\n", mds); pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); if (!pagelist) goto fail_nopagelist; ceph_pagelist_init(pagelist); - reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL); - if (IS_ERR(reply)) { - err = PTR_ERR(reply); + reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS); + if (!reply) goto fail_nomsg; - } - - /* find session */ - session = __ceph_lookup_mds_session(mdsc, mds); - mutex_unlock(&mdsc->mutex); /* drop lock for duration */ - - if (session) { - mutex_lock(&session->s_mutex); - session->s_state = CEPH_MDS_SESSION_RECONNECTING; - session->s_seq = 0; + mutex_lock(&session->s_mutex); + session->s_state = CEPH_MDS_SESSION_RECONNECTING; + session->s_seq = 0; - ceph_con_open(&session->s_con, - ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); + ceph_con_open(&session->s_con, + ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); - /* replay unsafe requests */ - replay_unsafe_requests(mdsc, session); - } else { - dout("no session for mds%d, will send short reconnect\n", - mds); - } + /* replay unsafe requests */ + replay_unsafe_requests(mdsc, session); down_read(&mdsc->snap_rwsem); - if (!session) - goto send; dout("session %p state %s\n", session, session_state_name(session->s_state)); + /* drop old cap expires; we're about to reestablish that state */ + discard_cap_releases(mdsc, session); + /* traverse this session's caps */ err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); if (err) @@ -2208,36 +2300,29 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) goto fail; } -send: reply->pagelist = pagelist; reply->hdr.data_len = cpu_to_le32(pagelist->length); reply->nr_pages = calc_pages_for(0, pagelist->length); ceph_con_send(&session->s_con, reply); - session->s_state = CEPH_MDS_SESSION_OPEN; mutex_unlock(&session->s_mutex); mutex_lock(&mdsc->mutex); __wake_requests(mdsc, &session->s_waiting); mutex_unlock(&mdsc->mutex); - ceph_put_mds_session(session); - up_read(&mdsc->snap_rwsem); - mutex_lock(&mdsc->mutex); return; fail: ceph_msg_put(reply); up_read(&mdsc->snap_rwsem); mutex_unlock(&session->s_mutex); - ceph_put_mds_session(session); fail_nomsg: ceph_pagelist_release(pagelist); kfree(pagelist); fail_nopagelist: pr_err("error %d preparing reconnect for mds%d\n", err, mds); - mutex_lock(&mdsc->mutex); return; } @@ -2290,7 +2375,7 @@ static void check_new_map(struct ceph_mds_client *mdsc, } /* kick any requests waiting on the recovering mds */ - kick_requests(mdsc, i, 1); + kick_requests(mdsc, i); } else if (oldstate == newstate) { continue; /* nothing new with this mds */ } @@ -2299,22 +2384,21 @@ static void check_new_map(struct ceph_mds_client *mdsc, * send reconnect? */ if (s->s_state == CEPH_MDS_SESSION_RESTARTING && - newstate >= CEPH_MDS_STATE_RECONNECT) - send_mds_reconnect(mdsc, i); + newstate >= CEPH_MDS_STATE_RECONNECT) { + mutex_unlock(&mdsc->mutex); + send_mds_reconnect(mdsc, s); + mutex_lock(&mdsc->mutex); + } /* - * kick requests on any mds that has gone active. - * - * kick requests on cur or forwarder: we may have sent - * the request to mds1, mds1 told us it forwarded it - * to mds2, but then we learn mds1 failed and can't be - * sure it successfully forwarded our request before - * it died. + * kick request on any mds that has gone active. */ if (oldstate < CEPH_MDS_STATE_ACTIVE && newstate >= CEPH_MDS_STATE_ACTIVE) { - pr_info("mds%d reconnect completed\n", s->s_mds); - kick_requests(mdsc, i, 1); + if (oldstate != CEPH_MDS_STATE_CREATING && + oldstate != CEPH_MDS_STATE_STARTING) + pr_info("mds%d recovery completed\n", s->s_mds); + kick_requests(mdsc, i); ceph_kick_flushing_caps(mdsc, s); wake_up_session_caps(s, 1); } @@ -2349,6 +2433,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_dentry_info *di; int mds = session->s_mds; struct ceph_mds_lease *h = msg->front.iov_base; + u32 seq; struct ceph_vino vino; int mask; struct qstr dname; @@ -2362,6 +2447,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, vino.ino = le64_to_cpu(h->ino); vino.snap = CEPH_NOSNAP; mask = le16_to_cpu(h->mask); + seq = le32_to_cpu(h->seq); dname.name = (void *)h + sizeof(*h) + sizeof(u32); dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); if (dname.len != get_unaligned_le32(h+1)) @@ -2372,8 +2458,9 @@ static void handle_lease(struct ceph_mds_client *mdsc, /* lookup inode */ inode = ceph_find_inode(sb, vino); - dout("handle_lease '%s', mask %d, ino %llx %p\n", - ceph_lease_op_name(h->action), mask, vino.ino, inode); + dout("handle_lease %s, mask %d, ino %llx %p %.*s\n", + ceph_lease_op_name(h->action), mask, vino.ino, inode, + dname.len, dname.name); if (inode == NULL) { dout("handle_lease no inode %llx\n", vino.ino); goto release; @@ -2398,7 +2485,8 @@ static void handle_lease(struct ceph_mds_client *mdsc, switch (h->action) { case CEPH_MDS_LEASE_REVOKE: if (di && di->lease_session == session) { - h->seq = cpu_to_le32(di->lease_seq); + if (ceph_seq_cmp(di->lease_seq, seq) > 0) + h->seq = cpu_to_le32(di->lease_seq); __ceph_mdsc_drop_dentry_lease(dentry); } release = 1; @@ -2412,7 +2500,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, unsigned long duration = le32_to_cpu(h->duration_ms) * HZ / 1000; - di->lease_seq = le32_to_cpu(h->seq); + di->lease_seq = seq; dentry->d_time = di->lease_renew_from + duration; di->lease_renew_after = di->lease_renew_from + (duration >> 1); @@ -2457,12 +2545,12 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, dnamelen = dentry->d_name.len; len += dnamelen; - msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL); - if (IS_ERR(msg)) + msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS); + if (!msg) return; lease = msg->front.iov_base; lease->action = action; - lease->mask = cpu_to_le16(CEPH_LOCK_DN); + lease->mask = cpu_to_le16(1); lease->ino = cpu_to_le64(ceph_vino(inode).ino); lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); lease->seq = cpu_to_le32(seq); @@ -2492,7 +2580,7 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, BUG_ON(inode == NULL); BUG_ON(dentry == NULL); - BUG_ON(mask != CEPH_LOCK_DN); + BUG_ON(mask == 0); /* is dentry lease valid? */ spin_lock(&dentry->d_lock); @@ -2602,8 +2690,10 @@ static void delayed_work(struct work_struct *work) send_renew_caps(mdsc, s); else ceph_con_keepalive(&s->s_con); - add_cap_releases(mdsc, s, -1); - send_cap_releases(mdsc, s); + ceph_add_cap_releases(mdsc, s, -1); + if (s->s_state == CEPH_MDS_SESSION_OPEN || + s->s_state == CEPH_MDS_SESSION_HUNG) + ceph_send_cap_releases(mdsc, s); mutex_unlock(&s->s_mutex); ceph_put_mds_session(s); @@ -2620,6 +2710,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client) mdsc->client = client; mutex_init(&mdsc->mutex); mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); + if (mdsc->mdsmap == NULL) + return -ENOMEM; + init_completion(&mdsc->safe_umount_waiters); init_completion(&mdsc->session_close_waiters); INIT_LIST_HEAD(&mdsc->waiting_for_map); @@ -2645,6 +2738,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client) init_waitqueue_head(&mdsc->cap_flushing_wq); spin_lock_init(&mdsc->dentry_lru_lock); INIT_LIST_HEAD(&mdsc->dentry_lru); + return 0; } @@ -2740,6 +2834,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) { u64 want_tid, want_flush; + if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN) + return; + dout("sync\n"); mutex_lock(&mdsc->mutex); want_tid = mdsc->last_tid; @@ -2922,9 +3019,10 @@ static void con_put(struct ceph_connection *con) static void peer_reset(struct ceph_connection *con) { struct ceph_mds_session *s = con->private; + struct ceph_mds_client *mdsc = s->s_mdsc; - pr_err("mds%d gave us the boot. IMPLEMENT RECONNECT.\n", - s->s_mds); + pr_warning("mds%d closed our session\n", s->s_mds); + send_mds_reconnect(mdsc, s); } static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) @@ -3031,7 +3129,7 @@ static int invalidate_authorizer(struct ceph_connection *con) return ceph_monc_validate_auth(&mdsc->client->monc); } -const static struct ceph_connection_operations mds_con_ops = { +static const struct ceph_connection_operations mds_con_ops = { .get = con_get, .put = con_put, .dispatch = dispatch, |