From 9280be24dc9c7aaee230de3ed33f8357386de9a2 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 14 Oct 2014 10:33:35 +0800 Subject: ceph: fix file lock interruption When a lock operation is interrupted, current code sends an unlock request to the MDS to undo the lock operation. This method does not work as expected because the unlock request can drop locks that have already been acquired. The fix is to use the newly introduced CEPH_LOCK_FCNTL_INTR/CEPH_LOCK_FLOCK_INTR requests to interrupt a blocked file lock request. These requests do not drop locks that have already been acquired; they only interrupt the blocked file lock request. Signed-off-by: Yan, Zheng --- fs/ceph/locks.c | 64 ++++++++++++++++++++++++++++++++++++++++++++-------- fs/ceph/mds_client.c | 2 ++ fs/ceph/mds_client.h | 6 +++++ 3 files changed, 62 insertions(+), 10 deletions(-) (limited to 'fs') diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c index fbc39c4..c35c5c6 100644 --- a/fs/ceph/locks.c +++ b/fs/ceph/locks.c @@ -9,6 +9,8 @@ #include static u64 lock_secret; +static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req); static inline u64 secure_addr(void *addr) { @@ -40,6 +42,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file, u64 length = 0; u64 owner; + if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK) + wait = 0; + req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS); if (IS_ERR(req)) return PTR_ERR(req); @@ -68,6 +73,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file, req->r_args.filelock_change.length = cpu_to_le64(length); req->r_args.filelock_change.wait = wait; + if (wait) + req->r_wait_for_completion = ceph_lock_wait_for_completion; + err = ceph_mdsc_do_request(mdsc, inode, req); if (operation == CEPH_MDS_OP_GETFILELOCK) { @@ -96,6 +104,52 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file, return err; } +static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req) +{ + struct ceph_mds_request *intr_req; + struct inode *inode = req->r_inode; + int err, lock_type; + + BUG_ON(req->r_op != CEPH_MDS_OP_SETFILELOCK); + if (req->r_args.filelock_change.rule == CEPH_LOCK_FCNTL) + lock_type = CEPH_LOCK_FCNTL_INTR; + else if (req->r_args.filelock_change.rule == CEPH_LOCK_FLOCK) + lock_type = CEPH_LOCK_FLOCK_INTR; + else + BUG_ON(1); + BUG_ON(req->r_args.filelock_change.type == CEPH_LOCK_UNLOCK); + + err = wait_for_completion_interruptible(&req->r_completion); + if (!err) + return 0; + + dout("ceph_lock_wait_for_completion: request %llu was interrupted\n", + req->r_tid); + + intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK, + USE_AUTH_MDS); + if (IS_ERR(intr_req)) + return PTR_ERR(intr_req); + + intr_req->r_inode = inode; + ihold(inode); + intr_req->r_num_caps = 1; + + intr_req->r_args.filelock_change = req->r_args.filelock_change; + intr_req->r_args.filelock_change.rule = lock_type; + intr_req->r_args.filelock_change.type = CEPH_LOCK_UNLOCK; + + err = ceph_mdsc_do_request(mdsc, inode, intr_req); + ceph_mdsc_put_request(intr_req); + + if (err && err != -ERESTARTSYS) + return err; + + wait_for_completion(&req->r_completion); + return 0; +} + /** * Attempt to set an fcntl lock. * For now, this just goes away to the server. Later it may be more awesome. 
@@ -143,11 +197,6 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl) err); } } - - } else if (err == -ERESTARTSYS) { - dout("undoing lock\n"); - ceph_lock_message(CEPH_LOCK_FCNTL, op, file, - CEPH_LOCK_UNLOCK, 0, fl); } return err; } @@ -186,11 +235,6 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl) file, CEPH_LOCK_UNLOCK, 0, fl); dout("got %d on flock_lock_file_wait, undid lock", err); } - } else if (err == -ERESTARTSYS) { - dout("undoing lock\n"); - ceph_lock_message(CEPH_LOCK_FLOCK, - CEPH_MDS_OP_SETFILELOCK, - file, CEPH_LOCK_UNLOCK, 0, fl); } return err; } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index a92d3f5..5a47ed7 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2208,6 +2208,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, &req->r_completion, req->r_timeout); if (err == 0) err = -EIO; + } else if (req->r_wait_for_completion) { + err = req->r_wait_for_completion(mdsc, req); } else { err = wait_for_completion_killable(&req->r_completion); } diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 3288359..230bda7 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -166,6 +166,11 @@ struct ceph_mds_client; */ typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc, struct ceph_mds_request *req); +/* + * wait for request completion callback + */ +typedef int (*ceph_mds_request_wait_callback_t) (struct ceph_mds_client *mdsc, + struct ceph_mds_request *req); /* * an in-flight mds request @@ -239,6 +244,7 @@ struct ceph_mds_request { struct completion r_completion; struct completion r_safe_completion; ceph_mds_request_callback_t r_callback; + ceph_mds_request_wait_callback_t r_wait_for_completion; struct list_head r_unsafe_item; /* per-session unsafe list item */ bool r_got_unsafe, r_got_safe, r_got_result; -- cgit v1.1 From 70db4f3629b3476cf506be869ef9d15688d2d44a Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 21 Oct 2014 18:09:56 -0700 Subject: ceph: introduce a new inode flag indicating if cached dentries are ordered After creating/deleting/renaming file, offsets of sibling dentries may change. So we can not use cached dentries to satisfy readdir. But we can still use the cached dentries to conclude -ENOENT for lookup. This patch introduces a new inode flag indicating if child dentries are ordered. The flag is set at the same time marking a directory complete. After creating/deleting/renaming file, we clear the flag on directory inode. This prevents ceph_readdir() from using cached dentries to satisfy readdir syscall. Signed-off-by: Yan, Zheng --- fs/ceph/dir.c | 23 +++++++++++++++-------- fs/ceph/inode.c | 13 ++++++++----- fs/ceph/super.h | 38 ++++++++++++++++++++++++++++++++------ 3 files changed, 55 insertions(+), 19 deletions(-) (limited to 'fs') diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index e6d63f8..6526199 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -183,7 +183,7 @@ more: spin_unlock(&parent->d_lock); /* make sure a dentry wasn't dropped while we didn't have parent lock */ - if (!ceph_dir_is_complete(dir)) { + if (!ceph_dir_is_complete_ordered(dir)) { dout(" lost dir complete on %p; falling back to mds\n", dir); dput(dentry); err = -EAGAIN; @@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* always start with . and .. 
*/ if (ctx->pos == 0) { - /* note dir version at start of readdir so we can tell - * if any dentries get dropped */ - fi->dir_release_count = atomic_read(&ci->i_release_count); - dout("readdir off 0 -> '.'\n"); if (!dir_emit(ctx, ".", 1, ceph_translate_ino(inode->i_sb, inode->i_ino), @@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) if ((ctx->pos == 2 || fi->dentry) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && - __ceph_dir_is_complete(ci) && + __ceph_dir_is_complete_ordered(ci) && __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { u32 shared_gen = ci->i_shared_gen; spin_unlock(&ci->i_ceph_lock); @@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* proceed with a normal readdir */ + if (ctx->pos == 2) { + /* note dir version at start of readdir so we can tell + * if any dentries get dropped */ + fi->dir_release_count = atomic_read(&ci->i_release_count); + fi->dir_ordered_count = ci->i_ordered_count; + } + more: /* do we have the correct frag content buffered? */ if (fi->frag != frag || fi->last_readdir == NULL) { @@ -446,8 +449,12 @@ more: */ spin_lock(&ci->i_ceph_lock); if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { - dout(" marking %p complete\n", inode); - __ceph_dir_set_complete(ci, fi->dir_release_count); + if (ci->i_ordered_count == fi->dir_ordered_count) + dout(" marking %p complete and ordered\n", inode); + else + dout(" marking %p complete\n", inode); + __ceph_dir_set_complete(ci, fi->dir_release_count, + fi->dir_ordered_count); } spin_unlock(&ci->i_ceph_lock); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 7b61390..72607c1 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -389,6 +389,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_version = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; + ci->i_ordered_count = 0; atomic_set(&ci->i_release_count, 1); atomic_set(&ci->i_complete_count, 0); ci->i_symlink = NULL; @@ -845,7 +846,8 @@ static int fill_inode(struct inode *inode, (issued & CEPH_CAP_FILE_EXCL) == 0 && !__ceph_dir_is_complete(ci)) { dout(" marking %p complete (empty)\n", inode); - __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count)); + __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count), + ci->i_ordered_count); } /* were we issued a capability? 
*/ @@ -1206,8 +1208,8 @@ retry_lookup: ceph_invalidate_dentry_lease(dn); /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_complete(dir); - ceph_dir_clear_complete(olddir); + ceph_dir_clear_ordered(dir); + ceph_dir_clear_ordered(olddir); dout("dn %p gets new offset %lld\n", req->r_old_dentry, ceph_dentry(req->r_old_dentry)->offset); @@ -1219,6 +1221,7 @@ retry_lookup: if (!rinfo->head->is_target) { dout("fill_trace null dentry\n"); if (dn->d_inode) { + ceph_dir_clear_ordered(dir); dout("d_delete %p\n", dn); d_delete(dn); } else { @@ -1235,7 +1238,7 @@ retry_lookup: /* attach proper inode */ if (!dn->d_inode) { - ceph_dir_clear_complete(dir); + ceph_dir_clear_ordered(dir); ihold(in); dn = splice_dentry(dn, in, &have_lease); if (IS_ERR(dn)) { @@ -1265,7 +1268,7 @@ retry_lookup: BUG_ON(!dir); BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR); dout(" linking snapped dir %p to dn %p\n", in, dn); - ceph_dir_clear_complete(dir); + ceph_dir_clear_ordered(dir); ihold(in); dn = splice_dentry(dn, in, NULL); if (IS_ERR(dn)) { diff --git a/fs/ceph/super.h b/fs/ceph/super.h index b82f507..aca2287 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -256,6 +256,7 @@ struct ceph_inode_info { u32 i_time_warp_seq; unsigned i_ceph_flags; + int i_ordered_count; atomic_t i_release_count; atomic_t i_complete_count; @@ -434,14 +435,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, /* * Ceph inode. */ -#define CEPH_I_NODELAY 4 /* do not delay cap release */ -#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ -#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ +#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */ +#define CEPH_I_NODELAY 4 /* do not delay cap release */ +#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ +#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, - int release_count) + int release_count, int ordered_count) { atomic_set(&ci->i_complete_count, release_count); + if (ci->i_ordered_count == ordered_count) + ci->i_ceph_flags |= CEPH_I_DIR_ORDERED; + else + ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; } static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci) @@ -455,16 +461,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci) atomic_read(&ci->i_release_count); } +static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci) +{ + return __ceph_dir_is_complete(ci) && + (ci->i_ceph_flags & CEPH_I_DIR_ORDERED); +} + static inline void ceph_dir_clear_complete(struct inode *inode) { __ceph_dir_clear_complete(ceph_inode(inode)); } -static inline bool ceph_dir_is_complete(struct inode *inode) +static inline void ceph_dir_clear_ordered(struct inode *inode) { - return __ceph_dir_is_complete(ceph_inode(inode)); + struct ceph_inode_info *ci = ceph_inode(inode); + spin_lock(&ci->i_ceph_lock); + ci->i_ordered_count++; + ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; + spin_unlock(&ci->i_ceph_lock); } +static inline bool ceph_dir_is_complete_ordered(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + bool ret; + spin_lock(&ci->i_ceph_lock); + ret = __ceph_dir_is_complete_ordered(ci); + spin_unlock(&ci->i_ceph_lock); + return ret; +} /* find a specific frag @f */ extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, @@ -580,6 +605,7 @@ struct ceph_file_info { char *last_name; /* last entry in previous chunk */ struct dentry *dentry; /* next dentry (for dcache readdir) */ 
int dir_release_count; + int dir_ordered_count; /* used for -o dirstat read() on directory thing */ char *dir_info; -- cgit v1.1 From e96a650a8174e20112b400e72e0b2429aa66de20 Mon Sep 17 00:00:00 2001 From: SF Markus Elfring Date: Sun, 2 Nov 2014 15:20:59 +0100 Subject: ceph, rbd: delete unnecessary checks before two function calls The functions ceph_put_snap_context() and iput() test whether their argument is NULL and then return immediately. Thus the test around the call is not needed. This issue was detected by using the Coccinelle software. Signed-off-by: Markus Elfring [idryomov@redhat.com: squashed rbd.c hunk, changelog] Signed-off-by: Ilya Dryomov --- fs/ceph/caps.c | 3 +-- fs/ceph/mds_client.c | 6 ++---- fs/ceph/snap.c | 9 +++------ 3 files changed, 6 insertions(+), 12 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index cefca66..eb1bf1f 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -3137,8 +3137,7 @@ flush_cap_releases: done: mutex_unlock(&session->s_mutex); done_unlocked: - if (inode) - iput(inode); + iput(inode); return; bad: diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 5a47ed7..fc74617 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -524,8 +524,7 @@ void ceph_mdsc_release_request(struct kref *kref) } if (req->r_locked_dir) ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); - if (req->r_target_inode) - iput(req->r_target_inode); + iput(req->r_target_inode); if (req->r_dentry) dput(req->r_dentry); if (req->r_old_dentry) @@ -1066,8 +1065,7 @@ out: session->s_cap_iterator = NULL; spin_unlock(&session->s_cap_lock); - if (last_inode) - iput(last_inode); + iput(last_inode); if (old_cap) ceph_put_cap(session->s_mdsc, old_cap); diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index f01645a..c1cc993 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -365,8 +365,7 @@ static int build_snap_context(struct ceph_snap_realm *realm) realm->ino, realm, snapc, snapc->seq, (unsigned int) snapc->num_snaps); - if (realm->cached_context) - ceph_put_snap_context(realm->cached_context); + ceph_put_snap_context(realm->cached_context); realm->cached_context = snapc; return 0; @@ -590,15 +589,13 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) if (!inode) continue; spin_unlock(&realm->inodes_with_caps_lock); - if (lastinode) - iput(lastinode); + iput(lastinode); lastinode = inode; ceph_queue_cap_snap(ci); spin_lock(&realm->inodes_with_caps_lock); } spin_unlock(&realm->inodes_with_caps_lock); - if (lastinode) - iput(lastinode); + iput(lastinode); list_for_each_entry(child, &realm->children, child_item) { dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n", -- cgit v1.1 From 33d07337962c7bbd2fd5cf7f1106735c9507fbe2 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 4 Nov 2014 16:33:37 +0800 Subject: libceph: message signature support Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'fs') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index fc74617..9f00853 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3744,6 +3744,20 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, return msg; } +static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) +{ + struct ceph_mds_session *s = con->private; + struct ceph_auth_handshake *auth = &s->s_auth; + return ceph_auth_sign_message(auth, msg); +} + +static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) +{ 
+ struct ceph_mds_session *s = con->private; + struct ceph_auth_handshake *auth = &s->s_auth; + return ceph_auth_check_message_signature(auth, msg); +} + static const struct ceph_connection_operations mds_con_ops = { .get = con_get, .put = con_put, @@ -3753,6 +3767,8 @@ static const struct ceph_connection_operations mds_con_ops = { .invalidate_authorizer = invalidate_authorizer, .peer_reset = peer_reset, .alloc_msg = mds_alloc_msg, + .sign_message = sign_message, + .check_message_signature = check_message_signature, }; /* eof */ -- cgit v1.1 From 7cfa0313d0dc52e4da9f196f8ad5bfdf266fc1fb Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 30 Oct 2014 17:15:26 +0000 Subject: ceph: message versioning fixes There were two places we were assigning version in host byte order instead of network byte order. Also in MSG_CLIENT_SESSION we weren't setting compat_version in the header to reflect continued compatibility with older MDSs. Fixes: http://tracker.ceph.com/issues/9945 Signed-off-by: John Spray Reviewed-by: Sage Weil --- fs/ceph/mds_client.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'fs') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 9f00853..387a489 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -860,8 +860,11 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 /* * Serialize client metadata into waiting buffer space, using * the format that userspace expects for map + * + * ClientSession messages with metadata are v2 */ - msg->hdr.version = 2; /* ClientSession messages with metadata are v2 */ + msg->hdr.version = cpu_to_le16(2); + msg->hdr.compat_version = cpu_to_le16(1); /* The write pointer, following the session_head structure */ p = msg->front.iov_base + sizeof(*h); @@ -1872,7 +1875,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, goto out_free2; } - msg->hdr.version = 2; + msg->hdr.version = cpu_to_le16(2); msg->hdr.tid = cpu_to_le64(req->r_tid); head = msg->front.iov_base; -- cgit v1.1 From 97c85a828f36bbfffe9d77b977b65a5872b6cad4 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 6 Nov 2014 15:09:41 +0800 Subject: ceph: introduce global empty snap context Current snapshot code does not properly handle moving an inode from one empty snap realm to another empty snap realm. After changing the inode's snap realm, some dirty pages' snap context may not be equal to the inode's i_head_snap. This can trigger a BUG() in ceph_put_wrbuffer_cap_refs(). The fix is to introduce a global empty snap context for all empty snap realms. This avoids triggering the BUG() for filesystems with no snapshots. Fixes: http://tracker.ceph.com/issues/9928 Signed-off-by: Yan, Zheng Reviewed-by: Ilya Dryomov --- fs/ceph/snap.c | 26 +++++++++++++++++++++++++- fs/ceph/super.c | 10 ++++++++-- fs/ceph/super.h | 2 ++ 3 files changed, 35 insertions(+), 3 deletions(-) (limited to 'fs') diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index c1cc993..f64576e 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -288,6 +288,9 @@ static int cmpu64_rev(const void *a, const void *b) return 0; } + +static struct ceph_snap_context *empty_snapc; + /* * build the snap context for a given realm. 
*/ @@ -328,6 +331,12 @@ static int build_snap_context(struct ceph_snap_realm *realm) return 0; } + if (num == 0 && realm->seq == empty_snapc->seq) { + ceph_get_snap_context(empty_snapc); + snapc = empty_snapc; + goto done; + } + /* alloc new snap context */ err = -ENOMEM; if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64)) @@ -365,6 +374,7 @@ static int build_snap_context(struct ceph_snap_realm *realm) realm->ino, realm, snapc, snapc->seq, (unsigned int) snapc->num_snaps); +done: ceph_put_snap_context(realm->cached_context); realm->cached_context = snapc; return 0; @@ -465,6 +475,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) cap_snap. lucky us. */ dout("queue_cap_snap %p already pending\n", inode); kfree(capsnap); + } else if (ci->i_snap_realm->cached_context == empty_snapc) { + dout("queue_cap_snap %p empty snapc\n", inode); + kfree(capsnap); } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { struct ceph_snap_context *snapc = ci->i_head_snapc; @@ -925,5 +938,16 @@ out: return; } +int __init ceph_snap_init(void) +{ + empty_snapc = ceph_create_snap_context(0, GFP_NOFS); + if (!empty_snapc) + return -ENOMEM; + empty_snapc->seq = 1; + return 0; +} - +void ceph_snap_exit(void) +{ + ceph_put_snap_context(empty_snapc); +} diff --git a/fs/ceph/super.c b/fs/ceph/super.c index f6e1237..3b5c1e3 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -1028,15 +1028,20 @@ static int __init init_ceph(void) ceph_flock_init(); ceph_xattr_init(); + ret = ceph_snap_init(); + if (ret) + goto out_xattr; ret = register_filesystem(&ceph_fs_type); if (ret) - goto out_icache; + goto out_snap; pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL); return 0; -out_icache: +out_snap: + ceph_snap_exit(); +out_xattr: ceph_xattr_exit(); destroy_caches(); out: @@ -1047,6 +1052,7 @@ static void __exit exit_ceph(void) { dout("exit_ceph\n"); unregister_filesystem(&ceph_fs_type); + ceph_snap_exit(); ceph_xattr_exit(); destroy_caches(); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index aca2287..fc1c825 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -699,6 +699,8 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci); extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, struct ceph_cap_snap *capsnap); extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); +extern int ceph_snap_init(void); +extern void ceph_snap_exit(void); /* * a cap_snap is "pending" if it is still awaiting an in-progress -- cgit v1.1 From ca3995ad13b5e8bb08be850d08fe1e2abffc0206 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Nov 2014 20:42:35 +0300 Subject: ceph: remove unused stringification macros These were used to report git versions a long time ago. Signed-off-by: Ilya Dryomov --- fs/ceph/super.c | 3 --- 1 file changed, 3 deletions(-) (limited to 'fs') diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 3b5c1e3..989e86c 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -1017,9 +1017,6 @@ static struct file_system_type ceph_fs_type = { }; MODULE_ALIAS_FS("ceph"); -#define _STRINGIFY(x) #x -#define STRINGIFY(x) _STRINGIFY(x) - static int __init init_ceph(void) { int ret = init_caches(); -- cgit v1.1 From 715e4cd405cfd67bd058e410b3e599bab2072645 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 13 Nov 2014 14:40:37 +0800 Subject: libceph: specify position of extent operation allow specifying position of extent operation in multi-operations osd request. 
This is required for cephfs to convert inline data to normal data (compare xattr, then write object). Signed-off-by: Yan, Zheng Reviewed-by: Ilya Dryomov --- fs/ceph/addr.c | 9 ++++++--- fs/ceph/file.c | 8 +++++--- 2 files changed, 11 insertions(+), 6 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 18c06bb..f2c7aa8 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -319,7 +319,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) off, len); vino = ceph_vino(inode); req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, - 1, CEPH_OSD_OP_READ, + 0, 1, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq, ci->i_truncate_size, false); @@ -750,7 +750,6 @@ retry: last_snapc = snapc; while (!done && index <= end) { - int num_ops = do_sync ? 2 : 1; unsigned i; int first; pgoff_t next; @@ -850,7 +849,8 @@ get_more_pages: len = wsize; req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, - offset, &len, num_ops, + offset, &len, 0, + do_sync ? 2 : 1, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, @@ -862,6 +862,9 @@ get_more_pages: break; } + if (do_sync) + osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); + req->r_callback = writepages_finish; req->r_inode = inode; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index d7e0da8..c03ac4c 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -598,7 +598,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) snapc = ci->i_snap_realm->cached_context; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, + vino, pos, &len, 0, 2,/*include a 'startsync' command*/ CEPH_OSD_OP_WRITE, flags, snapc, ci->i_truncate_seq, @@ -609,6 +609,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) break; } + osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); + n = iov_iter_get_pages_alloc(from, &pages, len, &start); if (unlikely(n < 0)) { ret = n; @@ -713,7 +715,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) snapc = ci->i_snap_realm->cached_context; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, 1, + vino, pos, &len, 0, 1, CEPH_OSD_OP_WRITE, flags, snapc, ci->i_truncate_seq, ci->i_truncate_size, @@ -1111,7 +1113,7 @@ static int ceph_zero_partial_object(struct inode *inode, req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, ceph_vino(inode), offset, length, - 1, op, + 0, 1, op, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, NULL, 0, 0, false); -- cgit v1.1 From fb01d1f8b0343f1b19be878cee89d089f06e9f38 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 21:29:55 +0800 Subject: ceph: parse inline data in MClientReply and MClientCaps Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 34 +++++++++++++++++++++++----------- fs/ceph/mds_client.c | 10 ++++++++++ fs/ceph/mds_client.h | 3 +++ 3 files changed, 36 insertions(+), 11 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index eb1bf1f..b88ae60 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2383,6 +2383,8 @@ static void invalidate_aliases(struct inode *inode) static void handle_cap_grant(struct ceph_mds_client *mdsc, struct inode *inode, struct ceph_mds_caps *grant, void *snaptrace, int snaptrace_len, + u64 inline_version, + void *inline_data, int inline_len, struct ceph_buffer *xattr_buf, struct ceph_mds_session *session, struct ceph_cap *cap, int issued) @@ -2996,11 
+2998,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, u64 cap_id; u64 size, max_size; u64 tid; + u64 inline_version = 0; + void *inline_data = NULL; + u32 inline_len = 0; void *snaptrace; size_t snaptrace_len; - void *flock; - void *end; - u32 flock_len; + void *p, *end; dout("handle_caps from mds%d\n", mds); @@ -3021,30 +3024,37 @@ void ceph_handle_caps(struct ceph_mds_session *session, snaptrace = h + 1; snaptrace_len = le32_to_cpu(h->snap_trace_len); + p = snaptrace + snaptrace_len; if (le16_to_cpu(msg->hdr.version) >= 2) { - void *p = snaptrace + snaptrace_len; + u32 flock_len; ceph_decode_32_safe(&p, end, flock_len, bad); if (p + flock_len > end) goto bad; - flock = p; - } else { - flock = NULL; - flock_len = 0; + p += flock_len; } if (le16_to_cpu(msg->hdr.version) >= 3) { if (op == CEPH_CAP_OP_IMPORT) { - void *p = flock + flock_len; if (p + sizeof(*peer) > end) goto bad; peer = p; + p += sizeof(*peer); } else if (op == CEPH_CAP_OP_EXPORT) { /* recorded in unused fields */ peer = (void *)&h->size; } } + if (le16_to_cpu(msg->hdr.version) >= 4) { + ceph_decode_64_safe(&p, end, inline_version, bad); + ceph_decode_32_safe(&p, end, inline_len, bad); + if (p + inline_len > end) + goto bad; + inline_data = p; + p += inline_len; + } + /* lookup ino */ inode = ceph_find_inode(sb, vino); ci = ceph_inode(inode); @@ -3085,6 +3095,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, handle_cap_import(mdsc, inode, h, peer, session, &cap, &issued); handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, + inline_version, inline_data, inline_len, msg->middle, session, cap, issued); goto done_unlocked; } @@ -3105,8 +3116,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, case CEPH_CAP_OP_GRANT: __ceph_caps_issued(ci, &issued); issued |= __ceph_caps_dirty(ci); - handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle, - session, cap, issued); + handle_cap_grant(mdsc, inode, h, NULL, 0, + inline_version, inline_data, inline_len, + msg->middle, session, cap, issued); goto done_unlocked; case CEPH_CAP_OP_FLUSH_ACK: diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 387a489..d2171f4 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -89,6 +89,16 @@ static int parse_reply_info_in(void **p, void *end, ceph_decode_need(p, end, info->xattr_len, bad); info->xattr_data = *p; *p += info->xattr_len; + + if (features & CEPH_FEATURE_MDS_INLINE_DATA) { + ceph_decode_64_safe(p, end, info->inline_version, bad); + ceph_decode_32_safe(p, end, info->inline_len, bad); + ceph_decode_need(p, end, info->inline_len, bad); + info->inline_data = *p; + *p += info->inline_len; + } else + info->inline_version = CEPH_INLINE_NONE; + return 0; bad: return err; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 230bda7..01f5e4c 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -41,6 +41,9 @@ struct ceph_mds_reply_info_in { char *symlink; u32 xattr_len; char *xattr_data; + u64 inline_version; + u32 inline_len; + char *inline_data; }; /* -- cgit v1.1 From 31c542a199d79f0f402c2f3e04229464510d47ec Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 21:41:55 +0800 Subject: ceph: add inline data to pagecache Request reply and cap message can contain inline data. add inline data to the page cache if there is Fc cap. 
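As a rough stand-alone sketch (user-space C; PAGE_SZ and fill_page_from_inline are hypothetical names used only for illustration, not part of this patch), the core of the new ceph_fill_inline_data() helper amounts to copying the inline bytes into page 0 and zero-filling the rest of the page:

    #include <stdio.h>
    #include <string.h>

    #define PAGE_SZ 4096   /* stands in for PAGE_CACHE_SIZE */

    /* copy inline data into a page-sized buffer, zero the remainder */
    static void fill_page_from_inline(char page[PAGE_SZ], const char *data, size_t len)
    {
        if (len > PAGE_SZ)
            len = PAGE_SZ;                       /* inline data fits in one page */
        memcpy(page, data, len);                 /* kernel: kmap_atomic() + memcpy() */
        memset(page + len, 0, PAGE_SZ - len);    /* kernel: zero_user_segment() */
    }

    int main(void)
    {
        char page[PAGE_SZ];
        fill_page_from_inline(page, "inline contents", 15);
        printf("%s\n", page);                    /* tail is zeroed, safe to print */
        return 0;
    }

The kernel helper additionally finds or creates the page and handles the locked_page case, but the data handling is the same copy-then-zero step.
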
Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 43 +++++++++++++++++++++++++++++++++++++++++++ fs/ceph/caps.c | 11 +++++++++++ fs/ceph/inode.c | 16 ++++++++++++++++ fs/ceph/super.h | 5 ++++- fs/ceph/super.h.rej | 10 ++++++++++ 5 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 fs/ceph/super.h.rej (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index f2c7aa8..4a3f55f 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1318,6 +1318,49 @@ out: return ret; } +void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, + char *data, size_t len) +{ + struct address_space *mapping = inode->i_mapping; + struct page *page; + + if (locked_page) { + page = locked_page; + } else { + if (i_size_read(inode) == 0) + return; + page = find_or_create_page(mapping, 0, + mapping_gfp_mask(mapping) & ~__GFP_FS); + if (!page) + return; + if (PageUptodate(page)) { + unlock_page(page); + page_cache_release(page); + return; + } + } + + dout("fill_inline_data %p %llx.%llx len %lu locked_page %p\n", + inode, ceph_vinop(inode), len, locked_page); + + if (len > 0) { + void *kaddr = kmap_atomic(page); + memcpy(kaddr, data, len); + kunmap_atomic(kaddr); + } + + if (page != locked_page) { + if (len < PAGE_CACHE_SIZE) + zero_user_segment(page, len, PAGE_CACHE_SIZE); + else + flush_dcache_page(page); + + SetPageUptodate(page); + unlock_page(page); + page_cache_release(page); + } +} + static struct vm_operations_struct ceph_vmops = { .fault = ceph_filemap_fault, .page_mkwrite = ceph_page_mkwrite, diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index b88ae60..6372eb9 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2405,6 +2405,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, bool queue_invalidate = false; bool queue_revalidate = false; bool deleted_inode = false; + bool fill_inline = false; dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", inode, cap, mds, seq, ceph_cap_string(newcaps)); @@ -2578,6 +2579,13 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, } BUG_ON(cap->issued & ~cap->implemented); + if (inline_version > 0 && inline_version >= ci->i_inline_version) { + ci->i_inline_version = inline_version; + if (ci->i_inline_version != CEPH_INLINE_NONE && + (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO))) + fill_inline = true; + } + spin_unlock(&ci->i_ceph_lock); if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { @@ -2591,6 +2599,9 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, wake = true; } + if (fill_inline) + ceph_fill_inline_data(inode, NULL, inline_data, inline_len); + if (queue_trunc) { ceph_queue_vmtruncate(inode); ceph_queue_revalidate(inode); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 72607c1..feea6a8 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -387,6 +387,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) spin_lock_init(&ci->i_ceph_lock); ci->i_version = 0; + ci->i_inline_version = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; ci->i_ordered_count = 0; @@ -676,6 +677,7 @@ static int fill_inode(struct inode *inode, bool wake = false; bool queue_trunc = false; bool new_version = false; + bool fill_inline = false; dout("fill_inode %p ino %llx.%llx v %llu had %llu\n", inode, ceph_vinop(inode), le64_to_cpu(info->version), @@ -875,8 +877,22 @@ static int fill_inode(struct inode *inode, ceph_vinop(inode)); __ceph_get_fmode(ci, cap_fmode); } + + if (iinfo->inline_version > 0 && + iinfo->inline_version >= ci->i_inline_version) { + int cache_caps = CEPH_CAP_FILE_CACHE | 
CEPH_CAP_FILE_LAZYIO; + ci->i_inline_version = iinfo->inline_version; + if (ci->i_inline_version != CEPH_INLINE_NONE && + (le32_to_cpu(info->cap.caps) & cache_caps)) + fill_inline = true; + } + spin_unlock(&ci->i_ceph_lock); + if (fill_inline) + ceph_fill_inline_data(inode, NULL, + iinfo->inline_data, iinfo->inline_len); + if (wake) wake_up_all(&ci->i_cap_wq); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index fc1c825..b0de916 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -253,6 +253,7 @@ struct ceph_inode_info { spinlock_t i_ceph_lock; u64 i_version; + u64 i_inline_version; u32 i_time_warp_seq; unsigned i_ceph_flags; @@ -858,7 +859,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn, int mds, int drop, int unless); extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, - int *got, loff_t endoff); + loff_t endoff, int *got, struct page **pinned_page); /* for counting open files by mode */ static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode) @@ -880,6 +881,8 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, struct file *file, unsigned flags, umode_t mode, int *opened); extern int ceph_release(struct inode *inode, struct file *filp); +extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, + char *data, size_t len); /* dir.c */ extern const struct file_operations ceph_dir_fops; diff --git a/fs/ceph/super.h.rej b/fs/ceph/super.h.rej new file mode 100644 index 0000000..88fe3df --- /dev/null +++ b/fs/ceph/super.h.rej @@ -0,0 +1,10 @@ +--- fs/ceph/super.h ++++ fs/ceph/super.h +@@ -254,6 +255,7 @@ + spinlock_t i_ceph_lock; + + u64 i_version; ++ u64 i_inline_version; + u32 i_time_warp_seq; + + unsigned i_ceph_flags; -- cgit v1.1 From 01deead041e03c9a6b4e1b2dd165dee4cced6112 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 21:56:29 +0800 Subject: ceph: use getattr request to fetch inline data Add a new parameter 'locked_page' to ceph_do_getattr(). If inline data in getattr reply will be copied to the page. Signed-off-by: Yan, Zheng --- fs/ceph/inode.c | 34 +++++++++++++++++++++++++--------- fs/ceph/mds_client.h | 1 + fs/ceph/super.h | 7 ++++++- 3 files changed, 32 insertions(+), 10 deletions(-) (limited to 'fs') diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index feea6a8..0bdabd1 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -659,7 +659,7 @@ void ceph_fill_file_time(struct inode *inode, int issued, * Populate an inode based on info from mds. May be called on new or * existing inodes. 
*/ -static int fill_inode(struct inode *inode, +static int fill_inode(struct inode *inode, struct page *locked_page, struct ceph_mds_reply_info_in *iinfo, struct ceph_mds_reply_dirfrag *dirinfo, struct ceph_mds_session *session, @@ -883,14 +883,15 @@ static int fill_inode(struct inode *inode, int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; ci->i_inline_version = iinfo->inline_version; if (ci->i_inline_version != CEPH_INLINE_NONE && - (le32_to_cpu(info->cap.caps) & cache_caps)) + (locked_page || + (le32_to_cpu(info->cap.caps) & cache_caps))) fill_inline = true; } spin_unlock(&ci->i_ceph_lock); if (fill_inline) - ceph_fill_inline_data(inode, NULL, + ceph_fill_inline_data(inode, locked_page, iinfo->inline_data, iinfo->inline_len); if (wake) @@ -1080,7 +1081,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, struct inode *dir = req->r_locked_dir; if (dir) { - err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag, + err = fill_inode(dir, NULL, + &rinfo->diri, rinfo->dirfrag, session, req->r_request_started, -1, &req->r_caps_reservation); if (err < 0) @@ -1150,7 +1152,7 @@ retry_lookup: } req->r_target_inode = in; - err = fill_inode(in, &rinfo->targeti, NULL, + err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL, session, req->r_request_started, (!req->r_aborted && rinfo->head->result == 0) ? req->r_fmode : -1, @@ -1321,7 +1323,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, dout("new_inode badness got %d\n", err); continue; } - rc = fill_inode(in, &rinfo->dir_in[i], NULL, session, + rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, req->r_request_started, -1, &req->r_caps_reservation); if (rc < 0) { @@ -1437,7 +1439,7 @@ retry_lookup: } } - if (fill_inode(in, &rinfo->dir_in[i], NULL, session, + if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, req->r_request_started, -1, &req->r_caps_reservation) < 0) { pr_err("fill_inode badness on %p\n", in); @@ -1920,7 +1922,8 @@ out_put: * Verify that we have a lease on the given mask. If not, * do a getattr against an mds. 
*/ -int ceph_do_getattr(struct inode *inode, int mask, bool force) +int __ceph_do_getattr(struct inode *inode, struct page *locked_page, + int mask, bool force) { struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; @@ -1932,7 +1935,8 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force) return 0; } - dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode); + dout("do_getattr inode %p mask %s mode 0%o\n", + inode, ceph_cap_string(mask), inode->i_mode); if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) return 0; @@ -1943,7 +1947,19 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force) ihold(inode); req->r_num_caps = 1; req->r_args.getattr.mask = cpu_to_le32(mask); + req->r_locked_page = locked_page; err = ceph_mdsc_do_request(mdsc, NULL, req); + if (locked_page && err == 0) { + u64 inline_version = req->r_reply_info.targeti.inline_version; + if (inline_version == 0) { + /* the reply is supposed to contain inline data */ + err = -EINVAL; + } else if (inline_version == CEPH_INLINE_NONE) { + err = -ENODATA; + } else { + err = req->r_reply_info.targeti.inline_len; + } + } ceph_mdsc_put_request(req); dout("do_getattr result=%d\n", err); return err; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 01f5e4c..e2817d0 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -223,6 +223,7 @@ struct ceph_mds_request { int r_request_release_offset; struct ceph_msg *r_reply; struct ceph_mds_reply_info_parsed r_reply_info; + struct page *r_locked_page; int r_err; bool r_aborted; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index b0de916..6d56fae 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -744,7 +744,12 @@ extern void ceph_queue_vmtruncate(struct inode *inode); extern void ceph_queue_invalidate(struct inode *inode); extern void ceph_queue_writeback(struct inode *inode); -extern int ceph_do_getattr(struct inode *inode, int mask, bool force); +extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page, + int mask, bool force); +static inline int ceph_do_getattr(struct inode *inode, int mask, bool force) +{ + return __ceph_do_getattr(inode, NULL, mask, force); +} extern int ceph_permission(struct inode *inode, int mask); extern int ceph_setattr(struct dentry *dentry, struct iattr *attr); extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, -- cgit v1.1 From 3738daa68a5121ad7dd0318bca931e2a6afb0e8c Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 22:10:07 +0800 Subject: ceph: fetch inline data when getting Fcr cap refs we can't use getattr to fetch inline data after getting Fcr caps, because it can cause deadlock. The solution is try bringing inline data to page cache when not holding any cap, and hope the inline data page is still there after getting the Fcr caps. If the page is still there, pin it in page cache for later IO. 
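The deadlock-avoidance pattern described above is: never issue the blocking fetch while holding the resource; drop it, fetch, then retry the acquisition. A minimal user-space sketch of that pattern, assuming a mutex and a flag as stand-ins for the cap references and the cached inline page (illustration only, not part of the patch):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    static pthread_mutex_t caps = PTHREAD_MUTEX_INITIALIZER;  /* stands in for Fcr cap refs */
    static bool inline_page_cached;                           /* stands in for the pinned page */

    static void fetch_inline(void)                            /* stands in for the getattr request */
    {
        inline_page_cached = true;
    }

    static void read_with_caps(void)
    {
        for (;;) {
            pthread_mutex_lock(&caps);                /* "get cap refs" */
            if (inline_page_cached) {
                printf("inline page present, read while holding caps\n");
                pthread_mutex_unlock(&caps);
                return;
            }
            pthread_mutex_unlock(&caps);              /* drop refs before the blocking call */
            fetch_inline();                           /* blocking fetch with nothing held */
            /* retry: the page may have been dropped again in the meantime */
        }
    }

    int main(void)
    {
        read_with_caps();
        return 0;
    }

The retry loop is what makes "hope the page is still there" safe: if the page was dropped while nothing was held, the code simply fetches it again and retries.
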
Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 9 +++++++-- fs/ceph/caps.c | 60 +++++++++++++++++++++++++++++++++++++++++++++------------- fs/ceph/file.c | 12 +++++++++--- 3 files changed, 63 insertions(+), 18 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 4a3f55f..5d2b88e 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1207,6 +1207,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) struct inode *inode = file_inode(vma->vm_file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_file_info *fi = vma->vm_file->private_data; + struct page *pinned_page = NULL; loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT; int want, got, ret; @@ -1218,7 +1219,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) want = CEPH_CAP_FILE_CACHE; while (1) { got = 0; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, + &got, &pinned_page); if (ret == 0) break; if (ret != -ERESTARTSYS) { @@ -1233,6 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); + if (pinned_page) + page_cache_release(pinned_page); ceph_put_cap_refs(ci, got); return ret; @@ -1266,7 +1270,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) want = CEPH_CAP_FILE_BUFFER; while (1) { got = 0; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len, + &got, NULL); if (ret == 0) break; if (ret != -ERESTARTSYS) { diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 6372eb9..795afe3 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2057,15 +2057,17 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) * requested from the MDS. 
*/ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, - int *got, loff_t endoff, int *check_max, int *err) + loff_t endoff, int *got, struct page **pinned_page, + int *check_max, int *err) { struct inode *inode = &ci->vfs_inode; int ret = 0; - int have, implemented; + int have, implemented, _got = 0; int file_wanted; dout("get_cap_refs %p need %s want %s\n", inode, ceph_cap_string(need), ceph_cap_string(want)); +again: spin_lock(&ci->i_ceph_lock); /* make sure file is actually open */ @@ -2075,7 +2077,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ceph_cap_string(need), ceph_cap_string(file_wanted)); *err = -EBADF; ret = 1; - goto out; + goto out_unlock; } /* finish pending truncate */ @@ -2095,7 +2097,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, *check_max = 1; ret = 1; } - goto out; + goto out_unlock; } /* * If a sync write is in progress, we must wait, so that we @@ -2103,7 +2105,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, */ if (__ceph_have_pending_cap_snap(ci)) { dout("get_cap_refs %p cap_snap_pending\n", inode); - goto out; + goto out_unlock; } } @@ -2120,18 +2122,50 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, inode, ceph_cap_string(have), ceph_cap_string(not), ceph_cap_string(revoking)); if ((revoking & not) == 0) { - *got = need | (have & want); - __take_cap_refs(ci, *got); + _got = need | (have & want); + __take_cap_refs(ci, _got); ret = 1; } } else { dout("get_cap_refs %p have %s needed %s\n", inode, ceph_cap_string(have), ceph_cap_string(need)); } -out: +out_unlock: spin_unlock(&ci->i_ceph_lock); + + if (ci->i_inline_version != CEPH_INLINE_NONE && + (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && + i_size_read(inode) > 0) { + int ret1; + struct page *page = find_get_page(inode->i_mapping, 0); + if (page) { + if (PageUptodate(page)) { + *pinned_page = page; + goto out; + } + page_cache_release(page); + } + /* + * drop cap refs first because getattr while holding + * caps refs can cause deadlock. + */ + ceph_put_cap_refs(ci, _got); + _got = 0; + + /* getattr request will bring inline data into page cache */ + ret1 = __ceph_do_getattr(inode, NULL, + CEPH_STAT_CAP_INLINE_DATA, true); + if (ret1 >= 0) { + ret = 0; + goto again; + } + *err = ret1; + ret = 1; + } +out: dout("get_cap_refs %p ret %d got %s\n", inode, - ret, ceph_cap_string(*got)); + ret, ceph_cap_string(_got)); + *got = _got; return ret; } @@ -2168,8 +2202,8 @@ static void check_max_size(struct inode *inode, loff_t endoff) * due to a small max_size, make sure we check_max_size (and possibly * ask the mds) so we don't get hung up indefinitely. 
*/ -int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got, - loff_t endoff) +int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, + loff_t endoff, int *got, struct page **pinned_page) { int check_max, ret, err; @@ -2179,8 +2213,8 @@ retry: check_max = 0; err = 0; ret = wait_event_interruptible(ci->i_cap_wq, - try_get_cap_refs(ci, need, want, - got, endoff, + try_get_cap_refs(ci, need, want, endoff, + got, pinned_page, &check_max, &err)); if (err) ret = err; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index c03ac4c..861b995 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -805,6 +805,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) size_t len = iocb->ki_nbytes; struct inode *inode = file_inode(filp); struct ceph_inode_info *ci = ceph_inode(inode); + struct page *pinned_page = NULL; ssize_t ret; int want, got = 0; int checkeof = 0, read = 0; @@ -817,7 +818,7 @@ again: want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; else want = CEPH_CAP_FILE_CACHE; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page); if (ret < 0) return ret; @@ -840,6 +841,10 @@ again: } dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); + if (pinned_page) { + page_cache_release(pinned_page); + pinned_page = NULL; + } ceph_put_cap_refs(ci, got); if (checkeof && ret >= 0) { @@ -924,7 +929,8 @@ retry_snap: else want = CEPH_CAP_FILE_BUFFER; got = 0; - err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count); + err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count, + &got, NULL); if (err < 0) goto out; @@ -1225,7 +1231,7 @@ static long ceph_fallocate(struct file *file, int mode, else want = CEPH_CAP_FILE_BUFFER; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL); if (ret < 0) goto unlock; -- cgit v1.1 From 83701246aee8f83b4b42483051b439fbe96ed47d Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 22:36:18 +0800 Subject: ceph: sync read inline data we can't use getattr to fetch inline data while holding Fr cap, because it can cause deadlock. If we need to sync read inline data, drop cap refs first, then use getattr to fetch inline data. Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++----- fs/ceph/file.c | 63 ++++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 116 insertions(+), 13 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 5d2b88e..13413d7 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -192,17 +192,30 @@ static int readpage_nounlock(struct file *filp, struct page *page) struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; int err = 0; + u64 off = page_offset(page); u64 len = PAGE_CACHE_SIZE; - err = ceph_readpage_from_fscache(inode, page); + if (off >= i_size_read(inode)) { + zero_user_segment(page, err, PAGE_CACHE_SIZE); + SetPageUptodate(page); + return 0; + } + /* + * Uptodate inline data should have been added into page cache + * while getting Fcr caps. 
+ */ + if (ci->i_inline_version != CEPH_INLINE_NONE) + return -EINVAL; + + err = ceph_readpage_from_fscache(inode, page); if (err == 0) goto out; dout("readpage inode %p file %p page %p index %lu\n", inode, filp, page, page->index); err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, - (u64) page_offset(page), &len, + off, &len, ci->i_truncate_seq, ci->i_truncate_size, &page, 1, 0); if (err == -ENOENT) @@ -384,6 +397,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, int rc = 0; int max = 0; + if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE) + return -EINVAL; + rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list, &nr_pages); @@ -1219,8 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) want = CEPH_CAP_FILE_CACHE; while (1) { got = 0; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, - &got, &pinned_page); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, + -1, &got, &pinned_page); if (ret == 0) break; if (ret != -ERESTARTSYS) { @@ -1231,7 +1247,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) dout("filemap_fault %p %llu~%zd got cap refs on %s\n", inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got)); - ret = filemap_fault(vma, vmf); + if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) || + ci->i_inline_version == CEPH_INLINE_NONE) + ret = filemap_fault(vma, vmf); + else + ret = -EAGAIN; dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); @@ -1239,6 +1259,42 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) page_cache_release(pinned_page); ceph_put_cap_refs(ci, got); + if (ret != -EAGAIN) + return ret; + + /* read inline data */ + if (off >= PAGE_CACHE_SIZE) { + /* does not support inline data > PAGE_SIZE */ + ret = VM_FAULT_SIGBUS; + } else { + int ret1; + struct address_space *mapping = inode->i_mapping; + struct page *page = find_or_create_page(mapping, 0, + mapping_gfp_mask(mapping) & + ~__GFP_FS); + if (!page) { + ret = VM_FAULT_OOM; + goto out; + } + ret1 = __ceph_do_getattr(inode, page, + CEPH_STAT_CAP_INLINE_DATA, true); + if (ret1 < 0 || off >= i_size_read(inode)) { + unlock_page(page); + page_cache_release(page); + ret = VM_FAULT_SIGBUS; + goto out; + } + if (ret1 < PAGE_CACHE_SIZE) + zero_user_segment(page, ret1, PAGE_CACHE_SIZE); + else + flush_dcache_page(page); + SetPageUptodate(page); + vmf->page = page; + ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED; + } +out: + dout("filemap_fault %p %llu~%zd read inline data ret %d\n", + inode, off, (size_t)PAGE_CACHE_SIZE, ret); return ret; } diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 861b995..5b092bd 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file) return 0; } +enum { + CHECK_EOF = 1, + READ_INLINE = 2, +}; + /* * Read a range of bytes striped over one or more objects. Iterate over * objects we stripe over. (That's not atomic, but good enough for now.) @@ -412,7 +417,7 @@ more: ret = read; /* did we bounce off eof? 
*/ if (pos + left > inode->i_size) - *checkeof = 1; + *checkeof = CHECK_EOF; } dout("striped_read returns %d\n", ret); @@ -808,7 +813,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) struct page *pinned_page = NULL; ssize_t ret; int want, got = 0; - int checkeof = 0, read = 0; + int retry_op = 0, read = 0; again: dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", @@ -830,8 +835,12 @@ again: inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, ceph_cap_string(got)); - /* hmm, this isn't really async... */ - ret = ceph_sync_read(iocb, to, &checkeof); + if (ci->i_inline_version == CEPH_INLINE_NONE) { + /* hmm, this isn't really async... */ + ret = ceph_sync_read(iocb, to, &retry_op); + } else { + retry_op = READ_INLINE; + } } else { dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, @@ -846,12 +855,50 @@ again: pinned_page = NULL; } ceph_put_cap_refs(ci, got); + if (retry_op && ret >= 0) { + int statret; + struct page *page = NULL; + loff_t i_size; + if (retry_op == READ_INLINE) { + page = __page_cache_alloc(GFP_NOFS); + if (!page) + return -ENOMEM; + } + + statret = __ceph_do_getattr(inode, page, + CEPH_STAT_CAP_INLINE_DATA, !!page); + if (statret < 0) { + __free_page(page); + if (statret == -ENODATA) { + BUG_ON(retry_op != READ_INLINE); + goto again; + } + return statret; + } - if (checkeof && ret >= 0) { - int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); + i_size = i_size_read(inode); + if (retry_op == READ_INLINE) { + /* does not support inline data > PAGE_SIZE */ + if (i_size > PAGE_CACHE_SIZE) { + ret = -EIO; + } else if (iocb->ki_pos < i_size) { + loff_t end = min_t(loff_t, i_size, + iocb->ki_pos + len); + if (statret < end) + zero_user_segment(page, statret, end); + ret = copy_page_to_iter(page, + iocb->ki_pos & ~PAGE_MASK, + end - iocb->ki_pos, to); + iocb->ki_pos += ret; + } else { + ret = 0; + } + __free_pages(page, 0); + return ret; + } /* hit EOF or hole? */ - if (statret == 0 && iocb->ki_pos < inode->i_size && + if (retry_op == CHECK_EOF && iocb->ki_pos < i_size && ret < len) { dout("sync_read hit hole, ppos %lld < size %lld" ", reading more\n", iocb->ki_pos, @@ -859,7 +906,7 @@ again: read += ret; len -= ret; - checkeof = 0; + retry_op = 0; goto again; } } -- cgit v1.1 From 28127bdd2f843e996f24b51a70a0592c7ec5c763 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 22:38:29 +0800 Subject: ceph: convert inline data to normal data before data write Before any data write, convert inline data to normal data and set i_inline_version to CEPH_INLINE_NONE. The OSD request that saves inline data to object contains 3 operations (CMPXATTR, WRITE and SETXATTR). It compares a xattr named 'inline_version' to prevent old data overwrites newer data. 
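The CMPXATTR guard described above is a version-guarded (optimistic) update: the object is overwritten only if the inline_version recorded on it is older than ours, and a failed guard (-ECANCELED) just means newer data already won the race. A hedged stand-alone sketch of the same idea, with a C11 atomic standing in for the xattr (illustration only, not part of the patch):

    #include <stdatomic.h>
    #include <stdio.h>

    static _Atomic unsigned long stored_version = 1;   /* stands in for the inline_version xattr */

    /* write only if our version is newer; losing the race is treated as success,
     * mirroring how the patch maps -ECANCELED from the failed CMPXATTR guard to 0 */
    static int guarded_write(unsigned long my_version)
    {
        unsigned long cur = atomic_load(&stored_version);

        while (cur < my_version) {
            if (atomic_compare_exchange_weak(&stored_version, &cur, my_version)) {
                printf("object written, version now %lu\n", my_version);
                return 0;
            }
            /* cur was refreshed by the failed exchange; re-check the guard */
        }
        return 0;   /* an equal or newer version is already stored: nothing to do */
    }

    int main(void)
    {
        return guarded_write(5);
    }

Treating the lost race as success is the point of the guard: if another writer already stored newer data, there is nothing left for this writer to do.
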
Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 148 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- fs/ceph/file.c | 14 ++++++ fs/ceph/super.h | 2 +- 3 files changed, 161 insertions(+), 3 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 13413d7..70a3b44 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1313,6 +1313,19 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) size_t len; int want, got, ret; + if (ci->i_inline_version != CEPH_INLINE_NONE) { + struct page *locked_page = NULL; + if (off == 0) { + lock_page(page); + locked_page = page; + } + ret = ceph_uninline_data(vma->vm_file, locked_page); + if (locked_page) + unlock_page(locked_page); + if (ret < 0) + return VM_FAULT_SIGBUS; + } + if (off + PAGE_CACHE_SIZE <= size) len = PAGE_CACHE_SIZE; else @@ -1361,11 +1374,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = VM_FAULT_SIGBUS; } out: - if (ret != VM_FAULT_LOCKED) { + if (ret != VM_FAULT_LOCKED) unlock_page(page); - } else { + if (ret == VM_FAULT_LOCKED || + ci->i_inline_version != CEPH_INLINE_NONE) { int dirty; spin_lock(&ci->i_ceph_lock); + ci->i_inline_version = CEPH_INLINE_NONE; dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); spin_unlock(&ci->i_ceph_lock); if (dirty) @@ -1422,6 +1437,135 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, } } +int ceph_uninline_data(struct file *filp, struct page *locked_page) +{ + struct inode *inode = file_inode(filp); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_osd_request *req; + struct page *page = NULL; + u64 len, inline_version; + int err = 0; + bool from_pagecache = false; + + spin_lock(&ci->i_ceph_lock); + inline_version = ci->i_inline_version; + spin_unlock(&ci->i_ceph_lock); + + dout("uninline_data %p %llx.%llx inline_version %llu\n", + inode, ceph_vinop(inode), inline_version); + + if (inline_version == 1 || /* initial version, no data */ + inline_version == CEPH_INLINE_NONE) + goto out; + + if (locked_page) { + page = locked_page; + WARN_ON(!PageUptodate(page)); + } else if (ceph_caps_issued(ci) & + (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) { + page = find_get_page(inode->i_mapping, 0); + if (page) { + if (PageUptodate(page)) { + from_pagecache = true; + lock_page(page); + } else { + page_cache_release(page); + page = NULL; + } + } + } + + if (page) { + len = i_size_read(inode); + if (len > PAGE_CACHE_SIZE) + len = PAGE_CACHE_SIZE; + } else { + page = __page_cache_alloc(GFP_NOFS); + if (!page) { + err = -ENOMEM; + goto out; + } + err = __ceph_do_getattr(inode, page, + CEPH_STAT_CAP_INLINE_DATA, true); + if (err < 0) { + /* no inline data */ + if (err == -ENODATA) + err = 0; + goto out; + } + len = err; + } + + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + ceph_vino(inode), 0, &len, 0, 1, + CEPH_OSD_OP_CREATE, + CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, + ci->i_snap_realm->cached_context, + 0, 0, false); + if (IS_ERR(req)) { + err = PTR_ERR(req); + goto out; + } + + ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); + err = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!err) + err = ceph_osdc_wait_request(&fsc->client->osdc, req); + ceph_osdc_put_request(req); + if (err < 0) + goto out; + + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + ceph_vino(inode), 0, &len, 1, 3, + CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 
+				    ci->i_snap_realm->cached_context,
+				    ci->i_truncate_seq, ci->i_truncate_size,
+				    false);
+	if (IS_ERR(req)) {
+		err = PTR_ERR(req);
+		goto out;
+	}
+
+	osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
+
+	err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
+				    "inline_version", &inline_version,
+				    sizeof(inline_version),
+				    CEPH_OSD_CMPXATTR_OP_GT,
+				    CEPH_OSD_CMPXATTR_MODE_U64);
+	if (err)
+		goto out_put;
+
+	err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
+				    "inline_version", &inline_version,
+				    sizeof(inline_version), 0, 0);
+	if (err)
+		goto out_put;
+
+	ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
+	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+	if (!err)
+		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+out_put:
+	ceph_osdc_put_request(req);
+	if (err == -ECANCELED)
+		err = 0;
+out:
+	if (page && page != locked_page) {
+		if (from_pagecache) {
+			unlock_page(page);
+			page_cache_release(page);
+		} else
+			__free_pages(page, 0);
+	}
+
+	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
+	     inode, ceph_vinop(inode), inline_version, err);
+	return err;
+}
+
 static struct vm_operations_struct ceph_vmops = {
 	.fault = ceph_filemap_fault,
 	.page_mkwrite = ceph_page_mkwrite,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 5b092bd..9b5901f 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -963,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	if (err)
 		goto out;
 
+	if (ci->i_inline_version != CEPH_INLINE_NONE) {
+		err = ceph_uninline_data(file, NULL);
+		if (err < 0)
+			goto out;
+	}
+
 retry_snap:
 	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
 		err = -ENOSPC;
@@ -1024,6 +1030,7 @@ retry_snap:
 	if (written >= 0) {
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
+		ci->i_inline_version = CEPH_INLINE_NONE;
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
@@ -1269,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
 		goto unlock;
 	}
 
+	if (ci->i_inline_version != CEPH_INLINE_NONE) {
+		ret = ceph_uninline_data(file, NULL);
+		if (ret < 0)
+			goto unlock;
+	}
+
 	size = i_size_read(inode);
 	if (!(mode & FALLOC_FL_KEEP_SIZE))
 		endoff = offset + length;
@@ -1295,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,
 
 	if (!ret) {
 		spin_lock(&ci->i_ceph_lock);
+		ci->i_inline_version = CEPH_INLINE_NONE;
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 6d56fae..8197a3c 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -888,7 +888,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 extern int ceph_release(struct inode *inode, struct file *filp);
 extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
 				  char *data, size_t len);
-
+int ceph_uninline_data(struct file *filp, struct page *locked_page);
 /* dir.c */
 extern const struct file_operations ceph_dir_fops;
 extern const struct inode_operations ceph_dir_iops;
--
cgit v1.1

From e20d258d73a8d565b729b6fc0290e060daabd8b8 Mon Sep 17 00:00:00 2001
From: "Yan, Zheng"
Date: Fri, 14 Nov 2014 22:39:13 +0800
Subject: ceph: flush inline version

After converting inline data to normal data, the client needs to flush
the new i_inline_version (CEPH_INLINE_NONE) to the MDS. This commit makes
cap messages (sent to the MDS) contain inline_version and inline_data.

The client always converts inline data to normal data before writing,
so the inline data length field is always zero.

Signed-off-by: Yan, Zheng
---
 fs/ceph/caps.c  | 24 ++++++++++++++++++++----
 fs/ceph/snap.c  |  2 ++
 fs/ceph/super.h |  1 +
 3 files changed, 23 insertions(+), 4 deletions(-)

(limited to 'fs')

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 795afe3..b93c631 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -975,10 +975,12 @@ static int send_cap_msg(struct ceph_mds_session *session,
 		       kuid_t uid, kgid_t gid, umode_t mode,
 		       u64 xattr_version,
 		       struct ceph_buffer *xattrs_buf,
-		       u64 follows)
+		       u64 follows, bool inline_data)
 {
 	struct ceph_mds_caps *fc;
 	struct ceph_msg *msg;
+	void *p;
+	size_t extra_len;
 
 	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
 	     " seq %u/%u mseq %u follows %lld size %llu/%llu"
@@ -988,7 +990,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
 	     seq, issue_seq, mseq, follows, size, max_size,
 	     xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
+	/* flock buffer size + inline version + inline data size */
+	extra_len = 4 + 8 + 4;
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
+			   GFP_NOFS, false);
 	if (!msg)
 		return -ENOMEM;
 
@@ -1020,6 +1025,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
 	fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
 	fc->mode = cpu_to_le32(mode);
 
+	p = fc + 1;
+	/* flock buffer size */
+	ceph_encode_32(&p, 0);
+	/* inline version */
+	ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
+	/* inline data size */
+	ceph_encode_32(&p, 0);
+
 	fc->xattr_version = cpu_to_le64(xattr_version);
 	if (xattrs_buf) {
 		msg->middle = ceph_buffer_get(xattrs_buf);
@@ -1126,6 +1139,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	u64 flush_tid = 0;
 	int i;
 	int ret;
+	bool inline_data;
 
 	held = cap->issued | cap->implemented;
 	revoking = cap->implemented & ~cap->issued;
@@ -1209,13 +1223,15 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 		xattr_version = ci->i_xattrs.version;
 	}
 
+	inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
 	spin_unlock(&ci->i_ceph_lock);
 
 	ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
 		op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
 		size, max_size, &mtime, &atime, time_warp_seq,
 		uid, gid, mode, xattr_version, xattr_blob,
-		follows);
+		follows, inline_data);
 	if (ret < 0) {
 		dout("error sending cap msg, must requeue %p\n", inode);
 		delayed = 1;
@@ -1336,7 +1352,7 @@ retry:
 			     capsnap->time_warp_seq,
 			     capsnap->uid, capsnap->gid, capsnap->mode,
 			     capsnap->xattr_version, capsnap->xattr_blob,
-			     capsnap->follows);
+			     capsnap->follows, capsnap->inline_data);
 
 		next_follows = capsnap->follows + 1;
 		ceph_put_cap_snap(capsnap);
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index f64576e..ce35fbd 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -516,6 +516,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 			capsnap->xattr_version = 0;
 		}
 
+		capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
 		/* dirty page count moved from _head to this cap_snap;
 		   all subsequent writes page dirties occur _after_ this
 		   snapshot.
 		*/
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 8197a3c..e1aa32d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -161,6 +161,7 @@ struct ceph_cap_snap {
 	u64 time_warp_seq;
 	int writing;   /* a sync write is still in progress */
 	int dirty_pages;     /* dirty pages awaiting writeback */
+	bool inline_data;
 };
 
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
--
cgit v1.1

From 65a22662bfe1a84d72b9bbd9146b6782b9e53478 Mon Sep 17 00:00:00 2001
From: "Yan, Zheng"
Date: Mon, 17 Nov 2014 10:01:03 +0800
Subject: ceph: support inline data feature

Signed-off-by: Yan, Zheng
---
 fs/ceph/super.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

(limited to 'fs')

diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 989e86c..50f06cd 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -515,7 +515,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	struct ceph_fs_client *fsc;
 	const u64 supported_features =
 		CEPH_FEATURE_FLOCK |
-		CEPH_FEATURE_DIRLAYOUTHASH;
+		CEPH_FEATURE_DIRLAYOUTHASH |
+		CEPH_FEATURE_MDS_INLINE_DATA;
 	const u64 required_features = 0;
 	int page_count;
 	size_t size;
--
cgit v1.1

From 021b77bee210843bed1ea91b5cad58235ff9c8e5 Mon Sep 17 00:00:00 2001
From: Dan Carpenter
Date: Fri, 28 Nov 2014 11:33:34 +0300
Subject: ceph: do_sync is never initialized

Probably this code was syncing a lot more often than intended because
the do_sync variable wasn't set to zero.

Cc: stable@vger.kernel.org # v3.11+
Fixes: c62988ec0910 ('ceph: avoid meaningless calling ceph_caps_revoking if sync_mode == WB_SYNC_ALL.')
Signed-off-by: Dan Carpenter
Signed-off-by: Ilya Dryomov
---
 fs/ceph/addr.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'fs')

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 70a3b44..f5013d9 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -689,7 +689,7 @@ static int ceph_writepages_start(struct address_space *mapping,
 	int rc = 0;
 	unsigned wsize = 1 << inode->i_blkbits;
 	struct ceph_osd_request *req = NULL;
-	int do_sync;
+	int do_sync = 0;
 	u64 truncate_size, snap_size;
 	u32 truncate_seq;
--
cgit v1.1

From 275dd19ea4e84c34f985ba097f9cddb539f54a50 Mon Sep 17 00:00:00 2001
From: "Yan, Zheng"
Date: Wed, 10 Dec 2014 16:17:31 +0800
Subject: ceph: fix mksnap crash

The mksnap reply only contains 'target'; it does not contain 'dentry'.
So it is wrong to use req->r_reply_info.head->is_dentry to detect a
traceless reply.

Signed-off-by: Yan, Zheng
Reviewed-by: Sage Weil
---
 fs/ceph/dir.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

(limited to 'fs')

diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 6526199..fcfd0ab 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -812,7 +812,9 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
 		acls.pagelist = NULL;
 	}
 	err = ceph_mdsc_do_request(mdsc, dir, req);
-	if (!err && !req->r_reply_info.head->is_dentry)
+	if (!err &&
+	    !req->r_reply_info.head->is_target &&
+	    !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
 	ceph_mdsc_put_request(req);
 out:
--
cgit v1.1

From 0aeff37abada9f8c08d2b10481a43d3ae406c823 Mon Sep 17 00:00:00 2001
From: "Yan, Zheng"
Date: Wed, 17 Dec 2014 21:26:47 +0800
Subject: ceph: fix setting empty extended attribute

Make sure 'value' is not NULL; otherwise __ceph_setxattr will remove
the extended attribute.

Signed-off-by: Yan, Zheng
Reviewed-by: Sage Weil
---
 fs/ceph/xattr.c | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

(limited to 'fs')

diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 678b0d2..5a492ca 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -854,7 +854,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 	struct ceph_pagelist *pagelist = NULL;
 	int err;
 
-	if (value) {
+	if (size > 0) {
 		/* copy value into pagelist */
 		pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
 		if (!pagelist)
@@ -864,7 +864,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 		err = ceph_pagelist_append(pagelist, value, size);
 		if (err)
 			goto out;
-	} else {
+	} else if (!value) {
 		flags |= CEPH_XATTR_REMOVE;
 	}
 
@@ -1001,6 +1001,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
 	if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
 		return generic_setxattr(dentry, name, value, size, flags);
 
+	if (size == 0)
+		value = ""; /* empty EA, do not remove */
+
 	return __ceph_setxattr(dentry, name, value, size, flags);
 }
--
cgit v1.1
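The empty-xattr fix above is easiest to sanity-check from user space: after the patch, a setxattr(2) call with a zero-length value should leave an empty attribute in place, and only removexattr(2) should delete it. The sketch below is not part of the patch series; it is a minimal, hypothetical user-space check of that expected behaviour (the file path and the "user.test" attribute name are placeholders for a file on a CephFS mount).

```c
/*
 * Illustrative check of the empty-xattr semantics fixed above.
 * Not part of the kernel patches; path and attribute name are examples.
 *
 * Build: cc -o xattr_check xattr_check.c
 * Usage: ./xattr_check /mnt/cephfs/testfile
 */
#include <stdio.h>
#include <sys/xattr.h>

int main(int argc, char **argv)
{
	const char *path = argc > 1 ? argv[1] : "testfile";
	const char *name = "user.test";
	char buf[16];
	ssize_t len;

	/* A zero-length set should create/keep an *empty* attribute... */
	if (setxattr(path, name, "", 0, 0) < 0) {
		perror("setxattr");
		return 1;
	}

	/* ...so getxattr() should now succeed and report length 0. */
	len = getxattr(path, name, buf, sizeof(buf));
	if (len < 0)
		perror("getxattr (attribute unexpectedly missing)");
	else
		printf("attribute present, length %zd (expected 0)\n", len);

	/* Removal is a separate operation, not a zero-length set. */
	if (removexattr(path, name) < 0)
		perror("removexattr");
	return 0;
}
```

Before the fix, the zero-length setxattr() in this sketch would be expected to remove the attribute instead of storing an empty value, so the following getxattr() would fail with ENODATA.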