summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--drivers/block/rbd.c14
-rw-r--r--fs/ceph/addr.c324
-rw-r--r--fs/ceph/caps.c11
-rw-r--r--fs/ceph/dir.c69
-rw-r--r--fs/ceph/export.c13
-rw-r--r--fs/ceph/file.c15
-rw-r--r--fs/ceph/inode.c34
-rw-r--r--fs/ceph/mds_client.c7
-rw-r--r--fs/ceph/snap.c16
-rw-r--r--fs/ceph/super.c47
-rw-r--r--fs/ceph/super.h23
-rw-r--r--fs/ceph/xattr.c78
-rw-r--r--include/linux/ceph/ceph_features.h2
-rw-r--r--include/linux/ceph/ceph_fs.h7
-rw-r--r--include/linux/ceph/libceph.h8
-rw-r--r--include/linux/ceph/mon_client.h31
-rw-r--r--include/linux/ceph/osd_client.h15
-rw-r--r--net/ceph/ceph_common.c4
-rw-r--r--net/ceph/debugfs.c17
-rw-r--r--net/ceph/messenger.c29
-rw-r--r--net/ceph/mon_client.c457
-rw-r--r--net/ceph/osd_client.c109
22 files changed, 811 insertions, 519 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4a87678..9c62344 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1847,14 +1847,12 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
if (osd_req->r_result < 0)
obj_request->result = osd_req->r_result;
- rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
-
/*
* We support a 64-bit length, but ultimately it has to be
* passed to the block layer, which just supports a 32-bit
* length field.
*/
- obj_request->xferred = osd_req->r_reply_op_len[0];
+ obj_request->xferred = osd_req->r_ops[0].outdata_len;
rbd_assert(obj_request->xferred < (u64)UINT_MAX);
opcode = osd_req->r_ops[0].op;
@@ -5643,18 +5641,12 @@ static void rbd_sysfs_cleanup(void)
static int rbd_slab_init(void)
{
rbd_assert(!rbd_img_request_cache);
- rbd_img_request_cache = kmem_cache_create("rbd_img_request",
- sizeof (struct rbd_img_request),
- __alignof__(struct rbd_img_request),
- 0, NULL);
+ rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
if (!rbd_img_request_cache)
return -ENOMEM;
rbd_assert(!rbd_obj_request_cache);
- rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
- sizeof (struct rbd_obj_request),
- __alignof__(struct rbd_obj_request),
- 0, NULL);
+ rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
if (!rbd_obj_request_cache)
goto out_err;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 19adeb0..fc5cae2 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -175,8 +175,8 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
static int ceph_releasepage(struct page *page, gfp_t g)
{
- struct inode *inode = page->mapping ? page->mapping->host : NULL;
- dout("%p releasepage %p idx %lu\n", inode, page, page->index);
+ dout("%p releasepage %p idx %lu\n", page->mapping->host,
+ page, page->index);
WARN_ON(PageDirty(page));
/* Can we release the page from the cache? */
@@ -276,7 +276,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
for (i = 0; i < num_pages; i++) {
struct page *page = osd_data->pages[i];
- if (rc < 0 && rc != ENOENT)
+ if (rc < 0 && rc != -ENOENT)
goto unlock;
if (bytes < (int)PAGE_CACHE_SIZE) {
/* zero (remainder of) page */
@@ -606,71 +606,71 @@ static void writepages_finish(struct ceph_osd_request *req,
struct inode *inode = req->r_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_data *osd_data;
- unsigned wrote;
struct page *page;
- int num_pages;
- int i;
+ int num_pages, total_pages = 0;
+ int i, j;
+ int rc = req->r_result;
struct ceph_snap_context *snapc = req->r_snapc;
struct address_space *mapping = inode->i_mapping;
- int rc = req->r_result;
- u64 bytes = req->r_ops[0].extent.length;
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
- long writeback_stat;
- unsigned issued = ceph_caps_issued(ci);
+ bool remove_page;
- osd_data = osd_req_op_extent_osd_data(req, 0);
- BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
- num_pages = calc_pages_for((u64)osd_data->alignment,
- (u64)osd_data->length);
- if (rc >= 0) {
- /*
- * Assume we wrote the pages we originally sent. The
- * osd might reply with fewer pages if our writeback
- * raced with a truncation and was adjusted at the osd,
- * so don't believe the reply.
- */
- wrote = num_pages;
- } else {
- wrote = 0;
+
+ dout("writepages_finish %p rc %d\n", inode, rc);
+ if (rc < 0)
mapping_set_error(mapping, rc);
- }
- dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
- inode, rc, bytes, wrote);
- /* clean all pages */
- for (i = 0; i < num_pages; i++) {
- page = osd_data->pages[i];
- BUG_ON(!page);
- WARN_ON(!PageUptodate(page));
+ /*
+ * We lost the cache cap, need to truncate the page before
+ * it is unlocked, otherwise we'd truncate it later in the
+ * page truncation thread, possibly losing some data that
+ * raced its way in
+ */
+ remove_page = !(ceph_caps_issued(ci) &
+ (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
- writeback_stat =
- atomic_long_dec_return(&fsc->writeback_count);
- if (writeback_stat <
- CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
- clear_bdi_congested(&fsc->backing_dev_info,
- BLK_RW_ASYNC);
+ /* clean all pages */
+ for (i = 0; i < req->r_num_ops; i++) {
+ if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
+ break;
- ceph_put_snap_context(page_snap_context(page));
- page->private = 0;
- ClearPagePrivate(page);
- dout("unlocking %d %p\n", i, page);
- end_page_writeback(page);
+ osd_data = osd_req_op_extent_osd_data(req, i);
+ BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+ num_pages = calc_pages_for((u64)osd_data->alignment,
+ (u64)osd_data->length);
+ total_pages += num_pages;
+ for (j = 0; j < num_pages; j++) {
+ page = osd_data->pages[j];
+ BUG_ON(!page);
+ WARN_ON(!PageUptodate(page));
+
+ if (atomic_long_dec_return(&fsc->writeback_count) <
+ CONGESTION_OFF_THRESH(
+ fsc->mount_options->congestion_kb))
+ clear_bdi_congested(&fsc->backing_dev_info,
+ BLK_RW_ASYNC);
+
+ ceph_put_snap_context(page_snap_context(page));
+ page->private = 0;
+ ClearPagePrivate(page);
+ dout("unlocking %p\n", page);
+ end_page_writeback(page);
+
+ if (remove_page)
+ generic_error_remove_page(inode->i_mapping,
+ page);
- /*
- * We lost the cache cap, need to truncate the page before
- * it is unlocked, otherwise we'd truncate it later in the
- * page truncation thread, possibly losing some data that
- * raced its way in
- */
- if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
- generic_error_remove_page(inode->i_mapping, page);
+ unlock_page(page);
+ }
+ dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
+ inode, osd_data->length, rc >= 0 ? num_pages : 0);
- unlock_page(page);
+ ceph_release_pages(osd_data->pages, num_pages);
}
- dout("%p wrote+cleaned %d pages\n", inode, wrote);
- ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);
- ceph_release_pages(osd_data->pages, num_pages);
+ ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
+
+ osd_data = osd_req_op_extent_osd_data(req, 0);
if (osd_data->pages_from_pool)
mempool_free(osd_data->pages,
ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
@@ -778,17 +778,15 @@ retry:
while (!done && index <= end) {
unsigned i;
int first;
- pgoff_t next;
- int pvec_pages, locked_pages;
- struct page **pages = NULL;
+ pgoff_t strip_unit_end = 0;
+ int num_ops = 0, op_idx;
+ int pvec_pages, locked_pages = 0;
+ struct page **pages = NULL, **data_pages;
mempool_t *pool = NULL; /* Becomes non-null if mempool used */
struct page *page;
int want;
- u64 offset, len;
- long writeback_stat;
+ u64 offset = 0, len = 0;
- next = 0;
- locked_pages = 0;
max_pages = max_pages_ever;
get_more_pages:
@@ -824,8 +822,8 @@ get_more_pages:
unlock_page(page);
break;
}
- if (next && (page->index != next)) {
- dout("not consecutive %p\n", page);
+ if (strip_unit_end && (page->index > strip_unit_end)) {
+ dout("end of strip unit %p\n", page);
unlock_page(page);
break;
}
@@ -867,36 +865,31 @@ get_more_pages:
/*
* We have something to write. If this is
* the first locked page this time through,
- * allocate an osd request and a page array
- * that it will use.
+ * calculate max possinle write size and
+ * allocate a page array
*/
if (locked_pages == 0) {
- BUG_ON(pages);
+ u64 objnum;
+ u64 objoff;
+
/* prepare async write request */
offset = (u64)page_offset(page);
len = wsize;
- req = ceph_osdc_new_request(&fsc->client->osdc,
- &ci->i_layout, vino,
- offset, &len, 0,
- do_sync ? 2 : 1,
- CEPH_OSD_OP_WRITE,
- CEPH_OSD_FLAG_WRITE |
- CEPH_OSD_FLAG_ONDISK,
- snapc, truncate_seq,
- truncate_size, true);
- if (IS_ERR(req)) {
- rc = PTR_ERR(req);
+
+ rc = ceph_calc_file_object_mapping(&ci->i_layout,
+ offset, len,
+ &objnum, &objoff,
+ &len);
+ if (rc < 0) {
unlock_page(page);
break;
}
- if (do_sync)
- osd_req_op_init(req, 1,
- CEPH_OSD_OP_STARTSYNC, 0);
-
- req->r_callback = writepages_finish;
- req->r_inode = inode;
+ num_ops = 1 + do_sync;
+ strip_unit_end = page->index +
+ ((len - 1) >> PAGE_CACHE_SHIFT);
+ BUG_ON(pages);
max_pages = calc_pages_for(0, (u64)len);
pages = kmalloc(max_pages * sizeof (*pages),
GFP_NOFS);
@@ -905,6 +898,20 @@ get_more_pages:
pages = mempool_alloc(pool, GFP_NOFS);
BUG_ON(!pages);
}
+
+ len = 0;
+ } else if (page->index !=
+ (offset + len) >> PAGE_CACHE_SHIFT) {
+ if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
+ CEPH_OSD_MAX_OPS)) {
+ redirty_page_for_writepage(wbc, page);
+ unlock_page(page);
+ break;
+ }
+
+ num_ops++;
+ offset = (u64)page_offset(page);
+ len = 0;
}
/* note position of first page in pvec */
@@ -913,18 +920,16 @@ get_more_pages:
dout("%p will write page %p idx %lu\n",
inode, page, page->index);
- writeback_stat =
- atomic_long_inc_return(&fsc->writeback_count);
- if (writeback_stat > CONGESTION_ON_THRESH(
+ if (atomic_long_inc_return(&fsc->writeback_count) >
+ CONGESTION_ON_THRESH(
fsc->mount_options->congestion_kb)) {
set_bdi_congested(&fsc->backing_dev_info,
BLK_RW_ASYNC);
}
- set_page_writeback(page);
pages[locked_pages] = page;
locked_pages++;
- next = page->index + 1;
+ len += PAGE_CACHE_SIZE;
}
/* did we get anything? */
@@ -944,38 +949,119 @@ get_more_pages:
/* shift unused pages over in the pvec... we
* will need to release them below. */
for (j = i; j < pvec_pages; j++) {
- dout(" pvec leftover page %p\n",
- pvec.pages[j]);
+ dout(" pvec leftover page %p\n", pvec.pages[j]);
pvec.pages[j-i+first] = pvec.pages[j];
}
pvec.nr -= i-first;
}
- /* Format the osd request message and submit the write */
+new_request:
offset = page_offset(pages[0]);
- len = (u64)locked_pages << PAGE_CACHE_SHIFT;
- if (snap_size == -1) {
- len = min(len, (u64)i_size_read(inode) - offset);
- /* writepages_finish() clears writeback pages
- * according to the data length, so make sure
- * data length covers all locked pages */
- len = max(len, 1 +
- ((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT));
- } else {
- len = min(len, snap_size - offset);
+ len = wsize;
+
+ req = ceph_osdc_new_request(&fsc->client->osdc,
+ &ci->i_layout, vino,
+ offset, &len, 0, num_ops,
+ CEPH_OSD_OP_WRITE,
+ CEPH_OSD_FLAG_WRITE |
+ CEPH_OSD_FLAG_ONDISK,
+ snapc, truncate_seq,
+ truncate_size, false);
+ if (IS_ERR(req)) {
+ req = ceph_osdc_new_request(&fsc->client->osdc,
+ &ci->i_layout, vino,
+ offset, &len, 0,
+ min(num_ops,
+ CEPH_OSD_SLAB_OPS),
+ CEPH_OSD_OP_WRITE,
+ CEPH_OSD_FLAG_WRITE |
+ CEPH_OSD_FLAG_ONDISK,
+ snapc, truncate_seq,
+ truncate_size, true);
+ BUG_ON(IS_ERR(req));
}
- dout("writepages got %d pages at %llu~%llu\n",
- locked_pages, offset, len);
+ BUG_ON(len < page_offset(pages[locked_pages - 1]) +
+ PAGE_CACHE_SIZE - offset);
+
+ req->r_callback = writepages_finish;
+ req->r_inode = inode;
- osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
+ /* Format the osd request message and submit the write */
+ len = 0;
+ data_pages = pages;
+ op_idx = 0;
+ for (i = 0; i < locked_pages; i++) {
+ u64 cur_offset = page_offset(pages[i]);
+ if (offset + len != cur_offset) {
+ if (op_idx + do_sync + 1 == req->r_num_ops)
+ break;
+ osd_req_op_extent_dup_last(req, op_idx,
+ cur_offset - offset);
+ dout("writepages got pages at %llu~%llu\n",
+ offset, len);
+ osd_req_op_extent_osd_data_pages(req, op_idx,
+ data_pages, len, 0,
!!pool, false);
+ osd_req_op_extent_update(req, op_idx, len);
- pages = NULL; /* request message now owns the pages array */
- pool = NULL;
+ len = 0;
+ offset = cur_offset;
+ data_pages = pages + i;
+ op_idx++;
+ }
- /* Update the write op length in case we changed it */
+ set_page_writeback(pages[i]);
+ len += PAGE_CACHE_SIZE;
+ }
+
+ if (snap_size != -1) {
+ len = min(len, snap_size - offset);
+ } else if (i == locked_pages) {
+ /* writepages_finish() clears writeback pages
+ * according to the data length, so make sure
+ * data length covers all locked pages */
+ u64 min_len = len + 1 - PAGE_CACHE_SIZE;
+ len = min(len, (u64)i_size_read(inode) - offset);
+ len = max(len, min_len);
+ }
+ dout("writepages got pages at %llu~%llu\n", offset, len);
+
+ osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
+ 0, !!pool, false);
+ osd_req_op_extent_update(req, op_idx, len);
- osd_req_op_extent_update(req, 0, len);
+ if (do_sync) {
+ op_idx++;
+ osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
+ }
+ BUG_ON(op_idx + 1 != req->r_num_ops);
+
+ pool = NULL;
+ if (i < locked_pages) {
+ BUG_ON(num_ops <= req->r_num_ops);
+ num_ops -= req->r_num_ops;
+ num_ops += do_sync;
+ locked_pages -= i;
+
+ /* allocate new pages array for next request */
+ data_pages = pages;
+ pages = kmalloc(locked_pages * sizeof (*pages),
+ GFP_NOFS);
+ if (!pages) {
+ pool = fsc->wb_pagevec_pool;
+ pages = mempool_alloc(pool, GFP_NOFS);
+ BUG_ON(!pages);
+ }
+ memcpy(pages, data_pages + i,
+ locked_pages * sizeof(*pages));
+ memset(data_pages + i, 0,
+ locked_pages * sizeof(*pages));
+ } else {
+ BUG_ON(num_ops != req->r_num_ops);
+ index = pages[i - 1]->index + 1;
+ /* request message now owns the pages array */
+ pages = NULL;
+ }
vino = ceph_vino(inode);
ceph_osdc_build_request(req, offset, snapc, vino.snap,
@@ -985,9 +1071,10 @@ get_more_pages:
BUG_ON(rc);
req = NULL;
- /* continue? */
- index = next;
- wbc->nr_to_write -= locked_pages;
+ wbc->nr_to_write -= i;
+ if (pages)
+ goto new_request;
+
if (wbc->nr_to_write <= 0)
done = 1;
@@ -1522,7 +1609,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
ceph_vino(inode), 0, &len, 0, 1,
CEPH_OSD_OP_CREATE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
- ceph_empty_snapc, 0, 0, false);
+ NULL, 0, 0, false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
@@ -1540,9 +1627,8 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
ceph_vino(inode), 0, &len, 1, 3,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
- ceph_empty_snapc,
- ci->i_truncate_seq, ci->i_truncate_size,
- false);
+ NULL, ci->i_truncate_seq,
+ ci->i_truncate_size, false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
@@ -1663,8 +1749,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
goto out;
}
- rd_req = ceph_osdc_alloc_request(&fsc->client->osdc,
- ceph_empty_snapc,
+ rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1, false, GFP_NOFS);
if (!rd_req) {
err = -ENOMEM;
@@ -1678,8 +1763,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
"%llx.00000000", ci->i_vino.ino);
rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
- wr_req = ceph_osdc_alloc_request(&fsc->client->osdc,
- ceph_empty_snapc,
+ wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1, false, GFP_NOFS);
if (!wr_req) {
err = -ENOMEM;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 6fe0ad2..de17bb2 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -991,7 +991,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
u32 seq, u64 flush_tid, u64 oldest_flush_tid,
u32 issue_seq, u32 mseq, u64 size, u64 max_size,
struct timespec *mtime, struct timespec *atime,
- u64 time_warp_seq,
+ struct timespec *ctime, u64 time_warp_seq,
kuid_t uid, kgid_t gid, umode_t mode,
u64 xattr_version,
struct ceph_buffer *xattrs_buf,
@@ -1042,6 +1042,8 @@ static int send_cap_msg(struct ceph_mds_session *session,
ceph_encode_timespec(&fc->mtime, mtime);
if (atime)
ceph_encode_timespec(&fc->atime, atime);
+ if (ctime)
+ ceph_encode_timespec(&fc->ctime, ctime);
fc->time_warp_seq = cpu_to_le32(time_warp_seq);
fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid));
@@ -1116,7 +1118,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
int held, revoking, dropping, keep;
u64 seq, issue_seq, mseq, time_warp_seq, follows;
u64 size, max_size;
- struct timespec mtime, atime;
+ struct timespec mtime, atime, ctime;
int wake = 0;
umode_t mode;
kuid_t uid;
@@ -1180,6 +1182,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ci->i_requested_max_size = max_size;
mtime = inode->i_mtime;
atime = inode->i_atime;
+ ctime = inode->i_ctime;
time_warp_seq = ci->i_time_warp_seq;
uid = inode->i_uid;
gid = inode->i_gid;
@@ -1198,7 +1201,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq,
flush_tid, oldest_flush_tid, issue_seq, mseq,
- size, max_size, &mtime, &atime, time_warp_seq,
+ size, max_size, &mtime, &atime, &ctime, time_warp_seq,
uid, gid, mode, xattr_version, xattr_blob,
follows, inline_data);
if (ret < 0) {
@@ -1320,7 +1323,7 @@ retry:
capsnap->dirty, 0, capsnap->flush_tid, 0,
0, mseq, capsnap->size, 0,
&capsnap->mtime, &capsnap->atime,
- capsnap->time_warp_seq,
+ &capsnap->ctime, capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
capsnap->xattr_version, capsnap->xattr_blob,
capsnap->follows, capsnap->inline_data);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index fd11fb2..fadc243 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -38,7 +38,7 @@ int ceph_init_dentry(struct dentry *dentry)
if (dentry->d_fsdata)
return 0;
- di = kmem_cache_alloc(ceph_dentry_cachep, GFP_KERNEL | __GFP_ZERO);
+ di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
if (!di)
return -ENOMEM; /* oh well */
@@ -68,23 +68,6 @@ out_unlock:
return 0;
}
-struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
-{
- struct inode *inode = NULL;
-
- if (!dentry)
- return NULL;
-
- spin_lock(&dentry->d_lock);
- if (!IS_ROOT(dentry)) {
- inode = d_inode(dentry->d_parent);
- ihold(inode);
- }
- spin_unlock(&dentry->d_lock);
- return inode;
-}
-
-
/*
* for readdir, we encode the directory frag and offset within that
* frag into f_pos.
@@ -624,6 +607,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
int op;
+ int mask;
int err;
dout("lookup %p dentry %p '%pd'\n",
@@ -666,8 +650,12 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
return ERR_CAST(req);
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
- /* we only need inode linkage */
- req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
+
+ mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+ if (ceph_security_xattr_wanted(dir))
+ mask |= CEPH_CAP_XATTR_SHARED;
+ req->r_args.getattr.mask = cpu_to_le32(mask);
+
req->r_locked_dir = dir;
err = ceph_mdsc_do_request(mdsc, NULL, req);
err = ceph_handle_snapdir(req, dentry, err);
@@ -1095,6 +1083,7 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
{
int valid = 0;
+ struct dentry *parent;
struct inode *dir;
if (flags & LOOKUP_RCU)
@@ -1103,7 +1092,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
- dir = ceph_get_dentry_parent_inode(dentry);
+ parent = dget_parent(dentry);
+ dir = d_inode(parent);
/* always trust cached snapped dentries, snapdir dentry */
if (ceph_snap(dir) != CEPH_NOSNAP) {
@@ -1121,13 +1111,48 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
valid = 1;
}
+ if (!valid) {
+ struct ceph_mds_client *mdsc =
+ ceph_sb_to_client(dir->i_sb)->mdsc;
+ struct ceph_mds_request *req;
+ int op, mask, err;
+
+ op = ceph_snap(dir) == CEPH_SNAPDIR ?
+ CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
+ req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
+ if (!IS_ERR(req)) {
+ req->r_dentry = dget(dentry);
+ req->r_num_caps = 2;
+
+ mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+ if (ceph_security_xattr_wanted(dir))
+ mask |= CEPH_CAP_XATTR_SHARED;
+ req->r_args.getattr.mask = mask;
+
+ req->r_locked_dir = dir;
+ err = ceph_mdsc_do_request(mdsc, NULL, req);
+ if (err == 0 || err == -ENOENT) {
+ if (dentry == req->r_dentry) {
+ valid = !d_unhashed(dentry);
+ } else {
+ d_invalidate(req->r_dentry);
+ err = -EAGAIN;
+ }
+ }
+ ceph_mdsc_put_request(req);
+ dout("d_revalidate %p lookup result=%d\n",
+ dentry, err);
+ }
+ }
+
dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
if (valid) {
ceph_dentry_lru_touch(dentry);
} else {
ceph_dir_clear_complete(dir);
}
- iput(dir);
+
+ dput(parent);
return valid;
}
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 3b31723..6e72c98 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -71,12 +71,18 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
inode = ceph_find_inode(sb, vino);
if (!inode) {
struct ceph_mds_request *req;
+ int mask;
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
USE_ANY_MDS);
if (IS_ERR(req))
return ERR_CAST(req);
+ mask = CEPH_STAT_CAP_INODE;
+ if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+ mask |= CEPH_CAP_XATTR_SHARED;
+ req->r_args.getattr.mask = cpu_to_le32(mask);
+
req->r_ino1 = vino;
req->r_num_caps = 1;
err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -128,6 +134,7 @@ static struct dentry *__get_parent(struct super_block *sb,
struct ceph_mds_request *req;
struct inode *inode;
struct dentry *dentry;
+ int mask;
int err;
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT,
@@ -144,6 +151,12 @@ static struct dentry *__get_parent(struct super_block *sb,
.snap = CEPH_NOSNAP,
};
}
+
+ mask = CEPH_STAT_CAP_INODE;
+ if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+ mask |= CEPH_CAP_XATTR_SHARED;
+ req->r_args.getattr.mask = cpu_to_le32(mask);
+
req->r_num_caps = 1;
err = ceph_mdsc_do_request(mdsc, NULL, req);
inode = req->r_target_inode;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index eb9028e..ef38f01 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -157,7 +157,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
case S_IFDIR:
dout("init_file %p %p 0%o (regular)\n", inode, file,
inode->i_mode);
- cf = kmem_cache_alloc(ceph_file_cachep, GFP_KERNEL | __GFP_ZERO);
+ cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
if (cf == NULL) {
ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
return -ENOMEM;
@@ -300,6 +300,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct ceph_mds_request *req;
struct dentry *dn;
struct ceph_acls_info acls = {};
+ int mask;
int err;
dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
@@ -335,6 +336,12 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
acls.pagelist = NULL;
}
}
+
+ mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+ if (ceph_security_xattr_wanted(dir))
+ mask |= CEPH_CAP_XATTR_SHARED;
+ req->r_args.open.mask = cpu_to_le32(mask);
+
req->r_locked_dir = dir; /* caller holds dir->i_mutex */
err = ceph_mdsc_do_request(mdsc,
(flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
@@ -725,7 +732,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
ret = ceph_osdc_start_request(req->r_osdc, req, false);
out:
if (ret < 0) {
- BUG_ON(ret == -EOLDSNAPC);
req->r_result = ret;
ceph_aio_complete_req(req, NULL);
}
@@ -783,7 +789,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
int num_pages = 0;
int flags;
int ret;
- struct timespec mtime = CURRENT_TIME;
+ struct timespec mtime = current_fs_time(inode->i_sb);
size_t count = iov_iter_count(iter);
loff_t pos = iocb->ki_pos;
bool write = iov_iter_rw(iter) == WRITE;
@@ -949,7 +955,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
ret = ceph_osdc_start_request(req->r_osdc,
req, false);
if (ret < 0) {
- BUG_ON(ret == -EOLDSNAPC);
req->r_result = ret;
ceph_aio_complete_req(req, NULL);
}
@@ -988,7 +993,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
int flags;
int check_caps = 0;
int ret;
- struct timespec mtime = CURRENT_TIME;
+ struct timespec mtime = current_fs_time(inode->i_sb);
size_t count = iov_iter_count(from);
if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e48fd8b..ed58b16 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -549,6 +549,10 @@ int ceph_fill_file_size(struct inode *inode, int issued,
if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
(truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
dout("size %lld -> %llu\n", inode->i_size, size);
+ if (size > 0 && S_ISDIR(inode->i_mode)) {
+ pr_err("fill_file_size non-zero size for directory\n");
+ size = 0;
+ }
i_size_write(inode, size);
inode->i_blocks = (size + (1<<9) - 1) >> 9;
ci->i_reported_size = size;
@@ -1261,6 +1265,7 @@ retry_lookup:
dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
dn, d_inode(dn), ceph_vinop(d_inode(dn)),
ceph_vinop(in));
+ d_invalidate(dn);
have_lease = false;
}
@@ -1349,15 +1354,20 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
if (!ctl->page || pgoff != page_index(ctl->page)) {
ceph_readdir_cache_release(ctl);
- ctl->page = grab_cache_page(&dir->i_data, pgoff);
+ if (idx == 0)
+ ctl->page = grab_cache_page(&dir->i_data, pgoff);
+ else
+ ctl->page = find_lock_page(&dir->i_data, pgoff);
if (!ctl->page) {
ctl->index = -1;
- return -ENOMEM;
+ return idx == 0 ? -ENOMEM : 0;
}
/* reading/filling the cache are serialized by
* i_mutex, no need to use page lock */
unlock_page(ctl->page);
ctl->dentries = kmap(ctl->page);
+ if (idx == 0)
+ memset(ctl->dentries, 0, PAGE_CACHE_SIZE);
}
if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
@@ -1380,7 +1390,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
struct qstr dname;
struct dentry *dn;
struct inode *in;
- int err = 0, ret, i;
+ int err = 0, skipped = 0, ret, i;
struct inode *snapdir = NULL;
struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
struct ceph_dentry_info *di;
@@ -1492,7 +1502,17 @@ retry_lookup:
}
if (d_really_is_negative(dn)) {
- struct dentry *realdn = splice_dentry(dn, in);
+ struct dentry *realdn;
+
+ if (ceph_security_xattr_deadlock(in)) {
+ dout(" skip splicing dn %p to inode %p"
+ " (security xattr deadlock)\n", dn, in);
+ iput(in);
+ skipped++;
+ goto next_item;
+ }
+
+ realdn = splice_dentry(dn, in);
if (IS_ERR(realdn)) {
err = PTR_ERR(realdn);
d_drop(dn);
@@ -1509,7 +1529,7 @@ retry_lookup:
req->r_session,
req->r_request_started);
- if (err == 0 && cache_ctl.index >= 0) {
+ if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
ret = fill_readdir_cache(d_inode(parent), dn,
&cache_ctl, req);
if (ret < 0)
@@ -1520,7 +1540,7 @@ next_item:
dput(dn);
}
out:
- if (err == 0) {
+ if (err == 0 && skipped == 0) {
req->r_did_prepopulate = true;
req->r_readdir_cache_idx = cache_ctl.index;
}
@@ -1950,7 +1970,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
if (dirtied) {
inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
&prealloc_cf);
- inode->i_ctime = CURRENT_TIME;
+ inode->i_ctime = current_fs_time(inode->i_sb);
}
release &= issued;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 911d64d..44852c3 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1729,7 +1729,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
- req->r_stamp = CURRENT_TIME;
+ req->r_stamp = current_fs_time(mdsc->fsc->sb);
req->r_op = op;
req->r_direct_mode = mode;
@@ -2540,6 +2540,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/* insert trace into our cache */
mutex_lock(&req->r_fill_mutex);
+ current->journal_info = req;
err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
if (err == 0) {
if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
@@ -2547,6 +2548,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
ceph_readdir_prepopulate(req, req->r_session);
ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
+ current->journal_info = NULL;
mutex_unlock(&req->r_fill_mutex);
up_read(&mdsc->snap_rwsem);
@@ -3764,7 +3766,6 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
/* do we need it? */
- ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
mutex_lock(&mdsc->mutex);
if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
dout("handle_map epoch %u <= our %u\n",
@@ -3791,6 +3792,8 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
__wake_requests(mdsc, &mdsc->waiting_for_map);
+ ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
+ mdsc->mdsmap->m_epoch);
mutex_unlock(&mdsc->mutex);
schedule_delayed(mdsc);
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 4aa7122..9caaa7f 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -296,8 +296,6 @@ static int cmpu64_rev(const void *a, const void *b)
}
-struct ceph_snap_context *ceph_empty_snapc;
-
/*
* build the snap context for a given realm.
*/
@@ -987,17 +985,3 @@ out:
up_write(&mdsc->snap_rwsem);
return;
}
-
-int __init ceph_snap_init(void)
-{
- ceph_empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
- if (!ceph_empty_snapc)
- return -ENOMEM;
- ceph_empty_snapc->seq = 1;
- return 0;
-}
-
-void ceph_snap_exit(void)
-{
- ceph_put_snap_context(ceph_empty_snapc);
-}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index ca4d5e8..c973043 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -439,8 +439,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT)
seq_puts(m, ",dirstat");
- if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES) == 0)
- seq_puts(m, ",norbytes");
+ if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES))
+ seq_puts(m, ",rbytes");
if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
seq_puts(m, ",noasyncreaddir");
if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
@@ -530,7 +530,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
goto fail;
}
fsc->client->extra_mon_dispatch = extra_mon_dispatch;
- fsc->client->monc.want_mdsmap = 1;
+ ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
fsc->mount_options = fsopt;
@@ -793,22 +793,20 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
struct dentry *root;
int first = 0; /* first vfsmount for this super_block */
- dout("mount start\n");
+ dout("mount start %p\n", fsc);
mutex_lock(&fsc->client->mount_mutex);
- err = __ceph_open_session(fsc->client, started);
- if (err < 0)
- goto out;
+ if (!fsc->sb->s_root) {
+ err = __ceph_open_session(fsc->client, started);
+ if (err < 0)
+ goto out;
- dout("mount opening root\n");
- root = open_root_dentry(fsc, "", started);
- if (IS_ERR(root)) {
- err = PTR_ERR(root);
- goto out;
- }
- if (fsc->sb->s_root) {
- dput(root);
- } else {
+ dout("mount opening root\n");
+ root = open_root_dentry(fsc, "", started);
+ if (IS_ERR(root)) {
+ err = PTR_ERR(root);
+ goto out;
+ }
fsc->sb->s_root = root;
first = 1;
@@ -818,6 +816,7 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
}
if (path[0] == 0) {
+ root = fsc->sb->s_root;
dget(root);
} else {
dout("mount opening base mountpoint\n");
@@ -833,16 +832,14 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
mutex_unlock(&fsc->client->mount_mutex);
return root;
-out:
- mutex_unlock(&fsc->client->mount_mutex);
- return ERR_PTR(err);
-
fail:
if (first) {
dput(fsc->sb->s_root);
fsc->sb->s_root = NULL;
}
- goto out;
+out:
+ mutex_unlock(&fsc->client->mount_mutex);
+ return ERR_PTR(err);
}
static int ceph_set_super(struct super_block *s, void *data)
@@ -1042,19 +1039,14 @@ static int __init init_ceph(void)
ceph_flock_init();
ceph_xattr_init();
- ret = ceph_snap_init();
- if (ret)
- goto out_xattr;
ret = register_filesystem(&ceph_fs_type);
if (ret)
- goto out_snap;
+ goto out_xattr;
pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
return 0;
-out_snap:
- ceph_snap_exit();
out_xattr:
ceph_xattr_exit();
destroy_caches();
@@ -1066,7 +1058,6 @@ static void __exit exit_ceph(void)
{
dout("exit_ceph\n");
unregister_filesystem(&ceph_fs_type);
- ceph_snap_exit();
ceph_xattr_exit();
destroy_caches();
}
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 9c458eb..e705c4d6 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -37,8 +37,7 @@
#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */
#define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */
-#define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES | \
- CEPH_MOUNT_OPT_DCACHE)
+#define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE
#define ceph_set_mount_opt(fsc, opt) \
(fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt;
@@ -469,7 +468,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
#define CEPH_I_POOL_PERM (1 << 4) /* pool rd/wr bits are valid */
#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */
#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */
-
+#define CEPH_I_SEC_INITED (1 << 7) /* security initialized */
static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
long long release_count,
@@ -721,7 +720,6 @@ static inline int default_congestion_kb(void)
/* snap.c */
-extern struct ceph_snap_context *ceph_empty_snapc;
struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
u64 ino);
extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
@@ -738,8 +736,6 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap);
extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
-extern int ceph_snap_init(void);
-extern void ceph_snap_exit(void);
/*
* a cap_snap is "pending" if it is still awaiting an in-progress
@@ -808,6 +804,20 @@ extern void __init ceph_xattr_init(void);
extern void ceph_xattr_exit(void);
extern const struct xattr_handler *ceph_xattr_handlers[];
+#ifdef CONFIG_SECURITY
+extern bool ceph_security_xattr_deadlock(struct inode *in);
+extern bool ceph_security_xattr_wanted(struct inode *in);
+#else
+static inline bool ceph_security_xattr_deadlock(struct inode *in)
+{
+ return false;
+}
+static inline bool ceph_security_xattr_wanted(struct inode *in)
+{
+ return false;
+}
+#endif
+
/* acl.c */
struct ceph_acls_info {
void *default_acl;
@@ -947,7 +957,6 @@ extern void ceph_dentry_lru_touch(struct dentry *dn);
extern void ceph_dentry_lru_del(struct dentry *dn);
extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
-extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
/*
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 819163d..9410abd 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -714,31 +714,62 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
}
}
+static inline int __get_request_mask(struct inode *in) {
+ struct ceph_mds_request *req = current->journal_info;
+ int mask = 0;
+ if (req && req->r_target_inode == in) {
+ if (req->r_op == CEPH_MDS_OP_LOOKUP ||
+ req->r_op == CEPH_MDS_OP_LOOKUPINO ||
+ req->r_op == CEPH_MDS_OP_LOOKUPPARENT ||
+ req->r_op == CEPH_MDS_OP_GETATTR) {
+ mask = le32_to_cpu(req->r_args.getattr.mask);
+ } else if (req->r_op == CEPH_MDS_OP_OPEN ||
+ req->r_op == CEPH_MDS_OP_CREATE) {
+ mask = le32_to_cpu(req->r_args.open.mask);
+ }
+ }
+ return mask;
+}
+
ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
size_t size)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- int err;
struct ceph_inode_xattr *xattr;
struct ceph_vxattr *vxattr = NULL;
+ int req_mask;
+ int err;
if (!ceph_is_valid_xattr(name))
return -ENODATA;
/* let's see if a virtual xattr was requested */
vxattr = ceph_match_vxattr(inode, name);
- if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
- err = vxattr->getxattr_cb(ci, value, size);
+ if (vxattr) {
+ err = -ENODATA;
+ if (!(vxattr->exists_cb && !vxattr->exists_cb(ci)))
+ err = vxattr->getxattr_cb(ci, value, size);
return err;
}
+ req_mask = __get_request_mask(inode);
+
spin_lock(&ci->i_ceph_lock);
dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
ci->i_xattrs.version, ci->i_xattrs.index_version);
if (ci->i_xattrs.version == 0 ||
- !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
+ !((req_mask & CEPH_CAP_XATTR_SHARED) ||
+ __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
spin_unlock(&ci->i_ceph_lock);
+
+ /* security module gets xattr while filling trace */
+ if (current->journal_info != NULL) {
+ pr_warn_ratelimited("sync getxattr %p "
+ "during filling trace\n", inode);
+ return -EBUSY;
+ }
+
/* get xattrs from mds (if we don't already have them) */
err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
if (err)
@@ -765,6 +796,9 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
memcpy(value, xattr->val, xattr->val_len);
+ if (current->journal_info != NULL &&
+ !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN))
+ ci->i_ceph_flags |= CEPH_I_SEC_INITED;
out:
spin_unlock(&ci->i_ceph_lock);
return err;
@@ -999,7 +1033,7 @@ retry:
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
&prealloc_cf);
ci->i_xattrs.dirty = true;
- inode->i_ctime = CURRENT_TIME;
+ inode->i_ctime = current_fs_time(inode->i_sb);
}
spin_unlock(&ci->i_ceph_lock);
@@ -1015,7 +1049,15 @@ do_sync:
do_sync_unlocked:
if (lock_snap_rwsem)
up_read(&mdsc->snap_rwsem);
- err = ceph_sync_setxattr(dentry, name, value, size, flags);
+
+ /* security module set xattr while filling trace */
+ if (current->journal_info != NULL) {
+ pr_warn_ratelimited("sync setxattr %p "
+ "during filling trace\n", inode);
+ err = -EBUSY;
+ } else {
+ err = ceph_sync_setxattr(dentry, name, value, size, flags);
+ }
out:
ceph_free_cap_flush(prealloc_cf);
kfree(newname);
@@ -1136,7 +1178,7 @@ retry:
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
&prealloc_cf);
ci->i_xattrs.dirty = true;
- inode->i_ctime = CURRENT_TIME;
+ inode->i_ctime = current_fs_time(inode->i_sb);
spin_unlock(&ci->i_ceph_lock);
if (lock_snap_rwsem)
up_read(&mdsc->snap_rwsem);
@@ -1164,3 +1206,25 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
return __ceph_removexattr(dentry, name);
}
+
+#ifdef CONFIG_SECURITY
+bool ceph_security_xattr_wanted(struct inode *in)
+{
+ return in->i_security != NULL;
+}
+
+bool ceph_security_xattr_deadlock(struct inode *in)
+{
+ struct ceph_inode_info *ci;
+ bool ret;
+ if (in->i_security == NULL)
+ return false;
+ ci = ceph_inode(in);
+ spin_lock(&ci->i_ceph_lock);
+ ret = !(ci->i_ceph_flags & CEPH_I_SEC_INITED) &&
+ !(ci->i_xattrs.version > 0 &&
+ __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0));
+ spin_unlock(&ci->i_ceph_lock);
+ return ret;
+}
+#endif
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index 15151f3..ae2f668 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -105,6 +105,7 @@ static inline u64 ceph_sanitize_features(u64 features)
*/
#define CEPH_FEATURES_SUPPORTED_DEFAULT \
(CEPH_FEATURE_NOSRCADDR | \
+ CEPH_FEATURE_SUBSCRIBE2 | \
CEPH_FEATURE_RECONNECT_SEQ | \
CEPH_FEATURE_PGID64 | \
CEPH_FEATURE_PGPOOL3 | \
@@ -127,6 +128,7 @@ static inline u64 ceph_sanitize_features(u64 features)
#define CEPH_FEATURES_REQUIRED_DEFAULT \
(CEPH_FEATURE_NOSRCADDR | \
+ CEPH_FEATURE_SUBSCRIBE2 | \
CEPH_FEATURE_RECONNECT_SEQ | \
CEPH_FEATURE_PGID64 | \
CEPH_FEATURE_PGPOOL3 | \
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index d7d072a..37f28bf 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -198,8 +198,8 @@ struct ceph_client_mount {
#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */
struct ceph_mon_subscribe_item {
- __le64 have_version; __le64 have;
- __u8 onetime;
+ __le64 start;
+ __u8 flags;
} __attribute__ ((packed));
struct ceph_mon_subscribe_ack {
@@ -376,7 +376,8 @@ union ceph_mds_request_args {
__le32 stripe_count; /* ... */
__le32 object_size;
__le32 file_replication;
- __le32 unused; /* used to be preferred osd */
+ __le32 mask; /* CEPH_CAP_* */
+ __le32 old_size;
} __attribute__ ((packed)) open;
struct {
__le32 flags;
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 3e3799c..e7975e4 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -47,7 +47,6 @@ struct ceph_options {
unsigned long mount_timeout; /* jiffies */
unsigned long osd_idle_ttl; /* jiffies */
unsigned long osd_keepalive_timeout; /* jiffies */
- unsigned long monc_ping_timeout; /* jiffies */
/*
* any type that can't be simply compared or doesn't need need
@@ -68,7 +67,12 @@ struct ceph_options {
#define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000)
#define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000)
#define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000)
-#define CEPH_MONC_PING_TIMEOUT_DEFAULT msecs_to_jiffies(30 * 1000)
+
+#define CEPH_MONC_HUNT_INTERVAL msecs_to_jiffies(3 * 1000)
+#define CEPH_MONC_PING_INTERVAL msecs_to_jiffies(10 * 1000)
+#define CEPH_MONC_PING_TIMEOUT msecs_to_jiffies(30 * 1000)
+#define CEPH_MONC_HUNT_BACKOFF 2
+#define CEPH_MONC_HUNT_MAX_MULT 10
#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
#define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024)
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 81810dc..e230e7e 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -68,18 +68,24 @@ struct ceph_mon_client {
bool hunting;
int cur_mon; /* last monitor i contacted */
- unsigned long sub_sent, sub_renew_after;
+ unsigned long sub_renew_after;
+ unsigned long sub_renew_sent;
struct ceph_connection con;
+ bool had_a_connection;
+ int hunt_mult; /* [1..CEPH_MONC_HUNT_MAX_MULT] */
+
/* pending generic requests */
struct rb_root generic_request_tree;
int num_generic_requests;
u64 last_tid;
- /* mds/osd map */
- int want_mdsmap;
- int want_next_osdmap; /* 1 = want, 2 = want+asked */
- u32 have_osdmap, have_mdsmap;
+ /* subs, indexed with CEPH_SUB_* */
+ struct {
+ struct ceph_mon_subscribe_item item;
+ bool want;
+ u32 have; /* epoch */
+ } subs[3];
#ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_file;
@@ -93,14 +99,23 @@ extern int ceph_monmap_contains(struct ceph_monmap *m,
extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl);
extern void ceph_monc_stop(struct ceph_mon_client *monc);
+enum {
+ CEPH_SUB_MDSMAP = 0,
+ CEPH_SUB_MONMAP,
+ CEPH_SUB_OSDMAP,
+};
+
+extern const char *ceph_sub_str[];
+
/*
* The model here is to indicate that we need a new map of at least
- * epoch @want, and also call in when we receive a map. We will
+ * epoch @epoch, and also call in when we receive a map. We will
* periodically rerequest the map from the monitor cluster until we
* get what we want.
*/
-extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have);
-extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
+bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
+ bool continuous);
+void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch);
extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 7506b48..4343df8 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -43,7 +43,8 @@ struct ceph_osd {
};
-#define CEPH_OSD_MAX_OP 3
+#define CEPH_OSD_SLAB_OPS 2
+#define CEPH_OSD_MAX_OPS 16
enum ceph_osd_data_type {
CEPH_OSD_DATA_TYPE_NONE = 0,
@@ -77,7 +78,10 @@ struct ceph_osd_data {
struct ceph_osd_req_op {
u16 op; /* CEPH_OSD_OP_* */
u32 flags; /* CEPH_OSD_OP_FLAG_* */
- u32 payload_len;
+ u32 indata_len; /* request */
+ u32 outdata_len; /* reply */
+ s32 rval;
+
union {
struct ceph_osd_data raw_data_in;
struct {
@@ -136,7 +140,6 @@ struct ceph_osd_request {
/* request osd ops array */
unsigned int r_num_ops;
- struct ceph_osd_req_op r_ops[CEPH_OSD_MAX_OP];
/* these are updated on each send */
__le32 *r_request_osdmap_epoch;
@@ -148,8 +151,6 @@ struct ceph_osd_request {
struct ceph_eversion *r_request_reassert_version;
int r_result;
- int r_reply_op_len[CEPH_OSD_MAX_OP];
- s32 r_reply_op_result[CEPH_OSD_MAX_OP];
int r_got_reply;
int r_linger;
@@ -174,6 +175,8 @@ struct ceph_osd_request {
unsigned long r_stamp; /* send OR check time */
struct ceph_snap_context *r_snapc; /* snap context for writes */
+
+ struct ceph_osd_req_op r_ops[];
};
struct ceph_request_redirect {
@@ -263,6 +266,8 @@ extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
u64 truncate_size, u32 truncate_seq);
extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
unsigned int which, u64 length);
+extern void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
+ unsigned int which, u64 offset_inc);
extern struct ceph_osd_data *osd_req_op_extent_osd_data(
struct ceph_osd_request *osd_req,
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index bcbec33..dcc18c6 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -361,7 +361,6 @@ ceph_parse_options(char *options, const char *dev_name,
opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
- opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT;
/* get mon ip(s) */
/* ip1[:port1][,ip2[:port2]...] */
@@ -686,6 +685,9 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started)
return client->auth_err;
}
+ pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid);
+ ceph_debugfs_client_init(client);
+
return 0;
}
EXPORT_SYMBOL(__ceph_open_session);
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 593dc2e..b902fbc 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -112,15 +112,20 @@ static int monc_show(struct seq_file *s, void *p)
struct ceph_mon_generic_request *req;
struct ceph_mon_client *monc = &client->monc;
struct rb_node *rp;
+ int i;
mutex_lock(&monc->mutex);
- if (monc->have_mdsmap)
- seq_printf(s, "have mdsmap %u\n", (unsigned int)monc->have_mdsmap);
- if (monc->have_osdmap)
- seq_printf(s, "have osdmap %u\n", (unsigned int)monc->have_osdmap);
- if (monc->want_next_osdmap)
- seq_printf(s, "want next osdmap\n");
+ for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+ seq_printf(s, "have %s %u", ceph_sub_str[i],
+ monc->subs[i].have);
+ if (monc->subs[i].want)
+ seq_printf(s, " want %llu%s",
+ le64_to_cpu(monc->subs[i].item.start),
+ (monc->subs[i].item.flags &
+ CEPH_SUBSCRIBE_ONETIME ? "" : "+"));
+ seq_putc(s, '\n');
+ }
for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) {
__u16 op;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9382619..1831f63 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -235,18 +235,12 @@ static struct workqueue_struct *ceph_msgr_wq;
static int ceph_msgr_slab_init(void)
{
BUG_ON(ceph_msg_cache);
- ceph_msg_cache = kmem_cache_create("ceph_msg",
- sizeof (struct ceph_msg),
- __alignof__(struct ceph_msg), 0, NULL);
-
+ ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
if (!ceph_msg_cache)
return -ENOMEM;
BUG_ON(ceph_msg_data_cache);
- ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
- sizeof (struct ceph_msg_data),
- __alignof__(struct ceph_msg_data),
- 0, NULL);
+ ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
if (ceph_msg_data_cache)
return 0;
@@ -1221,25 +1215,19 @@ static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
static void prepare_write_message_footer(struct ceph_connection *con)
{
struct ceph_msg *m = con->out_msg;
- int v = con->out_kvec_left;
m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
dout("prepare_write_message_footer %p\n", con);
- con->out_kvec[v].iov_base = &m->footer;
+ con_out_kvec_add(con, sizeof_footer(con), &m->footer);
if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
if (con->ops->sign_message)
con->ops->sign_message(m);
else
m->footer.sig = 0;
- con->out_kvec[v].iov_len = sizeof(m->footer);
- con->out_kvec_bytes += sizeof(m->footer);
} else {
m->old_footer.flags = m->footer.flags;
- con->out_kvec[v].iov_len = sizeof(m->old_footer);
- con->out_kvec_bytes += sizeof(m->old_footer);
}
- con->out_kvec_left++;
con->out_more = m->more_to_follow;
con->out_msg_done = true;
}
@@ -2409,11 +2397,7 @@ static int read_partial_message(struct ceph_connection *con)
}
/* footer */
- if (need_sign)
- size = sizeof(m->footer);
- else
- size = sizeof(m->old_footer);
-
+ size = sizeof_footer(con);
end += size;
ret = read_partial(con, end, size, &m->footer);
if (ret <= 0)
@@ -3089,10 +3073,7 @@ void ceph_msg_revoke(struct ceph_msg *msg)
con->out_skip += con_out_kvec_skip(con);
} else {
BUG_ON(!msg->data_length);
- if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
- con->out_skip += sizeof(msg->footer);
- else
- con->out_skip += sizeof(msg->old_footer);
+ con->out_skip += sizeof_footer(con);
}
/* data, middle, front */
if (msg->data_length)
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index de85ddd..cf638c0 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -122,51 +122,91 @@ static void __close_session(struct ceph_mon_client *monc)
ceph_msg_revoke(monc->m_subscribe);
ceph_msg_revoke_incoming(monc->m_subscribe_ack);
ceph_con_close(&monc->con);
- monc->cur_mon = -1;
+
monc->pending_auth = 0;
ceph_auth_reset(monc->auth);
}
/*
- * Open a session with a (new) monitor.
+ * Pick a new monitor at random and set cur_mon. If we are repicking
+ * (i.e. cur_mon is already set), be sure to pick a different one.
*/
-static int __open_session(struct ceph_mon_client *monc)
+static void pick_new_mon(struct ceph_mon_client *monc)
{
- char r;
- int ret;
+ int old_mon = monc->cur_mon;
- if (monc->cur_mon < 0) {
- get_random_bytes(&r, 1);
- monc->cur_mon = r % monc->monmap->num_mon;
- dout("open_session num=%d r=%d -> mon%d\n",
- monc->monmap->num_mon, r, monc->cur_mon);
- monc->sub_sent = 0;
- monc->sub_renew_after = jiffies; /* i.e., expired */
- monc->want_next_osdmap = !!monc->want_next_osdmap;
-
- dout("open_session mon%d opening\n", monc->cur_mon);
- ceph_con_open(&monc->con,
- CEPH_ENTITY_TYPE_MON, monc->cur_mon,
- &monc->monmap->mon_inst[monc->cur_mon].addr);
-
- /* send an initial keepalive to ensure our timestamp is
- * valid by the time we are in an OPENED state */
- ceph_con_keepalive(&monc->con);
-
- /* initiatiate authentication handshake */
- ret = ceph_auth_build_hello(monc->auth,
- monc->m_auth->front.iov_base,
- monc->m_auth->front_alloc_len);
- __send_prepared_auth_request(monc, ret);
+ BUG_ON(monc->monmap->num_mon < 1);
+
+ if (monc->monmap->num_mon == 1) {
+ monc->cur_mon = 0;
} else {
- dout("open_session mon%d already open\n", monc->cur_mon);
+ int max = monc->monmap->num_mon;
+ int o = -1;
+ int n;
+
+ if (monc->cur_mon >= 0) {
+ if (monc->cur_mon < monc->monmap->num_mon)
+ o = monc->cur_mon;
+ if (o >= 0)
+ max--;
+ }
+
+ n = prandom_u32() % max;
+ if (o >= 0 && n >= o)
+ n++;
+
+ monc->cur_mon = n;
}
- return 0;
+
+ dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
+ monc->cur_mon, monc->monmap->num_mon);
+}
+
+/*
+ * Open a session with a new monitor.
+ */
+static void __open_session(struct ceph_mon_client *monc)
+{
+ int ret;
+
+ pick_new_mon(monc);
+
+ monc->hunting = true;
+ if (monc->had_a_connection) {
+ monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
+ if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
+ monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
+ }
+
+ monc->sub_renew_after = jiffies; /* i.e., expired */
+ monc->sub_renew_sent = 0;
+
+ dout("%s opening mon%d\n", __func__, monc->cur_mon);
+ ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
+ &monc->monmap->mon_inst[monc->cur_mon].addr);
+
+ /*
+ * send an initial keepalive to ensure our timestamp is valid
+ * by the time we are in an OPENED state
+ */
+ ceph_con_keepalive(&monc->con);
+
+ /* initiate authentication handshake */
+ ret = ceph_auth_build_hello(monc->auth,
+ monc->m_auth->front.iov_base,
+ monc->m_auth->front_alloc_len);
+ BUG_ON(ret <= 0);
+ __send_prepared_auth_request(monc, ret);
}
-static bool __sub_expired(struct ceph_mon_client *monc)
+static void reopen_session(struct ceph_mon_client *monc)
{
- return time_after_eq(jiffies, monc->sub_renew_after);
+ if (!monc->hunting)
+ pr_info("mon%d %s session lost, hunting for new mon\n",
+ monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));
+
+ __close_session(monc);
+ __open_session(monc);
}
/*
@@ -174,74 +214,70 @@ static bool __sub_expired(struct ceph_mon_client *monc)
*/
static void __schedule_delayed(struct ceph_mon_client *monc)
{
- struct ceph_options *opt = monc->client->options;
unsigned long delay;
- if (monc->cur_mon < 0 || __sub_expired(monc)) {
- delay = 10 * HZ;
- } else {
- delay = 20 * HZ;
- if (opt->monc_ping_timeout > 0)
- delay = min(delay, opt->monc_ping_timeout / 3);
- }
+ if (monc->hunting)
+ delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
+ else
+ delay = CEPH_MONC_PING_INTERVAL;
+
dout("__schedule_delayed after %lu\n", delay);
- schedule_delayed_work(&monc->delayed_work,
- round_jiffies_relative(delay));
+ mod_delayed_work(system_wq, &monc->delayed_work,
+ round_jiffies_relative(delay));
}
+const char *ceph_sub_str[] = {
+ [CEPH_SUB_MDSMAP] = "mdsmap",
+ [CEPH_SUB_MONMAP] = "monmap",
+ [CEPH_SUB_OSDMAP] = "osdmap",
+};
+
/*
- * Send subscribe request for mdsmap and/or osdmap.
+ * Send subscribe request for one or more maps, according to
+ * monc->subs.
*/
static void __send_subscribe(struct ceph_mon_client *monc)
{
- dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
- (unsigned int)monc->sub_sent, __sub_expired(monc),
- monc->want_next_osdmap);
- if ((__sub_expired(monc) && !monc->sub_sent) ||
- monc->want_next_osdmap == 1) {
- struct ceph_msg *msg = monc->m_subscribe;
- struct ceph_mon_subscribe_item *i;
- void *p, *end;
- int num;
-
- p = msg->front.iov_base;
- end = p + msg->front_alloc_len;
-
- num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
- ceph_encode_32(&p, num);
-
- if (monc->want_next_osdmap) {
- dout("__send_subscribe to 'osdmap' %u\n",
- (unsigned int)monc->have_osdmap);
- ceph_encode_string(&p, end, "osdmap", 6);
- i = p;
- i->have = cpu_to_le64(monc->have_osdmap);
- i->onetime = 1;
- p += sizeof(*i);
- monc->want_next_osdmap = 2; /* requested */
- }
- if (monc->want_mdsmap) {
- dout("__send_subscribe to 'mdsmap' %u+\n",
- (unsigned int)monc->have_mdsmap);
- ceph_encode_string(&p, end, "mdsmap", 6);
- i = p;
- i->have = cpu_to_le64(monc->have_mdsmap);
- i->onetime = 0;
- p += sizeof(*i);
- }
- ceph_encode_string(&p, end, "monmap", 6);
- i = p;
- i->have = 0;
- i->onetime = 0;
- p += sizeof(*i);
-
- msg->front.iov_len = p - msg->front.iov_base;
- msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
- ceph_msg_revoke(msg);
- ceph_con_send(&monc->con, ceph_msg_get(msg));
-
- monc->sub_sent = jiffies | 1; /* never 0 */
+ struct ceph_msg *msg = monc->m_subscribe;
+ void *p = msg->front.iov_base;
+ void *const end = p + msg->front_alloc_len;
+ int num = 0;
+ int i;
+
+ dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
+
+ BUG_ON(monc->cur_mon < 0);
+
+ if (!monc->sub_renew_sent)
+ monc->sub_renew_sent = jiffies | 1; /* never 0 */
+
+ msg->hdr.version = cpu_to_le16(2);
+
+ for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+ if (monc->subs[i].want)
+ num++;
}
+ BUG_ON(num < 1); /* monmap sub is always there */
+ ceph_encode_32(&p, num);
+ for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+ const char *s = ceph_sub_str[i];
+
+ if (!monc->subs[i].want)
+ continue;
+
+ dout("%s %s start %llu flags 0x%x\n", __func__, s,
+ le64_to_cpu(monc->subs[i].item.start),
+ monc->subs[i].item.flags);
+ ceph_encode_string(&p, end, s, strlen(s));
+ memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
+ p += sizeof(monc->subs[i].item);
+ }
+
+ BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
+ msg->front.iov_len = p - msg->front.iov_base;
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ ceph_msg_revoke(msg);
+ ceph_con_send(&monc->con, ceph_msg_get(msg));
}
static void handle_subscribe_ack(struct ceph_mon_client *monc,
@@ -255,15 +291,16 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
seconds = le32_to_cpu(h->duration);
mutex_lock(&monc->mutex);
- if (monc->hunting) {
- pr_info("mon%d %s session established\n",
- monc->cur_mon,
- ceph_pr_addr(&monc->con.peer_addr.in_addr));
- monc->hunting = false;
+ if (monc->sub_renew_sent) {
+ monc->sub_renew_after = monc->sub_renew_sent +
+ (seconds >> 1) * HZ - 1;
+ dout("%s sent %lu duration %d renew after %lu\n", __func__,
+ monc->sub_renew_sent, seconds, monc->sub_renew_after);
+ monc->sub_renew_sent = 0;
+ } else {
+ dout("%s sent %lu renew after %lu, ignoring\n", __func__,
+ monc->sub_renew_sent, monc->sub_renew_after);
}
- dout("handle_subscribe_ack after %d seconds\n", seconds);
- monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
- monc->sub_sent = 0;
mutex_unlock(&monc->mutex);
return;
bad:
@@ -272,36 +309,82 @@ bad:
}
/*
- * Keep track of which maps we have
+ * Register interest in a map
+ *
+ * @sub: one of CEPH_SUB_*
+ * @epoch: X for "every map since X", or 0 for "just the latest"
*/
-int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
+static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
+ u32 epoch, bool continuous)
+{
+ __le64 start = cpu_to_le64(epoch);
+ u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
+
+ dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
+ epoch, continuous);
+
+ if (monc->subs[sub].want &&
+ monc->subs[sub].item.start == start &&
+ monc->subs[sub].item.flags == flags)
+ return false;
+
+ monc->subs[sub].item.start = start;
+ monc->subs[sub].item.flags = flags;
+ monc->subs[sub].want = true;
+
+ return true;
+}
+
+bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
+ bool continuous)
{
+ bool need_request;
+
mutex_lock(&monc->mutex);
- monc->have_mdsmap = got;
+ need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
mutex_unlock(&monc->mutex);
- return 0;
+
+ return need_request;
}
-EXPORT_SYMBOL(ceph_monc_got_mdsmap);
+EXPORT_SYMBOL(ceph_monc_want_map);
-int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
+/*
+ * Keep track of which maps we have
+ *
+ * @sub: one of CEPH_SUB_*
+ */
+static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
+ u32 epoch)
+{
+ dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
+
+ if (monc->subs[sub].want) {
+ if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
+ monc->subs[sub].want = false;
+ else
+ monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
+ }
+
+ monc->subs[sub].have = epoch;
+}
+
+void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
mutex_lock(&monc->mutex);
- monc->have_osdmap = got;
- monc->want_next_osdmap = 0;
+ __ceph_monc_got_map(monc, sub, epoch);
mutex_unlock(&monc->mutex);
- return 0;
}
+EXPORT_SYMBOL(ceph_monc_got_map);
/*
* Register interest in the next osdmap
*/
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
- dout("request_next_osdmap have %u\n", monc->have_osdmap);
+ dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
mutex_lock(&monc->mutex);
- if (!monc->want_next_osdmap)
- monc->want_next_osdmap = 1;
- if (monc->want_next_osdmap < 2)
+ if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
+ monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
__send_subscribe(monc);
mutex_unlock(&monc->mutex);
}
@@ -320,15 +403,15 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
long ret;
mutex_lock(&monc->mutex);
- while (monc->have_osdmap < epoch) {
+ while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
mutex_unlock(&monc->mutex);
if (timeout && time_after_eq(jiffies, started + timeout))
return -ETIMEDOUT;
ret = wait_event_interruptible_timeout(monc->client->auth_wq,
- monc->have_osdmap >= epoch,
- ceph_timeout_jiffies(timeout));
+ monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
+ ceph_timeout_jiffies(timeout));
if (ret < 0)
return ret;
@@ -341,11 +424,14 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
/*
- *
+ * Open a session with a random monitor. Request monmap and osdmap,
+ * which are waited upon in __ceph_open_session().
*/
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
mutex_lock(&monc->mutex);
+ __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
+ __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
__open_session(monc);
__schedule_delayed(monc);
mutex_unlock(&monc->mutex);
@@ -353,29 +439,15 @@ int ceph_monc_open_session(struct ceph_mon_client *monc)
}
EXPORT_SYMBOL(ceph_monc_open_session);
-/*
- * We require the fsid and global_id in order to initialize our
- * debugfs dir.
- */
-static bool have_debugfs_info(struct ceph_mon_client *monc)
-{
- dout("have_debugfs_info fsid %d globalid %lld\n",
- (int)monc->client->have_fsid, monc->auth->global_id);
- return monc->client->have_fsid && monc->auth->global_id > 0;
-}
-
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
struct ceph_client *client = monc->client;
struct ceph_monmap *monmap = NULL, *old = monc->monmap;
void *p, *end;
- int had_debugfs_info, init_debugfs = 0;
mutex_lock(&monc->mutex);
- had_debugfs_info = have_debugfs_info(monc);
-
dout("handle_monmap\n");
p = msg->front.iov_base;
end = p + msg->front.iov_len;
@@ -395,29 +467,11 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
client->monc.monmap = monmap;
kfree(old);
- if (!client->have_fsid) {
- client->have_fsid = true;
- if (!had_debugfs_info && have_debugfs_info(monc)) {
- pr_info("client%lld fsid %pU\n",
- ceph_client_id(monc->client),
- &monc->client->fsid);
- init_debugfs = 1;
- }
- mutex_unlock(&monc->mutex);
-
- if (init_debugfs) {
- /*
- * do debugfs initialization without mutex to avoid
- * creating a locking dependency
- */
- ceph_debugfs_client_init(monc->client);
- }
+ __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
+ client->have_fsid = true;
- goto out_unlocked;
- }
out:
mutex_unlock(&monc->mutex);
-out_unlocked:
wake_up_all(&client->auth_wq);
}
@@ -745,18 +799,15 @@ static void delayed_work(struct work_struct *work)
dout("monc delayed_work\n");
mutex_lock(&monc->mutex);
if (monc->hunting) {
- __close_session(monc);
- __open_session(monc); /* continue hunting */
+ dout("%s continuing hunt\n", __func__);
+ reopen_session(monc);
} else {
- struct ceph_options *opt = monc->client->options;
int is_auth = ceph_auth_is_authenticated(monc->auth);
if (ceph_con_keepalive_expired(&monc->con,
- opt->monc_ping_timeout)) {
+ CEPH_MONC_PING_TIMEOUT)) {
dout("monc keepalive timeout\n");
is_auth = 0;
- __close_session(monc);
- monc->hunting = true;
- __open_session(monc);
+ reopen_session(monc);
}
if (!monc->hunting) {
@@ -764,8 +815,14 @@ static void delayed_work(struct work_struct *work)
__validate_auth(monc);
}
- if (is_auth)
- __send_subscribe(monc);
+ if (is_auth) {
+ unsigned long now = jiffies;
+
+ dout("%s renew subs? now %lu renew after %lu\n",
+ __func__, now, monc->sub_renew_after);
+ if (time_after_eq(now, monc->sub_renew_after))
+ __send_subscribe(monc);
+ }
}
__schedule_delayed(monc);
mutex_unlock(&monc->mutex);
@@ -852,18 +909,14 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
&monc->client->msgr);
monc->cur_mon = -1;
- monc->hunting = true;
- monc->sub_renew_after = jiffies;
- monc->sub_sent = 0;
+ monc->had_a_connection = false;
+ monc->hunt_mult = 1;
INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
monc->generic_request_tree = RB_ROOT;
monc->num_generic_requests = 0;
monc->last_tid = 0;
- monc->have_mdsmap = 0;
- monc->have_osdmap = 0;
- monc->want_next_osdmap = 1;
return 0;
out_auth_reply:
@@ -888,7 +941,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
mutex_lock(&monc->mutex);
__close_session(monc);
-
+ monc->cur_mon = -1;
mutex_unlock(&monc->mutex);
/*
@@ -910,26 +963,40 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
}
EXPORT_SYMBOL(ceph_monc_stop);
+static void finish_hunting(struct ceph_mon_client *monc)
+{
+ if (monc->hunting) {
+ dout("%s found mon%d\n", __func__, monc->cur_mon);
+ monc->hunting = false;
+ monc->had_a_connection = true;
+ monc->hunt_mult /= 2; /* reduce by 50% */
+ if (monc->hunt_mult < 1)
+ monc->hunt_mult = 1;
+ }
+}
+
static void handle_auth_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
int ret;
int was_auth = 0;
- int had_debugfs_info, init_debugfs = 0;
mutex_lock(&monc->mutex);
- had_debugfs_info = have_debugfs_info(monc);
was_auth = ceph_auth_is_authenticated(monc->auth);
monc->pending_auth = 0;
ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
msg->front.iov_len,
monc->m_auth->front.iov_base,
monc->m_auth->front_alloc_len);
+ if (ret > 0) {
+ __send_prepared_auth_request(monc, ret);
+ goto out;
+ }
+
+ finish_hunting(monc);
+
if (ret < 0) {
monc->client->auth_err = ret;
- wake_up_all(&monc->client->auth_wq);
- } else if (ret > 0) {
- __send_prepared_auth_request(monc, ret);
} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
dout("authenticated, starting session\n");
@@ -939,23 +1006,15 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
__send_subscribe(monc);
__resend_generic_request(monc);
- }
- if (!had_debugfs_info && have_debugfs_info(monc)) {
- pr_info("client%lld fsid %pU\n",
- ceph_client_id(monc->client),
- &monc->client->fsid);
- init_debugfs = 1;
+ pr_info("mon%d %s session established\n", monc->cur_mon,
+ ceph_pr_addr(&monc->con.peer_addr.in_addr));
}
- mutex_unlock(&monc->mutex);
- if (init_debugfs) {
- /*
- * do debugfs initialization without mutex to avoid
- * creating a locking dependency
- */
- ceph_debugfs_client_init(monc->client);
- }
+out:
+ mutex_unlock(&monc->mutex);
+ if (monc->client->auth_err < 0)
+ wake_up_all(&monc->client->auth_wq);
}
static int __validate_auth(struct ceph_mon_client *monc)
@@ -1096,29 +1155,17 @@ static void mon_fault(struct ceph_connection *con)
{
struct ceph_mon_client *monc = con->private;
- if (!monc)
- return;
-
- dout("mon_fault\n");
mutex_lock(&monc->mutex);
- if (!con->private)
- goto out;
-
- if (!monc->hunting)
- pr_info("mon%d %s session lost, "
- "hunting for new mon\n", monc->cur_mon,
- ceph_pr_addr(&monc->con.peer_addr.in_addr));
-
- __close_session(monc);
- if (!monc->hunting) {
- /* start hunting */
- monc->hunting = true;
- __open_session(monc);
- } else {
- /* already hunting, let's wait a bit */
- __schedule_delayed(monc);
+ dout("%s mon%d\n", __func__, monc->cur_mon);
+ if (monc->cur_mon >= 0) {
+ if (!monc->hunting) {
+ dout("%s hunting for new mon\n", __func__);
+ reopen_session(monc);
+ __schedule_delayed(monc);
+ } else {
+ dout("%s already hunting\n", __func__);
+ }
}
-out:
mutex_unlock(&monc->mutex);
}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5bc0537..32355d9d 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,9 +338,10 @@ static void ceph_osdc_release_request(struct kref *kref)
ceph_put_snap_context(req->r_snapc);
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
- else
+ else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
kmem_cache_free(ceph_osd_request_cache, req);
-
+ else
+ kfree(req);
}
void ceph_osdc_get_request(struct ceph_osd_request *req)
@@ -369,28 +370,22 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
struct ceph_msg *msg;
size_t msg_size;
- BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
- BUG_ON(num_ops > CEPH_OSD_MAX_OP);
-
- msg_size = 4 + 4 + 8 + 8 + 4+8;
- msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
- msg_size += 1 + 8 + 4 + 4; /* pg_t */
- msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
- msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
- msg_size += 8; /* snapid */
- msg_size += 8; /* snap_seq */
- msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
- msg_size += 4;
-
if (use_mempool) {
+ BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
req = mempool_alloc(osdc->req_mempool, gfp_flags);
- memset(req, 0, sizeof(*req));
+ } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
+ req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
} else {
- req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
+ BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
+ req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
+ gfp_flags);
}
- if (req == NULL)
+ if (unlikely(!req))
return NULL;
+ /* req only, each op is zeroed in _osd_req_op_init() */
+ memset(req, 0, sizeof(*req));
+
req->r_osdc = osdc;
req->r_mempool = use_mempool;
req->r_num_ops = num_ops;
@@ -408,18 +403,36 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
req->r_base_oloc.pool = -1;
req->r_target_oloc.pool = -1;
+ msg_size = OSD_OPREPLY_FRONT_LEN;
+ if (num_ops > CEPH_OSD_SLAB_OPS) {
+ /* ceph_osd_op and rval */
+ msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
+ (sizeof(struct ceph_osd_op) + 4);
+ }
+
/* create reply message */
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
else
- msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
- OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
+ msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
+ gfp_flags, true);
if (!msg) {
ceph_osdc_put_request(req);
return NULL;
}
req->r_reply = msg;
+ msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
+ msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
+ msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+ msg_size += 1 + 8 + 4 + 4; /* pgid */
+ msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
+ msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
+ msg_size += 8; /* snapid */
+ msg_size += 8; /* snap_seq */
+ msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
+ msg_size += 4; /* retry_attempt */
+
/* create request message; allow space for oid */
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -498,7 +511,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
payload_len += length;
- op->payload_len = payload_len;
+ op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);
@@ -517,10 +530,32 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
BUG_ON(length > previous);
op->extent.length = length;
- op->payload_len -= previous - length;
+ op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);
+void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
+ unsigned int which, u64 offset_inc)
+{
+ struct ceph_osd_req_op *op, *prev_op;
+
+ BUG_ON(which + 1 >= osd_req->r_num_ops);
+
+ prev_op = &osd_req->r_ops[which];
+ op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
+ /* dup previous one */
+ op->indata_len = prev_op->indata_len;
+ op->outdata_len = prev_op->outdata_len;
+ op->extent = prev_op->extent;
+ /* adjust offset */
+ op->extent.offset += offset_inc;
+ op->extent.length -= offset_inc;
+
+ if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
+ op->indata_len -= offset_inc;
+}
+EXPORT_SYMBOL(osd_req_op_extent_dup_last);
+
void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *class, const char *method)
{
@@ -554,7 +589,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
op->cls.argc = 0; /* currently unused */
- op->payload_len = payload_len;
+ op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
@@ -587,7 +622,7 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
op->xattr.cmp_mode = cmp_mode;
ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
- op->payload_len = payload_len;
+ op->indata_len = payload_len;
return 0;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);
@@ -707,7 +742,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
dst->cls.indata_len = cpu_to_le32(data_length);
ceph_osdc_msg_data_add(req->r_request, osd_data);
- src->payload_len += data_length;
+ src->indata_len += data_length;
request_data_len += data_length;
}
osd_data = &src->cls.response_data;
@@ -750,7 +785,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
dst->op = cpu_to_le16(src->op);
dst->flags = cpu_to_le32(src->flags);
- dst->payload_len = cpu_to_le32(src->payload_len);
+ dst->payload_len = cpu_to_le32(src->indata_len);
return request_data_len;
}
@@ -1810,7 +1845,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
ceph_decode_need(&p, end, 4, bad_put);
numops = ceph_decode_32(&p);
- if (numops > CEPH_OSD_MAX_OP)
+ if (numops > CEPH_OSD_MAX_OPS)
goto bad_put;
if (numops != req->r_num_ops)
goto bad_put;
@@ -1821,7 +1856,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
int len;
len = le32_to_cpu(op->payload_len);
- req->r_reply_op_len[i] = len;
+ req->r_ops[i].outdata_len = len;
dout(" op %d has %d bytes\n", i, len);
payload_len += len;
p += sizeof(*op);
@@ -1836,7 +1871,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
retry_attempt = ceph_decode_32(&p);
for (i = 0; i < numops; i++)
- req->r_reply_op_result[i] = ceph_decode_32(&p);
+ req->r_ops[i].rval = ceph_decode_32(&p);
if (le16_to_cpu(msg->hdr.version) >= 6) {
p += 8 + 4; /* skip replay_version */
@@ -2187,7 +2222,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
goto bad;
done:
downgrade_write(&osdc->map_sem);
- ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
+ ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
+ osdc->osdmap->epoch);
/*
* subscribe to subsequent osdmap updates if full to ensure
@@ -2646,8 +2682,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
round_jiffies_relative(osdc->client->options->osd_idle_ttl));
err = -ENOMEM;
- osdc->req_mempool = mempool_create_kmalloc_pool(10,
- sizeof(struct ceph_osd_request));
+ osdc->req_mempool = mempool_create_slab_pool(10,
+ ceph_osd_request_cache);
if (!osdc->req_mempool)
goto out;
@@ -2782,11 +2818,12 @@ EXPORT_SYMBOL(ceph_osdc_writepages);
int ceph_osdc_setup(void)
{
+ size_t size = sizeof(struct ceph_osd_request) +
+ CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
+
BUG_ON(ceph_osd_request_cache);
- ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
- sizeof (struct ceph_osd_request),
- __alignof__(struct ceph_osd_request),
- 0, NULL);
+ ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
+ 0, 0, NULL);
return ceph_osd_request_cache ? 0 : -ENOMEM;
}
OpenPOWER on IntegriCloud