summaryrefslogtreecommitdiffstats
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c235
1 files changed, 188 insertions, 47 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index dd440bd..a75ddbf 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3,6 +3,7 @@
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/sched.h>
+#include <linux/smp_lock.h>
#include "mds_client.h"
#include "mon_client.h"
@@ -37,6 +38,11 @@
* are no longer valid.
*/
+struct ceph_reconnect_state {
+ struct ceph_pagelist *pagelist;
+ bool flock;
+};
+
static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head);
@@ -449,7 +455,7 @@ void ceph_mdsc_release_request(struct kref *kref)
kfree(req->r_path1);
kfree(req->r_path2);
put_request_session(req);
- ceph_unreserve_caps(&req->r_caps_reservation);
+ ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
kfree(req);
}
@@ -512,7 +518,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
{
req->r_tid = ++mdsc->last_tid;
if (req->r_num_caps)
- ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
+ ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+ req->r_num_caps);
dout("__register_request %p tid %lld\n", req, req->r_tid);
ceph_mdsc_get_request(req);
__insert_request(mdsc, req);
@@ -704,6 +711,51 @@ static int __open_session(struct ceph_mds_client *mdsc,
}
/*
+ * open sessions for any export targets for the given mds
+ *
+ * called under mdsc->mutex
+ */
+static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_mds_info *mi;
+ struct ceph_mds_session *ts;
+ int i, mds = session->s_mds;
+ int target;
+
+ if (mds >= mdsc->mdsmap->m_max_mds)
+ return;
+ mi = &mdsc->mdsmap->m_info[mds];
+ dout("open_export_target_sessions for mds%d (%d targets)\n",
+ session->s_mds, mi->num_export_targets);
+
+ for (i = 0; i < mi->num_export_targets; i++) {
+ target = mi->export_targets[i];
+ ts = __ceph_lookup_mds_session(mdsc, target);
+ if (!ts) {
+ ts = register_session(mdsc, target);
+ if (IS_ERR(ts))
+ return;
+ }
+ if (session->s_state == CEPH_MDS_SESSION_NEW ||
+ session->s_state == CEPH_MDS_SESSION_CLOSING)
+ __open_session(mdsc, session);
+ else
+ dout(" mds%d target mds%d %p is %s\n", session->s_mds,
+ i, ts, session_state_name(ts->s_state));
+ ceph_put_mds_session(ts);
+ }
+}
+
+void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ mutex_lock(&mdsc->mutex);
+ __open_export_target_sessions(mdsc, session);
+ mutex_unlock(&mdsc->mutex);
+}
+
+/*
* session caps
*/
@@ -764,7 +816,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
last_inode = NULL;
}
if (old_cap) {
- ceph_put_cap(old_cap);
+ ceph_put_cap(session->s_mdsc, old_cap);
old_cap = NULL;
}
@@ -793,7 +845,7 @@ out:
if (last_inode)
iput(last_inode);
if (old_cap)
- ceph_put_cap(old_cap);
+ ceph_put_cap(session->s_mdsc, old_cap);
return ret;
}
@@ -1067,15 +1119,16 @@ static int trim_caps(struct ceph_mds_client *mdsc,
* Called under s_mutex.
*/
int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session,
- int extra)
+ struct ceph_mds_session *session)
{
- struct ceph_msg *msg;
+ struct ceph_msg *msg, *partial = NULL;
struct ceph_mds_cap_release *head;
int err = -ENOMEM;
+ int extra = mdsc->client->mount_args->cap_release_safety;
+ int num;
- if (extra < 0)
- extra = mdsc->client->mount_args->cap_release_safety;
+ dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
+ extra);
spin_lock(&session->s_cap_lock);
@@ -1084,9 +1137,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_msg,
list_head);
head = msg->front.iov_base;
- extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
+ num = le32_to_cpu(head->num);
+ if (num) {
+ dout(" partial %p with (%d/%d)\n", msg, num,
+ (int)CEPH_CAPS_PER_RELEASE);
+ extra += CEPH_CAPS_PER_RELEASE - num;
+ partial = msg;
+ }
}
-
while (session->s_num_cap_releases < session->s_nr_caps + extra) {
spin_unlock(&session->s_cap_lock);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
@@ -1103,19 +1161,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
}
- if (!list_empty(&session->s_cap_releases)) {
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg,
- list_head);
- head = msg->front.iov_base;
- if (head->num) {
- dout(" queueing non-full %p (%d)\n", msg,
- le32_to_cpu(head->num));
- list_move_tail(&msg->list_head,
- &session->s_cap_releases_done);
- session->s_num_cap_releases -=
- CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
- }
+ if (partial) {
+ head = partial->front.iov_base;
+ num = le32_to_cpu(head->num);
+ dout(" queueing partial %p with %d/%d\n", partial, num,
+ (int)CEPH_CAPS_PER_RELEASE);
+ list_move_tail(&partial->list_head,
+ &session->s_cap_releases_done);
+ session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
}
err = 0;
spin_unlock(&session->s_cap_lock);
@@ -1250,6 +1303,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
return ERR_PTR(-ENOMEM);
mutex_init(&req->r_fill_mutex);
+ req->r_mdsc = mdsc;
req->r_started = jiffies;
req->r_resend_mds = -1;
INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1580,6 +1634,15 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
req->r_mds = mds;
req->r_attempts++;
+ if (req->r_inode) {
+ struct ceph_cap *cap =
+ ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
+
+ if (cap)
+ req->r_sent_on_mseq = cap->mseq;
+ else
+ req->r_sent_on_mseq = -1;
+ }
dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
@@ -1914,21 +1977,40 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
result = le32_to_cpu(head->result);
/*
- * Tolerate 2 consecutive ESTALEs from the same mds.
- * FIXME: we should be looking at the cap migrate_seq.
+ * Handle an ESTALE
+ * if we're not talking to the authority, send to them
+ * if the authority has changed while we weren't looking,
+ * send to new authority
+ * Otherwise we just have to return an ESTALE
*/
if (result == -ESTALE) {
- req->r_direct_mode = USE_AUTH_MDS;
- req->r_num_stale++;
- if (req->r_num_stale <= 2) {
+ dout("got ESTALE on request %llu", req->r_tid);
+ if (!req->r_inode) {
+ /* do nothing; not an authority problem */
+ } else if (req->r_direct_mode != USE_AUTH_MDS) {
+ dout("not using auth, setting for that now");
+ req->r_direct_mode = USE_AUTH_MDS;
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
+ } else {
+ struct ceph_inode_info *ci = ceph_inode(req->r_inode);
+ struct ceph_cap *cap =
+ ceph_get_cap_for_mds(ci, req->r_mds);;
+
+ dout("already using auth");
+ if ((!cap || cap != ci->i_auth_cap) ||
+ (cap->mseq != req->r_sent_on_mseq)) {
+ dout("but cap changed, so resending");
+ __do_request(mdsc, req);
+ mutex_unlock(&mdsc->mutex);
+ goto out;
+ }
}
- } else {
- req->r_num_stale = 0;
+ dout("have to return ESTALE on request %llu", req->r_tid);
}
+
if (head->safe) {
req->r_got_safe = true;
__unregister_request(mdsc, req);
@@ -1985,7 +2067,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (err == 0) {
if (result == 0 && rinfo->dir_nr)
ceph_readdir_prepopulate(req, req->r_session);
- ceph_unreserve_caps(&req->r_caps_reservation);
+ ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
mutex_unlock(&req->r_fill_mutex);
@@ -2005,7 +2087,7 @@ out_err:
}
mutex_unlock(&mdsc->mutex);
- ceph_add_cap_releases(mdsc, req->r_session, -1);
+ ceph_add_cap_releases(mdsc, req->r_session);
mutex_unlock(&session->s_mutex);
/* kick calling process */
@@ -2193,9 +2275,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
- struct ceph_mds_cap_reconnect rec;
+ union {
+ struct ceph_mds_cap_reconnect v2;
+ struct ceph_mds_cap_reconnect_v1 v1;
+ } rec;
+ size_t reclen;
struct ceph_inode_info *ci;
- struct ceph_pagelist *pagelist = arg;
+ struct ceph_reconnect_state *recon_state = arg;
+ struct ceph_pagelist *pagelist = recon_state->pagelist;
char *path;
int pathlen, err;
u64 pathbase;
@@ -2228,17 +2315,44 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
spin_lock(&inode->i_lock);
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
- rec.cap_id = cpu_to_le64(cap->cap_id);
- rec.pathbase = cpu_to_le64(pathbase);
- rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
- rec.issued = cpu_to_le32(cap->issued);
- rec.size = cpu_to_le64(inode->i_size);
- ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
- ceph_encode_timespec(&rec.atime, &inode->i_atime);
- rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+
+ if (recon_state->flock) {
+ rec.v2.cap_id = cpu_to_le64(cap->cap_id);
+ rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+ rec.v2.issued = cpu_to_le32(cap->issued);
+ rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+ rec.v2.pathbase = cpu_to_le64(pathbase);
+ rec.v2.flock_len = 0;
+ reclen = sizeof(rec.v2);
+ } else {
+ rec.v1.cap_id = cpu_to_le64(cap->cap_id);
+ rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+ rec.v1.issued = cpu_to_le32(cap->issued);
+ rec.v1.size = cpu_to_le64(inode->i_size);
+ ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
+ ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
+ rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+ rec.v1.pathbase = cpu_to_le64(pathbase);
+ reclen = sizeof(rec.v1);
+ }
spin_unlock(&inode->i_lock);
- err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
+ if (recon_state->flock) {
+ int num_fcntl_locks, num_flock_locks;
+
+ lock_kernel();
+ ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
+ rec.v2.flock_len = (2*sizeof(u32) +
+ (num_fcntl_locks+num_flock_locks) *
+ sizeof(struct ceph_filelock));
+
+ err = ceph_pagelist_append(pagelist, &rec, reclen);
+ if (!err)
+ err = ceph_encode_locks(inode, pagelist,
+ num_fcntl_locks,
+ num_flock_locks);
+ unlock_kernel();
+ }
out:
kfree(path);
@@ -2267,6 +2381,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
int mds = session->s_mds;
int err = -ENOMEM;
struct ceph_pagelist *pagelist;
+ struct ceph_reconnect_state recon_state;
pr_info("mds%d reconnect start\n", mds);
@@ -2301,7 +2416,10 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
if (err)
goto fail;
- err = iterate_session_caps(session, encode_caps_cb, pagelist);
+
+ recon_state.pagelist = pagelist;
+ recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
+ err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
goto fail;
@@ -2326,6 +2444,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
}
reply->pagelist = pagelist;
+ if (recon_state.flock)
+ reply->hdr.version = cpu_to_le16(2);
reply->hdr.data_len = cpu_to_le32(pagelist->length);
reply->nr_pages = calc_pages_for(0, pagelist->length);
ceph_con_send(&session->s_con, reply);
@@ -2376,9 +2496,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
oldstate = ceph_mdsmap_get_state(oldmap, i);
newstate = ceph_mdsmap_get_state(newmap, i);
- dout("check_new_map mds%d state %s -> %s (session %s)\n",
+ dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
i, ceph_mds_state_name(oldstate),
+ ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
ceph_mds_state_name(newstate),
+ ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
session_state_name(s->s_state));
if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
@@ -2428,6 +2550,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
wake_up_session_caps(s, 1);
}
}
+
+ for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+ s = mdsc->sessions[i];
+ if (!s)
+ continue;
+ if (!ceph_mdsmap_is_laggy(newmap, i))
+ continue;
+ if (s->s_state == CEPH_MDS_SESSION_OPEN ||
+ s->s_state == CEPH_MDS_SESSION_HUNG ||
+ s->s_state == CEPH_MDS_SESSION_CLOSING) {
+ dout(" connecting to export targets of laggy mds%d\n",
+ i);
+ __open_export_target_sessions(mdsc, s);
+ }
+ }
}
@@ -2715,7 +2852,7 @@ static void delayed_work(struct work_struct *work)
send_renew_caps(mdsc, s);
else
ceph_con_keepalive(&s->s_con);
- ceph_add_cap_releases(mdsc, s, -1);
+ ceph_add_cap_releases(mdsc, s);
if (s->s_state == CEPH_MDS_SESSION_OPEN ||
s->s_state == CEPH_MDS_SESSION_HUNG)
ceph_send_cap_releases(mdsc, s);
@@ -2764,6 +2901,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
spin_lock_init(&mdsc->dentry_lru_lock);
INIT_LIST_HEAD(&mdsc->dentry_lru);
+ ceph_caps_init(mdsc);
+ ceph_adjust_min_caps(mdsc, client->min_caps);
+
return 0;
}
@@ -2959,6 +3099,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
if (mdsc->mdsmap)
ceph_mdsmap_destroy(mdsc->mdsmap);
kfree(mdsc->sessions);
+ ceph_caps_finalize(mdsc);
}
OpenPOWER on IntegriCloud