summaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/addr.c112
-rw-r--r--fs/ceph/caps.c323
-rw-r--r--fs/ceph/dir.c75
-rw-r--r--fs/ceph/export.c26
-rw-r--r--fs/ceph/file.c131
-rw-r--r--fs/ceph/inode.c23
-rw-r--r--fs/ceph/mds_client.c23
-rw-r--r--fs/ceph/mdsmap.c163
-rw-r--r--fs/ceph/snap.c2
-rw-r--r--fs/ceph/super.c12
-rw-r--r--fs/ceph/super.h15
11 files changed, 577 insertions, 328 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index ef3ebd7..9cd0c0e 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -315,7 +315,32 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
struct page **pages;
pgoff_t next_index;
int nr_pages = 0;
- int ret;
+ int got = 0;
+ int ret = 0;
+
+ if (!current->journal_info) {
+ /* caller of readpages does not hold buffer and read caps
+ * (fadvise, madvise and readahead cases) */
+ int want = CEPH_CAP_FILE_CACHE;
+ ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
+ if (ret < 0) {
+ dout("start_read %p, error getting cap\n", inode);
+ } else if (!(got & want)) {
+ dout("start_read %p, no cache cap\n", inode);
+ ret = 0;
+ }
+ if (ret <= 0) {
+ if (got)
+ ceph_put_cap_refs(ci, got);
+ while (!list_empty(page_list)) {
+ page = list_entry(page_list->prev,
+ struct page, lru);
+ list_del(&page->lru);
+ put_page(page);
+ }
+ return ret;
+ }
+ }
off = (u64) page_offset(page);
@@ -338,15 +363,18 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
CEPH_OSD_FLAG_READ, NULL,
ci->i_truncate_seq, ci->i_truncate_size,
false);
- if (IS_ERR(req))
- return PTR_ERR(req);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ goto out;
+ }
/* build page vector */
nr_pages = calc_pages_for(0, len);
pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
- ret = -ENOMEM;
- if (!pages)
- goto out;
+ if (!pages) {
+ ret = -ENOMEM;
+ goto out_put;
+ }
for (i = 0; i < nr_pages; ++i) {
page = list_entry(page_list->prev, struct page, lru);
BUG_ON(PageLocked(page));
@@ -378,6 +406,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
if (ret < 0)
goto out_pages;
ceph_osdc_put_request(req);
+
+ /* After adding locked pages to page cache, the inode holds cache cap.
+ * So we can drop our cap refs. */
+ if (got)
+ ceph_put_cap_refs(ci, got);
+
return nr_pages;
out_pages:
@@ -386,8 +420,11 @@ out_pages:
unlock_page(pages[i]);
}
ceph_put_page_vector(pages, nr_pages, false);
-out:
+out_put:
ceph_osdc_put_request(req);
+out:
+ if (got)
+ ceph_put_cap_refs(ci, got);
return ret;
}
@@ -424,7 +461,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
rc = start_read(inode, page_list, max);
if (rc < 0)
goto out;
- BUG_ON(rc == 0);
}
out:
ceph_fscache_readpages_cancel(inode, page_list);
@@ -438,7 +474,9 @@ out:
* only snap context we are allowed to write back.
*/
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
- loff_t *snap_size)
+ loff_t *snap_size,
+ u64 *truncate_size,
+ u32 *truncate_seq)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_snap_context *snapc = NULL;
@@ -452,6 +490,10 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
snapc = ceph_get_snap_context(capsnap->context);
if (snap_size)
*snap_size = capsnap->size;
+ if (truncate_size)
+ *truncate_size = capsnap->truncate_size;
+ if (truncate_seq)
+ *truncate_seq = capsnap->truncate_seq;
break;
}
}
@@ -459,6 +501,10 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
snapc = ceph_get_snap_context(ci->i_head_snapc);
dout(" head snapc %p has %d dirty pages\n",
snapc, ci->i_wrbuffer_ref_head);
+ if (truncate_size)
+ *truncate_size = capsnap->truncate_size;
+ if (truncate_seq)
+ *truncate_seq = capsnap->truncate_seq;
}
spin_unlock(&ci->i_ceph_lock);
return snapc;
@@ -501,7 +547,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
dout("writepage %p page %p not dirty?\n", inode, page);
goto out;
}
- oldest = get_oldest_context(inode, &snap_size);
+ oldest = get_oldest_context(inode, &snap_size,
+ &truncate_size, &truncate_seq);
if (snapc->seq > oldest->seq) {
dout("writepage %p page %p snapc %p not writeable - noop\n",
inode, page, snapc);
@@ -512,12 +559,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
}
ceph_put_snap_context(oldest);
- spin_lock(&ci->i_ceph_lock);
- truncate_seq = ci->i_truncate_seq;
- truncate_size = ci->i_truncate_size;
if (snap_size == -1)
snap_size = i_size_read(inode);
- spin_unlock(&ci->i_ceph_lock);
/* is this a partial page at end of file? */
if (page_off >= snap_size) {
@@ -764,7 +807,8 @@ retry:
/* find oldest snap context with dirty data */
ceph_put_snap_context(snapc);
snap_size = -1;
- snapc = get_oldest_context(inode, &snap_size);
+ snapc = get_oldest_context(inode, &snap_size,
+ &truncate_size, &truncate_seq);
if (!snapc) {
/* hmm, why does writepages get called when there
is no dirty data? */
@@ -774,11 +818,7 @@ retry:
dout(" oldest snapc is %p seq %lld (%d snaps)\n",
snapc, snapc->seq, snapc->num_snaps);
- spin_lock(&ci->i_ceph_lock);
- truncate_seq = ci->i_truncate_seq;
- truncate_size = ci->i_truncate_size;
i_size = i_size_read(inode);
- spin_unlock(&ci->i_ceph_lock);
if (last_snapc && snapc != last_snapc) {
/* if we switched to a newer snapc, restart our scan at the
@@ -1124,7 +1164,8 @@ out:
static int context_is_writeable_or_written(struct inode *inode,
struct ceph_snap_context *snapc)
{
- struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
+ struct ceph_snap_context *oldest = get_oldest_context(inode, NULL,
+ NULL, NULL);
int ret = !oldest || snapc->seq <= oldest->seq;
ceph_put_snap_context(oldest);
@@ -1169,7 +1210,7 @@ retry_locked:
* this page is already dirty in another (older) snap
* context! is it writeable now?
*/
- oldest = get_oldest_context(inode, NULL);
+ oldest = get_oldest_context(inode, NULL, NULL, NULL);
if (snapc->seq > oldest->seq) {
ceph_put_snap_context(oldest);
@@ -1276,25 +1317,27 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
struct page *page, void *fsdata)
{
struct inode *inode = file_inode(file);
- unsigned from = pos & (PAGE_SIZE - 1);
int check_cap = 0;
dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
inode, page, (int)pos, (int)copied, (int)len);
/* zero the stale part of the page if we did a short copy */
- if (copied < len)
- zero_user_segment(page, from+copied, len);
+ if (!PageUptodate(page)) {
+ if (copied < len) {
+ copied = 0;
+ goto out;
+ }
+ SetPageUptodate(page);
+ }
/* did file size increase? */
if (pos+copied > i_size_read(inode))
check_cap = ceph_inode_set_size(inode, pos+copied);
- if (!PageUptodate(page))
- SetPageUptodate(page);
-
set_page_dirty(page);
+out:
unlock_page(page);
put_page(page);
@@ -1371,9 +1414,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
- ci->i_inline_version == CEPH_INLINE_NONE)
+ ci->i_inline_version == CEPH_INLINE_NONE) {
+ current->journal_info = vma->vm_file;
ret = filemap_fault(vma, vmf);
- else
+ current->journal_info = NULL;
+ } else
ret = -EAGAIN;
dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
@@ -1905,6 +1950,15 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
struct ceph_string *pool_ns;
int ret, flags;
+ if (ci->i_vino.snap != CEPH_NOSNAP) {
+ /*
+ * Pool permission check needs to write to the first object.
+ * But for snapshot, head of the first object may have alread
+ * been deleted. Skip check to avoid creating orphan object.
+ */
+ return 0;
+ }
+
if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
NOPOOLPERM))
return 0;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 16e6ded..baea866 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -987,96 +987,127 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
__cap_delay_cancel(mdsc, ci);
}
+struct cap_msg_args {
+ struct ceph_mds_session *session;
+ u64 ino, cid, follows;
+ u64 flush_tid, oldest_flush_tid, size, max_size;
+ u64 xattr_version;
+ struct ceph_buffer *xattr_buf;
+ struct timespec atime, mtime, ctime;
+ int op, caps, wanted, dirty;
+ u32 seq, issue_seq, mseq, time_warp_seq;
+ u32 flags;
+ kuid_t uid;
+ kgid_t gid;
+ umode_t mode;
+ bool inline_data;
+};
+
/*
* Build and send a cap message to the given MDS.
*
* Caller should be holding s_mutex.
*/
-static int send_cap_msg(struct ceph_mds_session *session,
- u64 ino, u64 cid, int op,
- int caps, int wanted, int dirty,
- u32 seq, u64 flush_tid, u64 oldest_flush_tid,
- u32 issue_seq, u32 mseq, u64 size, u64 max_size,
- struct timespec *mtime, struct timespec *atime,
- struct timespec *ctime, u32 time_warp_seq,
- kuid_t uid, kgid_t gid, umode_t mode,
- u64 xattr_version,
- struct ceph_buffer *xattrs_buf,
- u64 follows, bool inline_data)
+static int send_cap_msg(struct cap_msg_args *arg)
{
struct ceph_mds_caps *fc;
struct ceph_msg *msg;
void *p;
size_t extra_len;
+ struct timespec zerotime = {0};
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
" seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
- " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op),
- cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
- ceph_cap_string(dirty),
- seq, issue_seq, flush_tid, oldest_flush_tid,
- mseq, follows, size, max_size,
- xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
+ " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op),
+ arg->cid, arg->ino, ceph_cap_string(arg->caps),
+ ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty),
+ arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid,
+ arg->mseq, arg->follows, arg->size, arg->max_size,
+ arg->xattr_version,
+ arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);
/* flock buffer size + inline version + inline data size +
* osd_epoch_barrier + oldest_flush_tid */
- extra_len = 4 + 8 + 4 + 4 + 8;
+ extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4;
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
GFP_NOFS, false);
if (!msg)
return -ENOMEM;
- msg->hdr.version = cpu_to_le16(6);
- msg->hdr.tid = cpu_to_le64(flush_tid);
+ msg->hdr.version = cpu_to_le16(10);
+ msg->hdr.tid = cpu_to_le64(arg->flush_tid);
fc = msg->front.iov_base;
memset(fc, 0, sizeof(*fc));
- fc->cap_id = cpu_to_le64(cid);
- fc->op = cpu_to_le32(op);
- fc->seq = cpu_to_le32(seq);
- fc->issue_seq = cpu_to_le32(issue_seq);
- fc->migrate_seq = cpu_to_le32(mseq);
- fc->caps = cpu_to_le32(caps);
- fc->wanted = cpu_to_le32(wanted);
- fc->dirty = cpu_to_le32(dirty);
- fc->ino = cpu_to_le64(ino);
- fc->snap_follows = cpu_to_le64(follows);
-
- fc->size = cpu_to_le64(size);
- fc->max_size = cpu_to_le64(max_size);
- if (mtime)
- ceph_encode_timespec(&fc->mtime, mtime);
- if (atime)
- ceph_encode_timespec(&fc->atime, atime);
- if (ctime)
- ceph_encode_timespec(&fc->ctime, ctime);
- fc->time_warp_seq = cpu_to_le32(time_warp_seq);
-
- fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid));
- fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
- fc->mode = cpu_to_le32(mode);
+ fc->cap_id = cpu_to_le64(arg->cid);
+ fc->op = cpu_to_le32(arg->op);
+ fc->seq = cpu_to_le32(arg->seq);
+ fc->issue_seq = cpu_to_le32(arg->issue_seq);
+ fc->migrate_seq = cpu_to_le32(arg->mseq);
+ fc->caps = cpu_to_le32(arg->caps);
+ fc->wanted = cpu_to_le32(arg->wanted);
+ fc->dirty = cpu_to_le32(arg->dirty);
+ fc->ino = cpu_to_le64(arg->ino);
+ fc->snap_follows = cpu_to_le64(arg->follows);
+
+ fc->size = cpu_to_le64(arg->size);
+ fc->max_size = cpu_to_le64(arg->max_size);
+ ceph_encode_timespec(&fc->mtime, &arg->mtime);
+ ceph_encode_timespec(&fc->atime, &arg->atime);
+ ceph_encode_timespec(&fc->ctime, &arg->ctime);
+ fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);
+
+ fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
+ fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
+ fc->mode = cpu_to_le32(arg->mode);
+
+ fc->xattr_version = cpu_to_le64(arg->xattr_version);
+ if (arg->xattr_buf) {
+ msg->middle = ceph_buffer_get(arg->xattr_buf);
+ fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
+ msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
+ }
p = fc + 1;
- /* flock buffer size */
+ /* flock buffer size (version 2) */
ceph_encode_32(&p, 0);
- /* inline version */
- ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
+ /* inline version (version 4) */
+ ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
/* inline data size */
ceph_encode_32(&p, 0);
- /* osd_epoch_barrier */
+ /* osd_epoch_barrier (version 5) */
ceph_encode_32(&p, 0);
- /* oldest_flush_tid */
- ceph_encode_64(&p, oldest_flush_tid);
+ /* oldest_flush_tid (version 6) */
+ ceph_encode_64(&p, arg->oldest_flush_tid);
- fc->xattr_version = cpu_to_le64(xattr_version);
- if (xattrs_buf) {
- msg->middle = ceph_buffer_get(xattrs_buf);
- fc->xattr_len = cpu_to_le32(xattrs_buf->vec.iov_len);
- msg->hdr.middle_len = cpu_to_le32(xattrs_buf->vec.iov_len);
- }
+ /*
+ * caller_uid/caller_gid (version 7)
+ *
+ * Currently, we don't properly track which caller dirtied the caps
+ * last, and force a flush of them when there is a conflict. For now,
+ * just set this to 0:0, to emulate how the MDS has worked up to now.
+ */
+ ceph_encode_32(&p, 0);
+ ceph_encode_32(&p, 0);
+
+ /* pool namespace (version 8) (mds always ignores this) */
+ ceph_encode_32(&p, 0);
- ceph_con_send(&session->s_con, msg);
+ /*
+ * btime and change_attr (version 9)
+ *
+ * We just zero these out for now, as the MDS ignores them unless
+ * the requisite feature flags are set (which we don't do yet).
+ */
+ ceph_encode_timespec(p, &zerotime);
+ p += sizeof(struct ceph_timespec);
+ ceph_encode_64(&p, 0);
+
+ /* Advisory flags (version 10) */
+ ceph_encode_32(&p, arg->flags);
+
+ ceph_con_send(&arg->session->s_con, msg);
return 0;
}
@@ -1115,27 +1146,17 @@ void ceph_queue_caps_release(struct inode *inode)
* caller should hold snap_rwsem (read), s_mutex.
*/
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
- int op, int used, int want, int retain, int flushing,
- u64 flush_tid, u64 oldest_flush_tid)
+ int op, bool sync, int used, int want, int retain,
+ int flushing, u64 flush_tid, u64 oldest_flush_tid)
__releases(cap->ci->i_ceph_lock)
{
struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode;
- u64 cap_id = cap->cap_id;
- int held, revoking, dropping, keep;
- u64 follows, size, max_size;
- u32 seq, issue_seq, mseq, time_warp_seq;
- struct timespec mtime, atime, ctime;
+ struct cap_msg_args arg;
+ int held, revoking, dropping;
int wake = 0;
- umode_t mode;
- kuid_t uid;
- kgid_t gid;
- struct ceph_mds_session *session;
- u64 xattr_version = 0;
- struct ceph_buffer *xattr_blob = NULL;
int delayed = 0;
int ret;
- bool inline_data;
held = cap->issued | cap->implemented;
revoking = cap->implemented & ~cap->issued;
@@ -1148,7 +1169,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ceph_cap_string(revoking));
BUG_ON((retain & CEPH_CAP_PIN) == 0);
- session = cap->session;
+ arg.session = cap->session;
/* don't release wanted unless we've waited a bit. */
if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
@@ -1177,40 +1198,51 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
cap->implemented &= cap->issued | used;
cap->mds_wanted = want;
- follows = flushing ? ci->i_head_snapc->seq : 0;
-
- keep = cap->implemented;
- seq = cap->seq;
- issue_seq = cap->issue_seq;
- mseq = cap->mseq;
- size = inode->i_size;
- ci->i_reported_size = size;
- max_size = ci->i_wanted_max_size;
- ci->i_requested_max_size = max_size;
- mtime = inode->i_mtime;
- atime = inode->i_atime;
- ctime = inode->i_ctime;
- time_warp_seq = ci->i_time_warp_seq;
- uid = inode->i_uid;
- gid = inode->i_gid;
- mode = inode->i_mode;
+ arg.ino = ceph_vino(inode).ino;
+ arg.cid = cap->cap_id;
+ arg.follows = flushing ? ci->i_head_snapc->seq : 0;
+ arg.flush_tid = flush_tid;
+ arg.oldest_flush_tid = oldest_flush_tid;
+
+ arg.size = inode->i_size;
+ ci->i_reported_size = arg.size;
+ arg.max_size = ci->i_wanted_max_size;
+ ci->i_requested_max_size = arg.max_size;
if (flushing & CEPH_CAP_XATTR_EXCL) {
__ceph_build_xattrs_blob(ci);
- xattr_blob = ci->i_xattrs.blob;
- xattr_version = ci->i_xattrs.version;
+ arg.xattr_version = ci->i_xattrs.version;
+ arg.xattr_buf = ci->i_xattrs.blob;
+ } else {
+ arg.xattr_buf = NULL;
}
- inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+ arg.mtime = inode->i_mtime;
+ arg.atime = inode->i_atime;
+ arg.ctime = inode->i_ctime;
+
+ arg.op = op;
+ arg.caps = cap->implemented;
+ arg.wanted = want;
+ arg.dirty = flushing;
+
+ arg.seq = cap->seq;
+ arg.issue_seq = cap->issue_seq;
+ arg.mseq = cap->mseq;
+ arg.time_warp_seq = ci->i_time_warp_seq;
+
+ arg.uid = inode->i_uid;
+ arg.gid = inode->i_gid;
+ arg.mode = inode->i_mode;
+
+ arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+ arg.flags = 0;
+ if (sync)
+ arg.flags |= CEPH_CLIENT_CAPS_SYNC;
spin_unlock(&ci->i_ceph_lock);
- ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
- op, keep, want, flushing, seq,
- flush_tid, oldest_flush_tid, issue_seq, mseq,
- size, max_size, &mtime, &atime, &ctime, time_warp_seq,
- uid, gid, mode, xattr_version, xattr_blob,
- follows, inline_data);
+ ret = send_cap_msg(&arg);
if (ret < 0) {
dout("error sending cap msg, must requeue %p\n", inode);
delayed = 1;
@@ -1227,15 +1259,42 @@ static inline int __send_flush_snap(struct inode *inode,
struct ceph_cap_snap *capsnap,
u32 mseq, u64 oldest_flush_tid)
{
- return send_cap_msg(session, ceph_vino(inode).ino, 0,
- CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
- capsnap->dirty, 0, capsnap->cap_flush.tid,
- oldest_flush_tid, 0, mseq, capsnap->size, 0,
- &capsnap->mtime, &capsnap->atime,
- &capsnap->ctime, capsnap->time_warp_seq,
- capsnap->uid, capsnap->gid, capsnap->mode,
- capsnap->xattr_version, capsnap->xattr_blob,
- capsnap->follows, capsnap->inline_data);
+ struct cap_msg_args arg;
+
+ arg.session = session;
+ arg.ino = ceph_vino(inode).ino;
+ arg.cid = 0;
+ arg.follows = capsnap->follows;
+ arg.flush_tid = capsnap->cap_flush.tid;
+ arg.oldest_flush_tid = oldest_flush_tid;
+
+ arg.size = capsnap->size;
+ arg.max_size = 0;
+ arg.xattr_version = capsnap->xattr_version;
+ arg.xattr_buf = capsnap->xattr_blob;
+
+ arg.atime = capsnap->atime;
+ arg.mtime = capsnap->mtime;
+ arg.ctime = capsnap->ctime;
+
+ arg.op = CEPH_CAP_OP_FLUSHSNAP;
+ arg.caps = capsnap->issued;
+ arg.wanted = 0;
+ arg.dirty = capsnap->dirty;
+
+ arg.seq = 0;
+ arg.issue_seq = 0;
+ arg.mseq = mseq;
+ arg.time_warp_seq = capsnap->time_warp_seq;
+
+ arg.uid = capsnap->uid;
+ arg.gid = capsnap->gid;
+ arg.mode = capsnap->mode;
+
+ arg.inline_data = capsnap->inline_data;
+ arg.flags = 0;
+
+ return send_cap_msg(&arg);
}
/*
@@ -1858,9 +1917,9 @@ ack:
sent++;
/* __send_cap drops i_ceph_lock */
- delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
- want, retain, flushing,
- flush_tid, oldest_flush_tid);
+ delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false,
+ cap_used, want, retain, flushing,
+ flush_tid, oldest_flush_tid);
goto retry; /* retake i_ceph_lock and restart our cap scan. */
}
@@ -1924,9 +1983,9 @@ retry:
&flush_tid, &oldest_flush_tid);
/* __send_cap drops i_ceph_lock */
- delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
- (cap->issued | cap->implemented),
- flushing, flush_tid, oldest_flush_tid);
+ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true,
+ used, want, (cap->issued | cap->implemented),
+ flushing, flush_tid, oldest_flush_tid);
if (delayed) {
spin_lock(&ci->i_ceph_lock);
@@ -1996,7 +2055,7 @@ static int unsafe_request_wait(struct inode *inode)
}
spin_unlock(&ci->i_unsafe_lock);
- dout("unsafe_requeset_wait %p wait on tid %llu %llu\n",
+ dout("unsafe_request_wait %p wait on tid %llu %llu\n",
inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
if (req1) {
ret = !wait_for_completion_timeout(&req1->r_safe_completion,
@@ -2119,7 +2178,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
inode, cap, cf->tid, ceph_cap_string(cf->caps));
ci->i_ceph_flags |= CEPH_I_NODELAY;
ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
- __ceph_caps_used(ci),
+ false, __ceph_caps_used(ci),
__ceph_caps_wanted(ci),
cap->issued | cap->implemented,
cf->caps, cf->tid, oldest_flush_tid);
@@ -2479,6 +2538,27 @@ static void check_max_size(struct inode *inode, loff_t endoff)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
}
+int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
+{
+ int ret, err = 0;
+
+ BUG_ON(need & ~CEPH_CAP_FILE_RD);
+ BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
+ ret = ceph_pool_perm_check(ci, need);
+ if (ret < 0)
+ return ret;
+
+ ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
+ if (ret) {
+ if (err == -EAGAIN) {
+ ret = 0;
+ } else if (err < 0) {
+ ret = err;
+ }
+ }
+ return ret;
+}
+
/*
* Wait for caps, and take cap references. If we can't get a WR cap
* due to a small max_size, make sure we check_max_size (and possibly
@@ -2507,9 +2587,15 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
if (err < 0)
ret = err;
} else {
- ret = wait_event_interruptible(ci->i_cap_wq,
- try_get_cap_refs(ci, need, want, endoff,
- true, &_got, &err));
+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
+ add_wait_queue(&ci->i_cap_wq, &wait);
+
+ while (!try_get_cap_refs(ci, need, want, endoff,
+ true, &_got, &err))
+ wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
+
+ remove_wait_queue(&ci->i_cap_wq, &wait);
+
if (err == -EAGAIN)
continue;
if (err < 0)
@@ -3570,6 +3656,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
cap->cap_id = le64_to_cpu(h->cap_id);
cap->mseq = mseq;
cap->seq = seq;
+ cap->issue_seq = seq;
spin_lock(&session->s_cap_lock);
list_add_tail(&cap->session_caps,
&session->s_cap_releases);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 78180d1..d7a9369 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -32,40 +32,19 @@ const struct dentry_operations ceph_dentry_ops;
/*
* Initialize ceph dentry state.
*/
-int ceph_init_dentry(struct dentry *dentry)
+static int ceph_d_init(struct dentry *dentry)
{
struct ceph_dentry_info *di;
- if (dentry->d_fsdata)
- return 0;
-
di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
if (!di)
return -ENOMEM; /* oh well */
- spin_lock(&dentry->d_lock);
- if (dentry->d_fsdata) {
- /* lost a race */
- kmem_cache_free(ceph_dentry_cachep, di);
- goto out_unlock;
- }
-
- if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP)
- d_set_d_op(dentry, &ceph_dentry_ops);
- else if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_SNAPDIR)
- d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
- else
- d_set_d_op(dentry, &ceph_snap_dentry_ops);
-
di->dentry = dentry;
di->lease_session = NULL;
di->time = jiffies;
- /* avoid reordering d_fsdata setup so that the check above is safe */
- smp_mb();
dentry->d_fsdata = di;
ceph_dentry_lru_add(dentry);
-out_unlock:
- spin_unlock(&dentry->d_lock);
return 0;
}
@@ -737,10 +716,6 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
if (dentry->d_name.len > NAME_MAX)
return ERR_PTR(-ENAMETOOLONG);
- err = ceph_init_dentry(dentry);
- if (err < 0)
- return ERR_PTR(err);
-
/* can we conclude ENOENT locally? */
if (d_really_is_negative(dentry)) {
struct ceph_inode_info *ci = ceph_inode(dir);
@@ -1261,26 +1236,30 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
return -ECHILD;
op = ceph_snap(dir) == CEPH_SNAPDIR ?
- CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
+ CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_GETATTR;
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
if (!IS_ERR(req)) {
req->r_dentry = dget(dentry);
- req->r_num_caps = 2;
+ req->r_num_caps = op == CEPH_MDS_OP_GETATTR ? 1 : 2;
mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
if (ceph_security_xattr_wanted(dir))
mask |= CEPH_CAP_XATTR_SHARED;
req->r_args.getattr.mask = mask;
- req->r_locked_dir = dir;
err = ceph_mdsc_do_request(mdsc, NULL, req);
- if (err == 0 || err == -ENOENT) {
- if (dentry == req->r_dentry) {
- valid = !d_unhashed(dentry);
- } else {
- d_invalidate(req->r_dentry);
- err = -EAGAIN;
- }
+ switch (err) {
+ case 0:
+ if (d_really_is_positive(dentry) &&
+ d_inode(dentry) == req->r_target_inode)
+ valid = 1;
+ break;
+ case -ENOENT:
+ if (d_really_is_negative(dentry))
+ valid = 1;
+ /* Fallthrough */
+ default:
+ break;
}
ceph_mdsc_put_request(req);
dout("d_revalidate %p lookup result=%d\n",
@@ -1319,16 +1298,6 @@ static void ceph_d_release(struct dentry *dentry)
kmem_cache_free(ceph_dentry_cachep, di);
}
-static int ceph_snapdir_d_revalidate(struct dentry *dentry,
- unsigned int flags)
-{
- /*
- * Eventually, we'll want to revalidate snapped metadata
- * too... probably...
- */
- return 1;
-}
-
/*
* When the VFS prunes a dentry from the cache, we need to clear the
* complete flag on the parent directory.
@@ -1347,6 +1316,9 @@ static void ceph_d_prune(struct dentry *dentry)
if (d_unhashed(dentry))
return;
+ if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_SNAPDIR)
+ return;
+
/*
* we hold d_lock, so d_parent is stable, and d_fsdata is never
* cleared until d_release
@@ -1517,14 +1489,5 @@ const struct dentry_operations ceph_dentry_ops = {
.d_revalidate = ceph_d_revalidate,
.d_release = ceph_d_release,
.d_prune = ceph_d_prune,
-};
-
-const struct dentry_operations ceph_snapdir_dentry_ops = {
- .d_revalidate = ceph_snapdir_d_revalidate,
- .d_release = ceph_d_release,
-};
-
-const struct dentry_operations ceph_snap_dentry_ops = {
- .d_release = ceph_d_release,
- .d_prune = ceph_d_prune,
+ .d_init = ceph_d_init,
};
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 1780218..180bbef 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -62,7 +62,6 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
{
struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc;
struct inode *inode;
- struct dentry *dentry;
struct ceph_vino vino;
int err;
@@ -94,16 +93,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
return ERR_PTR(-ESTALE);
}
- dentry = d_obtain_alias(inode);
- if (IS_ERR(dentry))
- return dentry;
- err = ceph_init_dentry(dentry);
- if (err < 0) {
- dput(dentry);
- return ERR_PTR(err);
- }
- dout("__fh_to_dentry %llx %p dentry %p\n", ino, inode, dentry);
- return dentry;
+ return d_obtain_alias(inode);
}
/*
@@ -131,7 +121,6 @@ static struct dentry *__get_parent(struct super_block *sb,
struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc;
struct ceph_mds_request *req;
struct inode *inode;
- struct dentry *dentry;
int mask;
int err;
@@ -164,18 +153,7 @@ static struct dentry *__get_parent(struct super_block *sb,
if (!inode)
return ERR_PTR(-ENOENT);
- dentry = d_obtain_alias(inode);
- if (IS_ERR(dentry))
- return dentry;
- err = ceph_init_dentry(dentry);
- if (err < 0) {
- dput(dentry);
- return ERR_PTR(err);
- }
- dout("__get_parent ino %llx parent %p ino %llx.%llx\n",
- child ? ceph_ino(d_inode(child)) : ino,
- dentry, ceph_vinop(inode));
- return dentry;
+ return d_obtain_alias(inode);
}
static struct dentry *ceph_get_parent(struct dentry *child)
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index f995e35..045d30d 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -351,10 +351,6 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
if (dentry->d_name.len > NAME_MAX)
return -ENAMETOOLONG;
- err = ceph_init_dentry(dentry);
- if (err < 0)
- return err;
-
if (flags & O_CREAT) {
err = ceph_pre_init_acls(dir, &mode, &acls);
if (err < 0)
@@ -458,71 +454,60 @@ enum {
* only return a short read to the caller if we hit EOF.
*/
static int striped_read(struct inode *inode,
- u64 off, u64 len,
+ u64 pos, u64 len,
struct page **pages, int num_pages,
- int *checkeof)
+ int page_align, int *checkeof)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode);
- u64 pos, this_len, left;
+ u64 this_len;
loff_t i_size;
- int page_align, pages_left;
- int read, ret;
- struct page **page_pos;
+ int page_idx;
+ int ret, read = 0;
bool hit_stripe, was_short;
/*
* we may need to do multiple reads. not atomic, unfortunately.
*/
- pos = off;
- left = len;
- page_pos = pages;
- pages_left = num_pages;
- read = 0;
-
more:
- page_align = pos & ~PAGE_MASK;
- this_len = left;
+ this_len = len;
+ page_idx = (page_align + read) >> PAGE_SHIFT;
ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
&ci->i_layout, pos, &this_len,
- ci->i_truncate_seq,
- ci->i_truncate_size,
- page_pos, pages_left, page_align);
+ ci->i_truncate_seq, ci->i_truncate_size,
+ pages + page_idx, num_pages - page_idx,
+ ((page_align + read) & ~PAGE_MASK));
if (ret == -ENOENT)
ret = 0;
- hit_stripe = this_len < left;
+ hit_stripe = this_len < len;
was_short = ret >= 0 && ret < this_len;
- dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
+ dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
i_size = i_size_read(inode);
if (ret >= 0) {
- int didpages;
if (was_short && (pos + ret < i_size)) {
int zlen = min(this_len - ret, i_size - pos - ret);
- int zoff = (off & ~PAGE_MASK) + read + ret;
+ int zoff = page_align + read + ret;
dout(" zero gap %llu to %llu\n",
- pos + ret, pos + ret + zlen);
+ pos + ret, pos + ret + zlen);
ceph_zero_page_vector_range(zoff, zlen, pages);
ret += zlen;
}
- didpages = (page_align + ret) >> PAGE_SHIFT;
+ read += ret;
pos += ret;
- read = pos - off;
- left -= ret;
- page_pos += didpages;
- pages_left -= didpages;
+ len -= ret;
/* hit stripe and need continue*/
- if (left && hit_stripe && pos < i_size)
+ if (len && hit_stripe && pos < i_size)
goto more;
}
if (read > 0) {
ret = read;
/* did we bounce off eof? */
- if (pos + left > i_size)
+ if (pos + len > i_size)
*checkeof = CHECK_EOF;
}
@@ -536,15 +521,16 @@ more:
*
* If the read spans object boundary, just do multiple reads.
*/
-static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
- int *checkeof)
+static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
+ int *checkeof)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
struct page **pages;
u64 off = iocb->ki_pos;
- int num_pages, ret;
- size_t len = iov_iter_count(i);
+ int num_pages;
+ ssize_t ret;
+ size_t len = iov_iter_count(to);
dout("sync_read on file %p %llu~%u %s\n", file, off,
(unsigned)len,
@@ -563,35 +549,56 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
if (ret < 0)
return ret;
- num_pages = calc_pages_for(off, len);
- pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
- ret = striped_read(inode, off, len, pages,
- num_pages, checkeof);
- if (ret > 0) {
- int l, k = 0;
- size_t left = ret;
-
- while (left) {
- size_t page_off = off & ~PAGE_MASK;
- size_t copy = min_t(size_t, left,
- PAGE_SIZE - page_off);
- l = copy_page_to_iter(pages[k++], page_off, copy, i);
- off += l;
- left -= l;
- if (l < copy)
- break;
+ if (unlikely(to->type & ITER_PIPE)) {
+ size_t page_off;
+ ret = iov_iter_get_pages_alloc(to, &pages, len,
+ &page_off);
+ if (ret <= 0)
+ return -ENOMEM;
+ num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+
+ ret = striped_read(inode, off, ret, pages, num_pages,
+ page_off, checkeof);
+ if (ret > 0) {
+ iov_iter_advance(to, ret);
+ off += ret;
+ } else {
+ iov_iter_advance(to, 0);
}
+ ceph_put_page_vector(pages, num_pages, false);
+ } else {
+ num_pages = calc_pages_for(off, len);
+ pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ ret = striped_read(inode, off, len, pages, num_pages,
+ (off & ~PAGE_MASK), checkeof);
+ if (ret > 0) {
+ int l, k = 0;
+ size_t left = ret;
+
+ while (left) {
+ size_t page_off = off & ~PAGE_MASK;
+ size_t copy = min_t(size_t, left,
+ PAGE_SIZE - page_off);
+ l = copy_page_to_iter(pages[k++], page_off,
+ copy, to);
+ off += l;
+ left -= l;
+ if (l < copy)
+ break;
+ }
+ }
+ ceph_release_page_vector(pages, num_pages);
}
- ceph_release_page_vector(pages, num_pages);
if (off > iocb->ki_pos) {
ret = off - iocb->ki_pos;
iocb->ki_pos = off;
}
- dout("sync_read result %d\n", ret);
+ dout("sync_read result %zd\n", ret);
return ret;
}
@@ -853,7 +860,7 @@ void ceph_sync_write_wait(struct inode *inode)
dout("sync_write_wait on tid %llu (until %llu)\n",
req->r_tid, last_tid);
- wait_for_completion(&req->r_safe_completion);
+ wait_for_completion(&req->r_done_completion);
ceph_osdc_put_request(req);
spin_lock(&ci->i_unsafe_lock);
@@ -906,7 +913,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
pos >> PAGE_SHIFT,
(pos + count) >> PAGE_SHIFT);
if (ret2 < 0)
- dout("invalidate_inode_pages2_range returned %d\n", ret);
+ dout("invalidate_inode_pages2_range returned %d\n", ret2);
flags = CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ONDISK |
@@ -1249,8 +1256,9 @@ again:
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
ceph_cap_string(got));
-
+ current->journal_info = filp;
ret = generic_file_read_iter(iocb, to);
+ current->journal_info = NULL;
}
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
@@ -1770,6 +1778,7 @@ const struct file_operations ceph_file_fops = {
.fsync = ceph_fsync,
.lock = ceph_lock,
.flock = ceph_flock,
+ .splice_read = generic_file_splice_read,
.splice_write = iter_file_splice_write,
.unlocked_ioctl = ceph_ioctl,
.compat_ioctl = ceph_ioctl,
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ef4d046..398e532 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1023,16 +1023,17 @@ static void update_dentry_lease(struct dentry *dentry,
long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
struct inode *dir;
- /* only track leases on regular dentries */
- if (dentry->d_op != &ceph_dentry_ops)
- return;
-
spin_lock(&dentry->d_lock);
dout("update_dentry_lease %p duration %lu ms ttl %lu\n",
dentry, duration, ttl);
/* make lease_rdcache_gen match directory */
dir = d_inode(dentry->d_parent);
+
+ /* only track leases on regular dentries */
+ if (ceph_snap(dir) != CEPH_NOSNAP)
+ goto out_unlock;
+
di->lease_shared_gen = ceph_inode(dir)->i_shared_gen;
if (duration == 0)
@@ -1202,12 +1203,7 @@ retry_lookup:
err = -ENOMEM;
goto done;
}
- err = ceph_init_dentry(dn);
- if (err < 0) {
- dput(dn);
- dput(parent);
- goto done;
- }
+ err = 0;
} else if (d_really_is_positive(dn) &&
(ceph_ino(d_inode(dn)) != vino.ino ||
ceph_snap(d_inode(dn)) != vino.snap)) {
@@ -1561,12 +1557,6 @@ retry_lookup:
err = -ENOMEM;
goto out;
}
- ret = ceph_init_dentry(dn);
- if (ret < 0) {
- dput(dn);
- err = ret;
- goto out;
- }
} else if (d_really_is_positive(dn) &&
(ceph_ino(d_inode(dn)) != vino.ino ||
ceph_snap(d_inode(dn)) != vino.snap)) {
@@ -1879,7 +1869,6 @@ retry:
* symlinks
*/
static const struct inode_operations ceph_symlink_iops = {
- .readlink = generic_readlink,
.get_link = simple_get_link,
.setattr = ceph_setattr,
.getattr = ceph_getattr,
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 815acd1..4f49253 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2100,17 +2100,26 @@ static int __do_request(struct ceph_mds_client *mdsc,
err = -EIO;
goto finish;
}
+ if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
+ if (mdsc->mdsmap_err) {
+ err = mdsc->mdsmap_err;
+ dout("do_request mdsmap err %d\n", err);
+ goto finish;
+ }
+ if (!(mdsc->fsc->mount_options->flags &
+ CEPH_MOUNT_OPT_MOUNTWAIT) &&
+ !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
+ err = -ENOENT;
+ pr_info("probably no mds server is up\n");
+ goto finish;
+ }
+ }
put_request_session(req);
mds = __choose_mds(mdsc, req);
if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
- if (mdsc->mdsmap_err) {
- err = mdsc->mdsmap_err;
- dout("do_request mdsmap err %d\n", err);
- goto finish;
- }
dout("do_request no mds or not active, waiting for map\n");
list_add(&req->r_wait, &mdsc->waiting_for_map);
goto out;
@@ -3943,13 +3952,13 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
}
-static int verify_authorizer_reply(struct ceph_connection *con, int len)
+static int verify_authorizer_reply(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
struct ceph_mds_client *mdsc = s->s_mdsc;
struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
- return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
+ return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
}
static int invalidate_authorizer(struct ceph_connection *con)
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 8c3591a..5454e23 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -42,6 +42,60 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
return i;
}
+#define __decode_and_drop_type(p, end, type, bad) \
+ do { \
+ if (*p + sizeof(type) > end) \
+ goto bad; \
+ *p += sizeof(type); \
+ } while (0)
+
+#define __decode_and_drop_set(p, end, type, bad) \
+ do { \
+ u32 n; \
+ size_t need; \
+ ceph_decode_32_safe(p, end, n, bad); \
+ need = sizeof(type) * n; \
+ ceph_decode_need(p, end, need, bad); \
+ *p += need; \
+ } while (0)
+
+#define __decode_and_drop_map(p, end, ktype, vtype, bad) \
+ do { \
+ u32 n; \
+ size_t need; \
+ ceph_decode_32_safe(p, end, n, bad); \
+ need = (sizeof(ktype) + sizeof(vtype)) * n; \
+ ceph_decode_need(p, end, need, bad); \
+ *p += need; \
+ } while (0)
+
+
+static int __decode_and_drop_compat_set(void **p, void* end)
+{
+ int i;
+ /* compat, ro_compat, incompat*/
+ for (i = 0; i < 3; i++) {
+ u32 n;
+ ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
+ /* mask */
+ *p += sizeof(u64);
+ /* names (map<u64, string>) */
+ n = ceph_decode_32(p);
+ while (n-- > 0) {
+ u32 len;
+ ceph_decode_need(p, end, sizeof(u64) + sizeof(u32),
+ bad);
+ *p += sizeof(u64);
+ len = ceph_decode_32(p);
+ ceph_decode_need(p, end, len, bad);
+ *p += len;
+ }
+ }
+ return 0;
+bad:
+ return -1;
+}
+
/*
* Decode an MDS map
*
@@ -55,6 +109,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
int i, j, n;
int err = -EINVAL;
u8 mdsmap_v, mdsmap_cv;
+ u16 mdsmap_ev;
m = kzalloc(sizeof(*m), GFP_NOFS);
if (m == NULL)
@@ -83,7 +138,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_info = kcalloc(m->m_max_mds, sizeof(*m->m_info), GFP_NOFS);
if (m->m_info == NULL)
- goto badmem;
+ goto nomem;
/* pick out active nodes from mds_info (state > 0) */
n = ceph_decode_32(p);
@@ -166,7 +221,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
info->export_targets = kcalloc(num_export_targets,
sizeof(u32), GFP_NOFS);
if (info->export_targets == NULL)
- goto badmem;
+ goto nomem;
for (j = 0; j < num_export_targets; j++)
info->export_targets[j] =
ceph_decode_32(&pexport_targets);
@@ -180,24 +235,104 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_num_data_pg_pools = n;
m->m_data_pg_pools = kcalloc(n, sizeof(u64), GFP_NOFS);
if (!m->m_data_pg_pools)
- goto badmem;
+ goto nomem;
ceph_decode_need(p, end, sizeof(u64)*(n+1), bad);
for (i = 0; i < n; i++)
m->m_data_pg_pools[i] = ceph_decode_64(p);
m->m_cas_pg_pool = ceph_decode_64(p);
+ m->m_enabled = m->m_epoch > 1;
+
+ mdsmap_ev = 1;
+ if (mdsmap_v >= 2) {
+ ceph_decode_16_safe(p, end, mdsmap_ev, bad_ext);
+ }
+ if (mdsmap_ev >= 3) {
+ if (__decode_and_drop_compat_set(p, end) < 0)
+ goto bad_ext;
+ }
+ /* metadata_pool */
+ if (mdsmap_ev < 5) {
+ __decode_and_drop_type(p, end, u32, bad_ext);
+ } else {
+ __decode_and_drop_type(p, end, u64, bad_ext);
+ }
- /* ok, we don't care about the rest. */
+ /* created + modified + tableserver */
+ __decode_and_drop_type(p, end, struct ceph_timespec, bad_ext);
+ __decode_and_drop_type(p, end, struct ceph_timespec, bad_ext);
+ __decode_and_drop_type(p, end, u32, bad_ext);
+
+ /* in */
+ {
+ int num_laggy = 0;
+ ceph_decode_32_safe(p, end, n, bad_ext);
+ ceph_decode_need(p, end, sizeof(u32) * n, bad_ext);
+
+ for (i = 0; i < n; i++) {
+ s32 mds = ceph_decode_32(p);
+ if (mds >= 0 && mds < m->m_max_mds) {
+ if (m->m_info[mds].laggy)
+ num_laggy++;
+ }
+ }
+ m->m_num_laggy = num_laggy;
+ }
+
+ /* inc */
+ __decode_and_drop_map(p, end, u32, u32, bad_ext);
+ /* up */
+ __decode_and_drop_map(p, end, u32, u64, bad_ext);
+ /* failed */
+ __decode_and_drop_set(p, end, u32, bad_ext);
+ /* stopped */
+ __decode_and_drop_set(p, end, u32, bad_ext);
+
+ if (mdsmap_ev >= 4) {
+ /* last_failure_osd_epoch */
+ __decode_and_drop_type(p, end, u32, bad_ext);
+ }
+ if (mdsmap_ev >= 6) {
+ /* ever_allowed_snaps */
+ __decode_and_drop_type(p, end, u8, bad_ext);
+ /* explicitly_allowed_snaps */
+ __decode_and_drop_type(p, end, u8, bad_ext);
+ }
+ if (mdsmap_ev >= 7) {
+ /* inline_data_enabled */
+ __decode_and_drop_type(p, end, u8, bad_ext);
+ }
+ if (mdsmap_ev >= 8) {
+ u32 name_len;
+ /* enabled */
+ ceph_decode_8_safe(p, end, m->m_enabled, bad_ext);
+ ceph_decode_32_safe(p, end, name_len, bad_ext);
+ ceph_decode_need(p, end, name_len, bad_ext);
+ *p += name_len;
+ }
+ /* damaged */
+ if (mdsmap_ev >= 9) {
+ size_t need;
+ ceph_decode_32_safe(p, end, n, bad_ext);
+ need = sizeof(u32) * n;
+ ceph_decode_need(p, end, need, bad_ext);
+ *p += need;
+ m->m_damaged = n > 0;
+ } else {
+ m->m_damaged = false;
+ }
+bad_ext:
*p = end;
dout("mdsmap_decode success epoch %u\n", m->m_epoch);
return m;
-
-badmem:
+nomem:
err = -ENOMEM;
+ goto out_err;
bad:
pr_err("corrupt mdsmap\n");
print_hex_dump(KERN_DEBUG, "mdsmap: ",
DUMP_PREFIX_OFFSET, 16, 1,
start, end - start, true);
+out_err:
ceph_mdsmap_destroy(m);
return ERR_PTR(err);
}
@@ -212,3 +347,19 @@ void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
kfree(m->m_data_pg_pools);
kfree(m);
}
+
+bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m)
+{
+ int i, nr_active = 0;
+ if (!m->m_enabled)
+ return false;
+ if (m->m_damaged)
+ return false;
+ if (m->m_num_laggy > 0)
+ return false;
+ for (i = 0; i < m->m_max_mds; i++) {
+ if (m->m_info[i].state == CEPH_MDS_STATE_ACTIVE)
+ nr_active++;
+ }
+ return nr_active > 0;
+}
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 9ff5219..8f8b41c 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -593,6 +593,8 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
capsnap->atime = inode->i_atime;
capsnap->ctime = inode->i_ctime;
capsnap->time_warp_seq = ci->i_time_warp_seq;
+ capsnap->truncate_size = ci->i_truncate_size;
+ capsnap->truncate_seq = ci->i_truncate_seq;
if (capsnap->dirty_pages) {
dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
"still has %d dirty pages\n", inode, capsnap,
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index b382e59..6bd20d7 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -137,6 +137,8 @@ enum {
Opt_nofscache,
Opt_poolperm,
Opt_nopoolperm,
+ Opt_require_active_mds,
+ Opt_norequire_active_mds,
#ifdef CONFIG_CEPH_FS_POSIX_ACL
Opt_acl,
#endif
@@ -171,6 +173,8 @@ static match_table_t fsopt_tokens = {
{Opt_nofscache, "nofsc"},
{Opt_poolperm, "poolperm"},
{Opt_nopoolperm, "nopoolperm"},
+ {Opt_require_active_mds, "require_active_mds"},
+ {Opt_norequire_active_mds, "norequire_active_mds"},
#ifdef CONFIG_CEPH_FS_POSIX_ACL
{Opt_acl, "acl"},
#endif
@@ -287,6 +291,12 @@ static int parse_fsopt_token(char *c, void *private)
case Opt_nopoolperm:
fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM;
break;
+ case Opt_require_active_mds:
+ fsopt->flags &= ~CEPH_MOUNT_OPT_MOUNTWAIT;
+ break;
+ case Opt_norequire_active_mds:
+ fsopt->flags |= CEPH_MOUNT_OPT_MOUNTWAIT;
+ break;
#ifdef CONFIG_CEPH_FS_POSIX_ACL
case Opt_acl:
fsopt->sb_flags |= MS_POSIXACL;
@@ -795,7 +805,6 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc,
root = ERR_PTR(-ENOMEM);
goto out;
}
- ceph_init_dentry(root);
dout("open_root_inode success, root dentry is %p\n", root);
} else {
root = ERR_PTR(err);
@@ -879,6 +888,7 @@ static int ceph_set_super(struct super_block *s, void *data)
fsc->sb = s;
s->s_op = &ceph_super_ops;
+ s->s_d_op = &ceph_dentry_ops;
s->s_export_op = &ceph_export_ops;
s->s_time_gran = 1000; /* 1000 ns == 1 us */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 3e3fa916..3373b61 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -36,6 +36,7 @@
#define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */
#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */
#define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */
+#define CEPH_MOUNT_OPT_MOUNTWAIT (1<<12) /* mount waits if no mds is up */
#define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE
@@ -180,6 +181,8 @@ struct ceph_cap_snap {
u64 size;
struct timespec mtime, atime, ctime;
u64 time_warp_seq;
+ u64 truncate_size;
+ u32 truncate_seq;
int writing; /* a sync write is still in progress */
int dirty_pages; /* dirty pages awaiting writeback */
bool inline_data;
@@ -905,6 +908,8 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page);
+extern int ceph_try_get_caps(struct ceph_inode_info *ci,
+ int need, int want, int *got);
/* for counting open files by mode */
extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
@@ -934,8 +939,7 @@ extern const struct file_operations ceph_dir_fops;
extern const struct file_operations ceph_snapdir_fops;
extern const struct inode_operations ceph_dir_iops;
extern const struct inode_operations ceph_snapdir_iops;
-extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
- ceph_snapdir_dentry_ops;
+extern const struct dentry_operations ceph_dentry_ops;
extern loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order);
extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
@@ -951,13 +955,6 @@ extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
-/*
- * our d_ops vary depending on whether the inode is live,
- * snapshotted (read-only), or a virtual ".snap" directory.
- */
-int ceph_init_dentry(struct dentry *dentry);
-
-
/* ioctl.c */
extern long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg);
OpenPOWER on IntegriCloud