summaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/addr.c70
-rw-r--r--fs/ceph/auth.c1
-rw-r--r--fs/ceph/auth_none.h2
-rw-r--r--fs/ceph/auth_x.c32
-rw-r--r--fs/ceph/caps.c63
-rw-r--r--fs/ceph/dir.c16
-rw-r--r--fs/ceph/file.c3
-rw-r--r--fs/ceph/inode.c18
-rw-r--r--fs/ceph/mds_client.c34
-rw-r--r--fs/ceph/messenger.c48
-rw-r--r--fs/ceph/messenger.h1
-rw-r--r--fs/ceph/osd_client.c26
-rw-r--r--fs/ceph/osd_client.h3
-rw-r--r--fs/ceph/osdmap.c209
-rw-r--r--fs/ceph/osdmap.h3
-rw-r--r--fs/ceph/rados.h7
-rw-r--r--fs/ceph/snap.c50
-rw-r--r--fs/ceph/super.c30
-rw-r--r--fs/ceph/super.h4
19 files changed, 399 insertions, 221 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index aa3cd7c..a9005d8 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -337,16 +337,15 @@ out:
/*
* Get ref for the oldest snapc for an inode with dirty data... that is, the
* only snap context we are allowed to write back.
- *
- * Caller holds i_lock.
*/
-static struct ceph_snap_context *__get_oldest_context(struct inode *inode,
- u64 *snap_size)
+static struct ceph_snap_context *get_oldest_context(struct inode *inode,
+ u64 *snap_size)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_snap_context *snapc = NULL;
struct ceph_cap_snap *capsnap = NULL;
+ spin_lock(&inode->i_lock);
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
capsnap->context, capsnap->dirty_pages);
@@ -357,21 +356,11 @@ static struct ceph_snap_context *__get_oldest_context(struct inode *inode,
break;
}
}
- if (!snapc && ci->i_snap_realm) {
- snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
+ if (!snapc && ci->i_head_snapc) {
+ snapc = ceph_get_snap_context(ci->i_head_snapc);
dout(" head snapc %p has %d dirty pages\n",
snapc, ci->i_wrbuffer_ref_head);
}
- return snapc;
-}
-
-static struct ceph_snap_context *get_oldest_context(struct inode *inode,
- u64 *snap_size)
-{
- struct ceph_snap_context *snapc = NULL;
-
- spin_lock(&inode->i_lock);
- snapc = __get_oldest_context(inode, snap_size);
spin_unlock(&inode->i_lock);
return snapc;
}
@@ -392,7 +381,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
int len = PAGE_CACHE_SIZE;
loff_t i_size;
int err = 0;
- struct ceph_snap_context *snapc;
+ struct ceph_snap_context *snapc, *oldest;
u64 snap_size = 0;
long writeback_stat;
@@ -413,13 +402,16 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
dout("writepage %p page %p not dirty?\n", inode, page);
goto out;
}
- if (snapc != get_oldest_context(inode, &snap_size)) {
+ oldest = get_oldest_context(inode, &snap_size);
+ if (snapc->seq > oldest->seq) {
dout("writepage %p page %p snapc %p not writeable - noop\n",
inode, page, (void *)page->private);
/* we should only noop if called by kswapd */
WARN_ON((current->flags & PF_MEMALLOC) == 0);
+ ceph_put_snap_context(oldest);
goto out;
}
+ ceph_put_snap_context(oldest);
/* is this a partial page at end of file? */
if (snap_size)
@@ -458,7 +450,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
ClearPagePrivate(page);
end_page_writeback(page);
ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
- ceph_put_snap_context(snapc);
+ ceph_put_snap_context(snapc); /* page's reference */
out:
return err;
}
@@ -512,12 +504,11 @@ static void writepages_finish(struct ceph_osd_request *req,
int i;
struct ceph_snap_context *snapc = req->r_snapc;
struct address_space *mapping = inode->i_mapping;
- struct writeback_control *wbc = req->r_wbc;
__s32 rc = -EIO;
u64 bytes = 0;
struct ceph_client *client = ceph_inode_to_client(inode);
long writeback_stat;
- unsigned issued = __ceph_caps_issued(ci, NULL);
+ unsigned issued = ceph_caps_issued(ci);
/* parse reply */
replyhead = msg->front.iov_base;
@@ -554,13 +545,9 @@ static void writepages_finish(struct ceph_osd_request *req,
clear_bdi_congested(&client->backing_dev_info,
BLK_RW_ASYNC);
- if (i >= wrote) {
- dout("inode %p skipping page %p\n", inode, page);
- wbc->pages_skipped++;
- }
+ ceph_put_snap_context((void *)page->private);
page->private = 0;
ClearPagePrivate(page);
- ceph_put_snap_context(snapc);
dout("unlocking %d %p\n", i, page);
end_page_writeback(page);
@@ -618,7 +605,7 @@ static int ceph_writepages_start(struct address_space *mapping,
int range_whole = 0;
int should_loop = 1;
pgoff_t max_pages = 0, max_pages_ever = 0;
- struct ceph_snap_context *snapc = NULL, *last_snapc = NULL;
+ struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
struct pagevec pvec;
int done = 0;
int rc = 0;
@@ -770,9 +757,10 @@ get_more_pages:
}
/* only if matching snap context */
- if (snapc != (void *)page->private) {
- dout("page snapc %p != oldest %p\n",
- (void *)page->private, snapc);
+ pgsnapc = (void *)page->private;
+ if (pgsnapc->seq > snapc->seq) {
+ dout("page snapc %p %lld > oldest %p %lld\n",
+ pgsnapc, pgsnapc->seq, snapc, snapc->seq);
unlock_page(page);
if (!locked_pages)
continue; /* keep looking for snap */
@@ -806,7 +794,6 @@ get_more_pages:
alloc_page_vec(client, req);
req->r_callback = writepages_finish;
req->r_inode = inode;
- req->r_wbc = wbc;
}
/* note position of first page in pvec */
@@ -914,7 +901,10 @@ static int context_is_writeable_or_written(struct inode *inode,
struct ceph_snap_context *snapc)
{
struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
- return !oldest || snapc->seq <= oldest->seq;
+ int ret = !oldest || snapc->seq <= oldest->seq;
+
+ ceph_put_snap_context(oldest);
+ return ret;
}
/*
@@ -936,8 +926,8 @@ static int ceph_update_writeable_page(struct file *file,
int pos_in_page = pos & ~PAGE_CACHE_MASK;
int end_in_page = pos_in_page + len;
loff_t i_size;
- struct ceph_snap_context *snapc;
int r;
+ struct ceph_snap_context *snapc, *oldest;
retry_locked:
/* writepages currently holds page lock, but if we change that later, */
@@ -947,23 +937,24 @@ retry_locked:
BUG_ON(!ci->i_snap_realm);
down_read(&mdsc->snap_rwsem);
BUG_ON(!ci->i_snap_realm->cached_context);
- if (page->private &&
- (void *)page->private != ci->i_snap_realm->cached_context) {
+ snapc = (void *)page->private;
+ if (snapc && snapc != ci->i_head_snapc) {
/*
* this page is already dirty in another (older) snap
* context! is it writeable now?
*/
- snapc = get_oldest_context(inode, NULL);
+ oldest = get_oldest_context(inode, NULL);
up_read(&mdsc->snap_rwsem);
- if (snapc != (void *)page->private) {
+ if (snapc->seq > oldest->seq) {
+ ceph_put_snap_context(oldest);
dout(" page %p snapc %p not current or oldest\n",
- page, (void *)page->private);
+ page, snapc);
/*
* queue for writeback, and wait for snapc to
* be writeable or written
*/
- snapc = ceph_get_snap_context((void *)page->private);
+ snapc = ceph_get_snap_context(snapc);
unlock_page(page);
ceph_queue_writeback(inode);
r = wait_event_interruptible(ci->i_cap_wq,
@@ -973,6 +964,7 @@ retry_locked:
return r;
return -EAGAIN;
}
+ ceph_put_snap_context(oldest);
/* yay, writeable, do it now (without dropping page lock) */
dout(" page %p snapc %p not current, but oldest\n",
diff --git a/fs/ceph/auth.c b/fs/ceph/auth.c
index f6394b9..818afe7 100644
--- a/fs/ceph/auth.c
+++ b/fs/ceph/auth.c
@@ -3,6 +3,7 @@
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/err.h>
+#include <linux/slab.h>
#include "types.h"
#include "auth_none.h"
diff --git a/fs/ceph/auth_none.h b/fs/ceph/auth_none.h
index 56c0553..8164df1 100644
--- a/fs/ceph/auth_none.h
+++ b/fs/ceph/auth_none.h
@@ -1,6 +1,8 @@
#ifndef _FS_CEPH_AUTH_NONE_H
#define _FS_CEPH_AUTH_NONE_H
+#include <linux/slab.h>
+
#include "auth.h"
/*
diff --git a/fs/ceph/auth_x.c b/fs/ceph/auth_x.c
index d9001a4..fee5a08d 100644
--- a/fs/ceph/auth_x.c
+++ b/fs/ceph/auth_x.c
@@ -12,8 +12,6 @@
#include "auth.h"
#include "decode.h"
-struct kmem_cache *ceph_x_ticketbuf_cachep;
-
#define TEMP_TICKET_BUF_LEN 256
static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed);
@@ -131,13 +129,12 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac,
char *ticket_buf;
u8 struct_v;
- dbuf = kmem_cache_alloc(ceph_x_ticketbuf_cachep, GFP_NOFS | GFP_ATOMIC);
+ dbuf = kmalloc(TEMP_TICKET_BUF_LEN, GFP_NOFS);
if (!dbuf)
return -ENOMEM;
ret = -ENOMEM;
- ticket_buf = kmem_cache_alloc(ceph_x_ticketbuf_cachep,
- GFP_NOFS | GFP_ATOMIC);
+ ticket_buf = kmalloc(TEMP_TICKET_BUF_LEN, GFP_NOFS);
if (!ticket_buf)
goto out_dbuf;
@@ -251,9 +248,9 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac,
ret = 0;
out:
- kmem_cache_free(ceph_x_ticketbuf_cachep, ticket_buf);
+ kfree(ticket_buf);
out_dbuf:
- kmem_cache_free(ceph_x_ticketbuf_cachep, dbuf);
+ kfree(dbuf);
return ret;
bad:
@@ -605,8 +602,6 @@ static void ceph_x_destroy(struct ceph_auth_client *ac)
remove_ticket_handler(ac, th);
}
- kmem_cache_destroy(ceph_x_ticketbuf_cachep);
-
kfree(ac->private);
ac->private = NULL;
}
@@ -641,26 +636,20 @@ int ceph_x_init(struct ceph_auth_client *ac)
int ret;
dout("ceph_x_init %p\n", ac);
+ ret = -ENOMEM;
xi = kzalloc(sizeof(*xi), GFP_NOFS);
if (!xi)
- return -ENOMEM;
+ goto out;
- ret = -ENOMEM;
- ceph_x_ticketbuf_cachep = kmem_cache_create("ceph_x_ticketbuf",
- TEMP_TICKET_BUF_LEN, 8,
- (SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD),
- NULL);
- if (!ceph_x_ticketbuf_cachep)
- goto done_nomem;
ret = -EINVAL;
if (!ac->secret) {
pr_err("no secret set (for auth_x protocol)\n");
- goto done_nomem;
+ goto out_nomem;
}
ret = ceph_crypto_key_unarmor(&xi->secret, ac->secret);
if (ret)
- goto done_nomem;
+ goto out_nomem;
xi->starting = true;
xi->ticket_handlers = RB_ROOT;
@@ -670,10 +659,9 @@ int ceph_x_init(struct ceph_auth_client *ac)
ac->ops = &ceph_x_ops;
return 0;
-done_nomem:
+out_nomem:
kfree(xi);
- if (ceph_x_ticketbuf_cachep)
- kmem_cache_destroy(ceph_x_ticketbuf_cachep);
+out:
return ret;
}
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 3710e07..d940053 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -858,6 +858,8 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
}
/*
+ * Remove a cap. Take steps to deal with a racing iterate_session_caps.
+ *
* caller should hold i_lock.
* caller will not hold session s_mutex if called from destroy_inode.
*/
@@ -866,15 +868,10 @@ void __ceph_remove_cap(struct ceph_cap *cap)
struct ceph_mds_session *session = cap->session;
struct ceph_inode_info *ci = cap->ci;
struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+ int removed = 0;
dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
- /* remove from inode list */
- rb_erase(&cap->ci_node, &ci->i_caps);
- cap->ci = NULL;
- if (ci->i_auth_cap == cap)
- ci->i_auth_cap = NULL;
-
/* remove from session list */
spin_lock(&session->s_cap_lock);
if (session->s_cap_iterator == cap) {
@@ -885,10 +882,18 @@ void __ceph_remove_cap(struct ceph_cap *cap)
list_del_init(&cap->session_caps);
session->s_nr_caps--;
cap->session = NULL;
+ removed = 1;
}
+ /* protect backpointer with s_cap_lock: see iterate_session_caps */
+ cap->ci = NULL;
spin_unlock(&session->s_cap_lock);
- if (cap->session == NULL)
+ /* remove from inode list */
+ rb_erase(&cap->ci_node, &ci->i_caps);
+ if (ci->i_auth_cap == cap)
+ ci->i_auth_cap = NULL;
+
+ if (removed)
ceph_put_cap(cap);
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
@@ -1205,6 +1210,12 @@ retry:
if (capsnap->dirty_pages || capsnap->writing)
continue;
+ /*
+ * if cap writeback already occurred, we should have dropped
+ * the capsnap in ceph_put_wrbuffer_cap_refs.
+ */
+ BUG_ON(capsnap->dirty == 0);
+
/* pick mds, take s_mutex */
mds = __ceph_get_cap_mds(ci, &mseq);
if (session && session->s_mds != mds) {
@@ -1855,8 +1866,8 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
} else {
pr_err("%p auth cap %p not mds%d ???\n", inode,
cap, session->s_mds);
- spin_unlock(&inode->i_lock);
}
+ spin_unlock(&inode->i_lock);
}
}
@@ -2118,8 +2129,8 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
}
spin_unlock(&inode->i_lock);
- dout("put_cap_refs %p had %s %s\n", inode, ceph_cap_string(had),
- last ? "last" : "");
+ dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
+ last ? " last" : "", put ? " put" : "");
if (last && !flushsnaps)
ceph_check_caps(ci, 0, NULL);
@@ -2143,7 +2154,8 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
{
struct inode *inode = &ci->vfs_inode;
int last = 0;
- int last_snap = 0;
+ int complete_capsnap = 0;
+ int drop_capsnap = 0;
int found = 0;
struct ceph_cap_snap *capsnap = NULL;
@@ -2166,19 +2178,32 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
if (capsnap->context == snapc) {
found = 1;
- capsnap->dirty_pages -= nr;
- last_snap = !capsnap->dirty_pages;
break;
}
}
BUG_ON(!found);
+ capsnap->dirty_pages -= nr;
+ if (capsnap->dirty_pages == 0) {
+ complete_capsnap = 1;
+ if (capsnap->dirty == 0)
+ /* cap writeback completed before we created
+ * the cap_snap; no FLUSHSNAP is needed */
+ drop_capsnap = 1;
+ }
dout("put_wrbuffer_cap_refs on %p cap_snap %p "
- " snap %lld %d/%d -> %d/%d %s%s\n",
+ " snap %lld %d/%d -> %d/%d %s%s%s\n",
inode, capsnap, capsnap->context->seq,
ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
ci->i_wrbuffer_ref, capsnap->dirty_pages,
last ? " (wrbuffer last)" : "",
- last_snap ? " (capsnap last)" : "");
+ complete_capsnap ? " (complete capsnap)" : "",
+ drop_capsnap ? " (drop capsnap)" : "");
+ if (drop_capsnap) {
+ ceph_put_snap_context(capsnap->context);
+ list_del(&capsnap->ci_item);
+ list_del(&capsnap->flushing_item);
+ ceph_put_cap_snap(capsnap);
+ }
}
spin_unlock(&inode->i_lock);
@@ -2186,10 +2211,12 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
if (last) {
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
iput(inode);
- } else if (last_snap) {
+ } else if (complete_capsnap) {
ceph_flush_snaps(ci);
wake_up(&ci->i_cap_wq);
}
+ if (drop_capsnap)
+ iput(inode);
}
/*
@@ -2465,8 +2492,8 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
break;
}
WARN_ON(capsnap->dirty_pages || capsnap->writing);
- dout(" removing cap_snap %p follows %lld\n",
- capsnap, follows);
+ dout(" removing %p cap_snap %p follows %lld\n",
+ inode, capsnap, follows);
ceph_put_snap_context(capsnap->context);
list_del(&capsnap->ci_item);
list_del(&capsnap->flushing_item);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 7261dc6..650d2db 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -171,11 +171,11 @@ more:
spin_lock(&inode->i_lock);
spin_lock(&dcache_lock);
+ last = dentry;
+
if (err < 0)
goto out_unlock;
- last = dentry;
-
p = p->prev;
filp->f_pos++;
@@ -312,7 +312,7 @@ more:
req->r_readdir_offset = fi->next_offset;
req->r_args.readdir.frag = cpu_to_le32(frag);
req->r_args.readdir.max_entries = cpu_to_le32(max_entries);
- req->r_num_caps = max_entries;
+ req->r_num_caps = max_entries + 1;
err = ceph_mdsc_do_request(mdsc, NULL, req);
if (err < 0) {
ceph_mdsc_put_request(req);
@@ -489,6 +489,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
struct inode *inode = ceph_get_snapdir(parent);
dout("ENOENT on snapdir %p '%.*s', linking to snapdir %p\n",
dentry, dentry->d_name.len, dentry->d_name.name, inode);
+ BUG_ON(!d_unhashed(dentry));
d_add(dentry, inode);
err = 0;
}
@@ -879,7 +880,16 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
* do_request, above). If there is no trace, we need
* to do it here.
*/
+
+ /* d_move screws up d_subdirs order */
+ ceph_i_clear(new_dir, CEPH_I_COMPLETE);
+
d_move(old_dentry, new_dentry);
+
+ /* ensure target dentry is invalidated, despite
+ rehashing bug in vfs_rename_dir */
+ new_dentry->d_time = jiffies;
+ ceph_dentry(new_dentry)->lease_shared_gen = 0;
}
ceph_mdsc_put_request(req);
return err;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 4add3d5..ed6f197 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -665,7 +665,8 @@ more:
* throw out any page cache pages in this range. this
* may block.
*/
- truncate_inode_pages_range(inode->i_mapping, pos, pos+len);
+ truncate_inode_pages_range(inode->i_mapping, pos,
+ (pos+len) | (PAGE_CACHE_SIZE-1));
} else {
pages = alloc_page_vector(num_pages);
if (IS_ERR(pages)) {
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index aca82d5..85b4d2f 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -733,6 +733,10 @@ no_change:
__ceph_get_fmode(ci, cap_fmode);
spin_unlock(&inode->i_lock);
}
+ } else if (cap_fmode >= 0) {
+ pr_warning("mds issued no caps on %llx.%llx\n",
+ ceph_vinop(inode));
+ __ceph_get_fmode(ci, cap_fmode);
}
/* update delegation info? */
@@ -886,6 +890,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
struct inode *in = NULL;
struct ceph_mds_reply_inode *ininfo;
struct ceph_vino vino;
+ struct ceph_client *client = ceph_sb_to_client(sb);
int i = 0;
int err = 0;
@@ -949,7 +954,14 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
return err;
}
- if (rinfo->head->is_dentry && !req->r_aborted) {
+ /*
+ * ignore null lease/binding on snapdir ENOENT, or else we
+ * will have trouble splicing in the virtual snapdir later
+ */
+ if (rinfo->head->is_dentry && !req->r_aborted &&
+ (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
+ client->mount_args->snapdir_name,
+ req->r_dentry->d_name.len))) {
/*
* lookup link rename : null -> possibly existing inode
* mknod symlink mkdir : null -> new inode
@@ -989,6 +1001,10 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
dn, dn->d_name.len, dn->d_name.name);
dout("fill_trace doing d_move %p -> %p\n",
req->r_old_dentry, dn);
+
+ /* d_move screws up d_subdirs order */
+ ceph_i_clear(dir, CEPH_I_COMPLETE);
+
d_move(req->r_old_dentry, dn);
dout(" src %p '%.*s' dst %p '%.*s'\n",
req->r_old_dentry,
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 60a9a4a..24561a5 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -736,9 +736,10 @@ static void cleanup_cap_releases(struct ceph_mds_session *session)
}
/*
- * Helper to safely iterate over all caps associated with a session.
+ * Helper to safely iterate over all caps associated with a session, with
+ * special care taken to handle a racing __ceph_remove_cap().
*
- * caller must hold session s_mutex
+ * Caller must hold session s_mutex.
*/
static int iterate_session_caps(struct ceph_mds_session *session,
int (*cb)(struct inode *, struct ceph_cap *,
@@ -2136,7 +2137,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
struct ceph_mds_session *session = NULL;
struct ceph_msg *reply;
struct rb_node *p;
- int err;
+ int err = -ENOMEM;
struct ceph_pagelist *pagelist;
pr_info("reconnect to recovering mds%d\n", mds);
@@ -2185,7 +2186,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
goto fail;
err = iterate_session_caps(session, encode_caps_cb, pagelist);
if (err < 0)
- goto out;
+ goto fail;
/*
* snaprealms. we provide mds with the ino, seq (version), and
@@ -2213,28 +2214,31 @@ send:
reply->nr_pages = calc_pages_for(0, pagelist->length);
ceph_con_send(&session->s_con, reply);
- if (session) {
- session->s_state = CEPH_MDS_SESSION_OPEN;
- __wake_requests(mdsc, &session->s_waiting);
- }
+ session->s_state = CEPH_MDS_SESSION_OPEN;
+ mutex_unlock(&session->s_mutex);
+
+ mutex_lock(&mdsc->mutex);
+ __wake_requests(mdsc, &session->s_waiting);
+ mutex_unlock(&mdsc->mutex);
+
+ ceph_put_mds_session(session);
-out:
up_read(&mdsc->snap_rwsem);
- if (session) {
- mutex_unlock(&session->s_mutex);
- ceph_put_mds_session(session);
- }
mutex_lock(&mdsc->mutex);
return;
fail:
ceph_msg_put(reply);
+ up_read(&mdsc->snap_rwsem);
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
fail_nomsg:
ceph_pagelist_release(pagelist);
kfree(pagelist);
fail_nopagelist:
- pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
- goto out;
+ pr_err("error %d preparing reconnect for mds%d\n", err, mds);
+ mutex_lock(&mdsc->mutex);
+ return;
}
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 8f1715f..cd4fadb 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -30,6 +30,10 @@ static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
+#ifdef CONFIG_LOCKDEP
+static struct lock_class_key socket_class;
+#endif
+
static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
@@ -228,6 +232,10 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
con->sock = sock;
sock->sk->sk_allocation = GFP_NOFS;
+#ifdef CONFIG_LOCKDEP
+ lockdep_set_class(&sock->sk->sk_lock, &socket_class);
+#endif
+
set_sock_callbacks(sock, con);
dout("connect %s\n", pr_addr(&con->peer_addr.in_addr));
@@ -333,6 +341,7 @@ static void reset_connection(struct ceph_connection *con)
con->out_msg = NULL;
}
con->in_seq = 0;
+ con->in_seq_acked = 0;
}
/*
@@ -483,7 +492,14 @@ static void prepare_write_message(struct ceph_connection *con)
list_move_tail(&m->list_head, &con->out_sent);
}
- m->hdr.seq = cpu_to_le64(++con->out_seq);
+ /*
+ * only assign outgoing seq # if we haven't sent this message
+ * yet. if it is requeued, resend with it's original seq.
+ */
+ if (m->needs_out_seq) {
+ m->hdr.seq = cpu_to_le64(++con->out_seq);
+ m->needs_out_seq = false;
+ }
dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
@@ -1325,6 +1341,7 @@ static int read_partial_message(struct ceph_connection *con)
unsigned front_len, middle_len, data_len, data_off;
int datacrc = con->msgr->nocrc;
int skip;
+ u64 seq;
dout("read_partial_message con %p msg %p\n", con, m);
@@ -1359,6 +1376,25 @@ static int read_partial_message(struct ceph_connection *con)
return -EIO;
data_off = le16_to_cpu(con->in_hdr.data_off);
+ /* verify seq# */
+ seq = le64_to_cpu(con->in_hdr.seq);
+ if ((s64)seq - (s64)con->in_seq < 1) {
+ pr_info("skipping %s%lld %s seq %lld, expected %lld\n",
+ ENTITY_NAME(con->peer_name),
+ pr_addr(&con->peer_addr.in_addr),
+ seq, con->in_seq + 1);
+ con->in_base_pos = -front_len - middle_len - data_len -
+ sizeof(m->footer);
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ con->in_seq++;
+ return 0;
+ } else if ((s64)seq - (s64)con->in_seq > 1) {
+ pr_err("read_partial_message bad seq %lld expected %lld\n",
+ seq, con->in_seq + 1);
+ con->error_msg = "bad message sequence # for incoming message";
+ return -EBADMSG;
+ }
+
/* allocate message? */
if (!con->in_msg) {
dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
@@ -1370,6 +1406,7 @@ static int read_partial_message(struct ceph_connection *con)
con->in_base_pos = -front_len - middle_len - data_len -
sizeof(m->footer);
con->in_tag = CEPH_MSGR_TAG_READY;
+ con->in_seq++;
return 0;
}
if (IS_ERR(con->in_msg)) {
@@ -1956,6 +1993,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
+ msg->needs_out_seq = true;
+
/* queue */
mutex_lock(&con->mutex);
BUG_ON(!list_empty(&msg->list_head));
@@ -2021,6 +2060,7 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
con->in_tag = CEPH_MSGR_TAG_READY;
+ con->in_seq++;
} else {
dout("con_revoke_pages %p msg %p pages %p no-op\n",
con, con->in_msg, msg);
@@ -2054,15 +2094,19 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
kref_init(&m->kref);
INIT_LIST_HEAD(&m->list_head);
+ m->hdr.tid = 0;
m->hdr.type = cpu_to_le16(type);
+ m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
+ m->hdr.version = 0;
m->hdr.front_len = cpu_to_le32(front_len);
m->hdr.middle_len = 0;
m->hdr.data_len = cpu_to_le32(page_len);
m->hdr.data_off = cpu_to_le16(page_off);
- m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
+ m->hdr.reserved = 0;
m->footer.front_crc = 0;
m->footer.middle_crc = 0;
m->footer.data_crc = 0;
+ m->footer.flags = 0;
m->front_max = front_len;
m->front_is_vmalloc = false;
m->more_to_follow = false;
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index a343dae..a5caf91 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -86,6 +86,7 @@ struct ceph_msg {
struct kref kref;
bool front_is_vmalloc;
bool more_to_follow;
+ bool needs_out_seq;
int front_max;
struct ceph_msgpool *pool;
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index c7b4ded..3514f71 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -565,7 +565,8 @@ static int __map_osds(struct ceph_osd_client *osdc,
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
struct ceph_pg pgid;
- int o = -1;
+ int acting[CEPH_PG_MAX_SIZE];
+ int o = -1, num = 0;
int err;
dout("map_osds %p tid %lld\n", req, req->r_tid);
@@ -576,10 +577,16 @@ static int __map_osds(struct ceph_osd_client *osdc,
pgid = reqhead->layout.ol_pgid;
req->r_pgid = pgid;
- o = ceph_calc_pg_primary(osdc->osdmap, pgid);
+ err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
+ if (err > 0) {
+ o = acting[0];
+ num = err;
+ }
if ((req->r_osd && req->r_osd->o_osd == o &&
- req->r_sent >= req->r_osd->o_incarnation) ||
+ req->r_sent >= req->r_osd->o_incarnation &&
+ req->r_num_pg_osds == num &&
+ memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
(req->r_osd == NULL && o == -1))
return 0; /* no change */
@@ -587,6 +594,10 @@ static int __map_osds(struct ceph_osd_client *osdc,
req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
req->r_osd ? req->r_osd->o_osd : -1);
+ /* record full pg acting set */
+ memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
+ req->r_num_pg_osds = num;
+
if (req->r_osd) {
__cancel_request(req);
list_del_init(&req->r_osd_item);
@@ -612,7 +623,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
__remove_osd_from_lru(req->r_osd);
list_add(&req->r_osd_item, &req->r_osd->o_requests);
}
- err = 1; /* osd changed */
+ err = 1; /* osd or pg changed */
out:
return err;
@@ -779,16 +790,18 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
struct ceph_osd_request *req;
u64 tid;
int numops, object_len, flags;
+ s32 result;
tid = le64_to_cpu(msg->hdr.tid);
if (msg->front.iov_len < sizeof(*rhead))
goto bad;
numops = le32_to_cpu(rhead->num_ops);
object_len = le32_to_cpu(rhead->object_len);
+ result = le32_to_cpu(rhead->result);
if (msg->front.iov_len != sizeof(*rhead) + object_len +
numops * sizeof(struct ceph_osd_op))
goto bad;
- dout("handle_reply %p tid %llu\n", msg, tid);
+ dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
/* lookup */
mutex_lock(&osdc->request_mutex);
@@ -834,7 +847,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
dout("handle_reply tid %llu flags %d\n", tid, flags);
/* either this is a read, or we got the safe response */
- if ((flags & CEPH_OSD_FLAG_ONDISK) ||
+ if (result < 0 ||
+ (flags & CEPH_OSD_FLAG_ONDISK) ||
((flags & CEPH_OSD_FLAG_WRITE) == 0))
__unregister_request(osdc, req);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index b075991..ce77698 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -48,6 +48,8 @@ struct ceph_osd_request {
struct list_head r_osd_item;
struct ceph_osd *r_osd;
struct ceph_pg r_pgid;
+ int r_pg_osds[CEPH_PG_MAX_SIZE];
+ int r_num_pg_osds;
struct ceph_connection *r_con_filling_msg;
@@ -66,7 +68,6 @@ struct ceph_osd_request {
struct list_head r_unsafe_item;
struct inode *r_inode; /* for use by callbacks */
- struct writeback_control *r_wbc; /* ditto */
char r_oid[40]; /* object name */
int r_oid_len;
diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c
index 21c6623..cfdd8f4 100644
--- a/fs/ceph/osdmap.c
+++ b/fs/ceph/osdmap.c
@@ -314,71 +314,6 @@ bad:
return ERR_PTR(err);
}
-
-/*
- * osd map
- */
-void ceph_osdmap_destroy(struct ceph_osdmap *map)
-{
- dout("osdmap_destroy %p\n", map);
- if (map->crush)
- crush_destroy(map->crush);
- while (!RB_EMPTY_ROOT(&map->pg_temp)) {
- struct ceph_pg_mapping *pg =
- rb_entry(rb_first(&map->pg_temp),
- struct ceph_pg_mapping, node);
- rb_erase(&pg->node, &map->pg_temp);
- kfree(pg);
- }
- while (!RB_EMPTY_ROOT(&map->pg_pools)) {
- struct ceph_pg_pool_info *pi =
- rb_entry(rb_first(&map->pg_pools),
- struct ceph_pg_pool_info, node);
- rb_erase(&pi->node, &map->pg_pools);
- kfree(pi);
- }
- kfree(map->osd_state);
- kfree(map->osd_weight);
- kfree(map->osd_addr);
- kfree(map);
-}
-
-/*
- * adjust max osd value. reallocate arrays.
- */
-static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
-{
- u8 *state;
- struct ceph_entity_addr *addr;
- u32 *weight;
-
- state = kcalloc(max, sizeof(*state), GFP_NOFS);
- addr = kcalloc(max, sizeof(*addr), GFP_NOFS);
- weight = kcalloc(max, sizeof(*weight), GFP_NOFS);
- if (state == NULL || addr == NULL || weight == NULL) {
- kfree(state);
- kfree(addr);
- kfree(weight);
- return -ENOMEM;
- }
-
- /* copy old? */
- if (map->osd_state) {
- memcpy(state, map->osd_state, map->max_osd*sizeof(*state));
- memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr));
- memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight));
- kfree(map->osd_state);
- kfree(map->osd_addr);
- kfree(map->osd_weight);
- }
-
- map->osd_state = state;
- map->osd_weight = weight;
- map->osd_addr = addr;
- map->max_osd = max;
- return 0;
-}
-
/*
* rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
* to a set of osds)
@@ -482,6 +417,13 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
return NULL;
}
+static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
+{
+ rb_erase(&pi->node, root);
+ kfree(pi->name);
+ kfree(pi);
+}
+
void __decode_pool(void **p, struct ceph_pg_pool_info *pi)
{
ceph_decode_copy(p, &pi->v, sizeof(pi->v));
@@ -490,6 +432,98 @@ void __decode_pool(void **p, struct ceph_pg_pool_info *pi)
*p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2;
}
+static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
+{
+ struct ceph_pg_pool_info *pi;
+ u32 num, len, pool;
+
+ ceph_decode_32_safe(p, end, num, bad);
+ dout(" %d pool names\n", num);
+ while (num--) {
+ ceph_decode_32_safe(p, end, pool, bad);
+ ceph_decode_32_safe(p, end, len, bad);
+ dout(" pool %d len %d\n", pool, len);
+ pi = __lookup_pg_pool(&map->pg_pools, pool);
+ if (pi) {
+ kfree(pi->name);
+ pi->name = kmalloc(len + 1, GFP_NOFS);
+ if (pi->name) {
+ memcpy(pi->name, *p, len);
+ pi->name[len] = '\0';
+ dout(" name is %s\n", pi->name);
+ }
+ }
+ *p += len;
+ }
+ return 0;
+
+bad:
+ return -EINVAL;
+}
+
+/*
+ * osd map
+ */
+void ceph_osdmap_destroy(struct ceph_osdmap *map)
+{
+ dout("osdmap_destroy %p\n", map);
+ if (map->crush)
+ crush_destroy(map->crush);
+ while (!RB_EMPTY_ROOT(&map->pg_temp)) {
+ struct ceph_pg_mapping *pg =
+ rb_entry(rb_first(&map->pg_temp),
+ struct ceph_pg_mapping, node);
+ rb_erase(&pg->node, &map->pg_temp);
+ kfree(pg);
+ }
+ while (!RB_EMPTY_ROOT(&map->pg_pools)) {
+ struct ceph_pg_pool_info *pi =
+ rb_entry(rb_first(&map->pg_pools),
+ struct ceph_pg_pool_info, node);
+ __remove_pg_pool(&map->pg_pools, pi);
+ }
+ kfree(map->osd_state);
+ kfree(map->osd_weight);
+ kfree(map->osd_addr);
+ kfree(map);
+}
+
+/*
+ * adjust max osd value. reallocate arrays.
+ */
+static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
+{
+ u8 *state;
+ struct ceph_entity_addr *addr;
+ u32 *weight;
+
+ state = kcalloc(max, sizeof(*state), GFP_NOFS);
+ addr = kcalloc(max, sizeof(*addr), GFP_NOFS);
+ weight = kcalloc(max, sizeof(*weight), GFP_NOFS);
+ if (state == NULL || addr == NULL || weight == NULL) {
+ kfree(state);
+ kfree(addr);
+ kfree(weight);
+ return -ENOMEM;
+ }
+
+ /* copy old? */
+ if (map->osd_state) {
+ memcpy(state, map->osd_state, map->max_osd*sizeof(*state));
+ memcpy(addr, map->osd_addr, map->max_osd*sizeof(*addr));
+ memcpy(weight, map->osd_weight, map->max_osd*sizeof(*weight));
+ kfree(map->osd_state);
+ kfree(map->osd_addr);
+ kfree(map->osd_weight);
+ }
+
+ map->osd_state = state;
+ map->osd_weight = weight;
+ map->osd_addr = addr;
+ map->max_osd = max;
+ return 0;
+}
+
/*
* decode a full map.
*/
@@ -526,7 +560,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
ceph_decode_32_safe(p, end, max, bad);
while (max--) {
ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
- pi = kmalloc(sizeof(*pi), GFP_NOFS);
+ pi = kzalloc(sizeof(*pi), GFP_NOFS);
if (!pi)
goto bad;
pi->id = ceph_decode_32(p);
@@ -539,6 +573,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
__decode_pool(p, pi);
__insert_pg_pool(&map->pg_pools, pi);
}
+
+ if (version >= 5 && __decode_pool_names(p, end, map) < 0)
+ goto bad;
+
ceph_decode_32_safe(p, end, map->pool_max, bad);
ceph_decode_32_safe(p, end, map->flags, bad);
@@ -712,7 +750,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
}
pi = __lookup_pg_pool(&map->pg_pools, pool);
if (!pi) {
- pi = kmalloc(sizeof(*pi), GFP_NOFS);
+ pi = kzalloc(sizeof(*pi), GFP_NOFS);
if (!pi) {
err = -ENOMEM;
goto bad;
@@ -722,6 +760,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
}
__decode_pool(p, pi);
}
+ if (version >= 5 && __decode_pool_names(p, end, map) < 0)
+ goto bad;
/* old_pool */
ceph_decode_32_safe(p, end, len, bad);
@@ -730,10 +770,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
ceph_decode_32_safe(p, end, pool, bad);
pi = __lookup_pg_pool(&map->pg_pools, pool);
- if (pi) {
- rb_erase(&pi->node, &map->pg_pools);
- kfree(pi);
- }
+ if (pi)
+ __remove_pg_pool(&map->pg_pools, pi);
}
/* new_up */
@@ -1003,12 +1041,33 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
}
/*
+ * Return acting set for given pgid.
+ */
+int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
+ int *acting)
+{
+ int rawosds[CEPH_PG_MAX_SIZE], *osds;
+ int i, o, num = CEPH_PG_MAX_SIZE;
+
+ osds = calc_pg_raw(osdmap, pgid, rawosds, &num);
+ if (!osds)
+ return -1;
+
+ /* primary is first up osd */
+ o = 0;
+ for (i = 0; i < num; i++)
+ if (ceph_osd_is_up(osdmap, osds[i]))
+ acting[o++] = osds[i];
+ return o;
+}
+
+/*
* Return primary osd for given pgid, or -1 if none.
*/
int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
{
- int rawosds[10], *osds;
- int i, num = ARRAY_SIZE(rawosds);
+ int rawosds[CEPH_PG_MAX_SIZE], *osds;
+ int i, num = CEPH_PG_MAX_SIZE;
osds = calc_pg_raw(osdmap, pgid, rawosds, &num);
if (!osds)
@@ -1016,9 +1075,7 @@ int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
/* primary is first up osd */
for (i = 0; i < num; i++)
- if (ceph_osd_is_up(osdmap, osds[i])) {
+ if (ceph_osd_is_up(osdmap, osds[i]))
return osds[i];
- break;
- }
return -1;
}
diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h
index 1fb55af..970b547 100644
--- a/fs/ceph/osdmap.h
+++ b/fs/ceph/osdmap.h
@@ -23,6 +23,7 @@ struct ceph_pg_pool_info {
int id;
struct ceph_pg_pool v;
int pg_num_mask, pgp_num_mask, lpg_num_mask, lpgp_num_mask;
+ char *name;
};
struct ceph_pg_mapping {
@@ -119,6 +120,8 @@ extern int ceph_calc_object_layout(struct ceph_object_layout *ol,
const char *oid,
struct ceph_file_layout *fl,
struct ceph_osdmap *osdmap);
+extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
+ int *acting);
extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
struct ceph_pg pgid);
diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h
index 26ac8b8..fd56451 100644
--- a/fs/ceph/rados.h
+++ b/fs/ceph/rados.h
@@ -11,8 +11,10 @@
/*
* osdmap encoding versions
*/
-#define CEPH_OSDMAP_INC_VERSION 4
-#define CEPH_OSDMAP_VERSION 4
+#define CEPH_OSDMAP_INC_VERSION 5
+#define CEPH_OSDMAP_INC_VERSION_EXT 5
+#define CEPH_OSDMAP_VERSION 5
+#define CEPH_OSDMAP_VERSION_EXT 5
/*
* fs id
@@ -56,6 +58,7 @@ struct ceph_timespec {
#define CEPH_PG_LAYOUT_LINEAR 2
#define CEPH_PG_LAYOUT_HYBRID 3
+#define CEPH_PG_MAX_SIZE 16 /* max # osds in a single pg */
/*
* placement group.
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index e6f9bc5..d5114db 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -431,8 +431,7 @@ static int dup_array(u64 **dst, __le64 *src, int num)
* Caller must hold snap_rwsem for read (i.e., the realm topology won't
* change).
*/
-void ceph_queue_cap_snap(struct ceph_inode_info *ci,
- struct ceph_snap_context *snapc)
+void ceph_queue_cap_snap(struct ceph_inode_info *ci)
{
struct inode *inode = &ci->vfs_inode;
struct ceph_cap_snap *capsnap;
@@ -451,10 +450,11 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
as no new writes are allowed to start when pending, so any
writes in progress now were started before the previous
cap_snap. lucky us. */
- dout("queue_cap_snap %p snapc %p seq %llu used %d"
- " already pending\n", inode, snapc, snapc->seq, used);
+ dout("queue_cap_snap %p already pending\n", inode);
kfree(capsnap);
} else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) {
+ struct ceph_snap_context *snapc = ci->i_head_snapc;
+
igrab(inode);
atomic_set(&capsnap->nref, 1);
@@ -463,7 +463,6 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
INIT_LIST_HEAD(&capsnap->flushing_item);
capsnap->follows = snapc->seq - 1;
- capsnap->context = ceph_get_snap_context(snapc);
capsnap->issued = __ceph_caps_issued(ci, NULL);
capsnap->dirty = __ceph_caps_dirty(ci);
@@ -480,7 +479,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
snapshot. */
capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
ci->i_wrbuffer_ref_head = 0;
- ceph_put_snap_context(ci->i_head_snapc);
+ capsnap->context = snapc;
ci->i_head_snapc = NULL;
list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
@@ -522,15 +521,17 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
capsnap->ctime = inode->i_ctime;
capsnap->time_warp_seq = ci->i_time_warp_seq;
if (capsnap->dirty_pages) {
- dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu "
+ dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
"still has %d dirty pages\n", inode, capsnap,
capsnap->context, capsnap->context->seq,
- capsnap->size, capsnap->dirty_pages);
+ ceph_cap_string(capsnap->dirty), capsnap->size,
+ capsnap->dirty_pages);
return 0;
}
- dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu clean\n",
+ dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
inode, capsnap, capsnap->context,
- capsnap->context->seq, capsnap->size);
+ capsnap->context->seq, ceph_cap_string(capsnap->dirty),
+ capsnap->size);
spin_lock(&mdsc->snap_flush_lock);
list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
@@ -602,7 +603,7 @@ more:
if (lastinode)
iput(lastinode);
lastinode = inode;
- ceph_queue_cap_snap(ci, realm->cached_context);
+ ceph_queue_cap_snap(ci);
spin_lock(&realm->inodes_with_caps_lock);
}
spin_unlock(&realm->inodes_with_caps_lock);
@@ -824,8 +825,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
spin_unlock(&realm->inodes_with_caps_lock);
spin_unlock(&inode->i_lock);
- ceph_queue_cap_snap(ci,
- ci->i_snap_realm->cached_context);
+ ceph_queue_cap_snap(ci);
iput(inode);
continue;
@@ -869,16 +869,20 @@ skip_inode:
continue;
ci = ceph_inode(inode);
spin_lock(&inode->i_lock);
- if (!ci->i_snap_realm)
- goto split_skip_inode;
- ceph_put_snap_realm(mdsc, ci->i_snap_realm);
- spin_lock(&realm->inodes_with_caps_lock);
- list_add(&ci->i_snap_realm_item,
- &realm->inodes_with_caps);
- ci->i_snap_realm = realm;
- spin_unlock(&realm->inodes_with_caps_lock);
- ceph_get_snap_realm(mdsc, realm);
-split_skip_inode:
+ if (list_empty(&ci->i_snap_realm_item)) {
+ struct ceph_snap_realm *oldrealm =
+ ci->i_snap_realm;
+
+ dout(" moving %p to split realm %llx %p\n",
+ inode, realm->ino, realm);
+ spin_lock(&realm->inodes_with_caps_lock);
+ list_add(&ci->i_snap_realm_item,
+ &realm->inodes_with_caps);
+ ci->i_snap_realm = realm;
+ spin_unlock(&realm->inodes_with_caps_lock);
+ ceph_get_snap_realm(mdsc, realm);
+ ceph_put_snap_realm(mdsc, oldrealm);
+ }
spin_unlock(&inode->i_lock);
iput(inode);
}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 75d02ea..110857b 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -47,10 +47,20 @@ const char *ceph_file_part(const char *s, int len)
*/
static void ceph_put_super(struct super_block *s)
{
- struct ceph_client *cl = ceph_client(s);
+ struct ceph_client *client = ceph_sb_to_client(s);
dout("put_super\n");
- ceph_mdsc_close_sessions(&cl->mdsc);
+ ceph_mdsc_close_sessions(&client->mdsc);
+
+ /*
+ * ensure we release the bdi before put_anon_super releases
+ * the device name.
+ */
+ if (s->s_bdi == &client->backing_dev_info) {
+ bdi_unregister(&client->backing_dev_info);
+ s->s_bdi = NULL;
+ }
+
return;
}
@@ -636,6 +646,8 @@ static void ceph_destroy_client(struct ceph_client *client)
destroy_workqueue(client->pg_inv_wq);
destroy_workqueue(client->trunc_wq);
+ bdi_destroy(&client->backing_dev_info);
+
if (client->msgr)
ceph_messenger_destroy(client->msgr);
mempool_destroy(client->wb_pagevec_pool);
@@ -876,14 +888,14 @@ static int ceph_register_bdi(struct super_block *sb, struct ceph_client *client)
{
int err;
- sb->s_bdi = &client->backing_dev_info;
-
/* set ra_pages based on rsize mount option? */
if (client->mount_args->rsize >= PAGE_CACHE_SIZE)
client->backing_dev_info.ra_pages =
(client->mount_args->rsize + PAGE_CACHE_SIZE - 1)
>> PAGE_SHIFT;
err = bdi_register_dev(&client->backing_dev_info, sb->s_dev);
+ if (!err)
+ sb->s_bdi = &client->backing_dev_info;
return err;
}
@@ -957,9 +969,6 @@ static void ceph_kill_sb(struct super_block *s)
dout("kill_sb %p\n", s);
ceph_mdsc_pre_umount(&client->mdsc);
kill_anon_super(s); /* will call put_super after sb is r/o */
- if (s->s_bdi == &client->backing_dev_info)
- bdi_unregister(&client->backing_dev_info);
- bdi_destroy(&client->backing_dev_info);
ceph_destroy_client(client);
}
@@ -996,9 +1005,10 @@ static int __init init_ceph(void)
if (ret)
goto out_icache;
- pr_info("loaded %d.%d.%d (mon/mds/osd proto %d/%d/%d)\n",
- CEPH_VERSION_MAJOR, CEPH_VERSION_MINOR, CEPH_VERSION_PATCH,
- CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL);
+ pr_info("loaded (mon/mds/osd proto %d/%d/%d, osdmap %d/%d %d/%d)\n",
+ CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL,
+ CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT,
+ CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT);
return 0;
out_icache:
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index ca702c6..13513b8 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -10,6 +10,7 @@
#include <linux/fs.h>
#include <linux/mempool.h>
#include <linux/pagemap.h>
+#include <linux/slab.h>
#include <linux/wait.h>
#include <linux/writeback.h>
#include <linux/slab.h>
@@ -715,8 +716,7 @@ extern int ceph_update_snap_trace(struct ceph_mds_client *m,
extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_msg *msg);
-extern void ceph_queue_cap_snap(struct ceph_inode_info *ci,
- struct ceph_snap_context *snapc);
+extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap);
extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
OpenPOWER on IntegriCloud