From 9f12bd119e408388233e7aeb1152f372a8b5dcad Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 20 Sep 2013 19:55:31 +0800 Subject: ceph: drop unconnected inodes Positve dentry and corresponding inode are always accompanied in MDS reply. So no need to keep inode in the cache after dropping all its aliases. Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil --- fs/ceph/inode.c | 10 ++++++++++ fs/ceph/super.c | 1 + fs/ceph/super.h | 1 + 3 files changed, 12 insertions(+) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 278fd28..d37b2dc 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -436,6 +436,16 @@ void ceph_destroy_inode(struct inode *inode) call_rcu(&inode->i_rcu, ceph_i_callback); } +int ceph_drop_inode(struct inode *inode) +{ + /* + * Positve dentry and corresponding inode are always accompanied + * in MDS reply. So no need to keep inode in the cache after + * dropping all its aliases. + */ + return 1; +} + /* * Helpers to fill in size, ctime, mtime, and atime. We have to be * careful because either the client or MDS may have more up to date diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 6a0951e..e58bd4a 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -686,6 +686,7 @@ static const struct super_operations ceph_super_ops = { .alloc_inode = ceph_alloc_inode, .destroy_inode = ceph_destroy_inode, .write_inode = ceph_write_inode, + .drop_inode = ceph_drop_inode, .sync_fs = ceph_sync_fs, .put_super = ceph_put_super, .show_options = ceph_show_options, diff --git a/fs/ceph/super.h b/fs/ceph/super.h index ef4ac38..8de94b5 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -691,6 +691,7 @@ extern const struct inode_operations ceph_file_iops; extern struct inode *ceph_alloc_inode(struct super_block *sb); extern void ceph_destroy_inode(struct inode *inode); +extern int ceph_drop_inode(struct inode *inode); extern struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino); -- cgit v1.1 From e8344e668915a7488def414f016dbf7d9fce84b5 Mon Sep 17 00:00:00 2001 From: majianpeng Date: Thu, 12 Sep 2013 13:54:26 +0800 Subject: ceph: Implement writev/pwritev for sync operation. For writev/pwritev sync-operatoin, ceph only do the first iov. I divided the write-sync-operation into two functions. One for direct-write, other for none-direct-sync-write. This is because for none-direct-sync-write we can merge iovs to one. But for direct-write, we can't merge iovs. Signed-off-by: Jianpeng Ma Reviewed-by: Yan, Zheng Signed-off-by: Sage Weil --- fs/ceph/file.c | 273 ++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 193 insertions(+), 80 deletions(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 3de8982..5cf034e 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -489,83 +489,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) } } + /* - * Synchronous write, straight from __user pointer or user pages (if - * O_DIRECT). + * Synchronous write, straight from __user pointer or user pages. * * If write spans object boundary, just do multiple writes. (For a * correct atomic write, we should e.g. take write locks on all * objects, rollback on failure, etc.) */ -static ssize_t ceph_sync_write(struct file *file, const char __user *data, - size_t left, loff_t pos, loff_t *ppos) +static ssize_t +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, size_t count) { + struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_snap_context *snapc; struct ceph_vino vino; struct ceph_osd_request *req; - int num_ops = 1; struct page **pages; int num_pages; - u64 len; int written = 0; int flags; int check_caps = 0; - int page_align, io_align; - unsigned long buf_align; + int page_align; int ret; struct timespec mtime = CURRENT_TIME; - bool own_pages = false; + loff_t pos = iocb->ki_pos; + struct iov_iter i; if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; - dout("sync_write on file %p %lld~%u %s\n", file, pos, - (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); + dout("sync_direct_write on file %p %lld~%u\n", file, pos, + (unsigned)count); - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); if (ret < 0) return ret; ret = invalidate_inode_pages2_range(inode->i_mapping, pos >> PAGE_CACHE_SHIFT, - (pos + left) >> PAGE_CACHE_SHIFT); + (pos + count) >> PAGE_CACHE_SHIFT); if (ret < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE; - if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) - flags |= CEPH_OSD_FLAG_ACK; - else - num_ops++; /* Also include a 'startsync' command. */ - /* - * we may need to do multiple writes here if we span an object - * boundary. this isn't atomic, unfortunately. :( - */ -more: - io_align = pos & ~PAGE_MASK; - buf_align = (unsigned long)data & ~PAGE_MASK; - len = left; - - snapc = ci->i_snap_realm->cached_context; - vino = ceph_vino(inode); - req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, num_ops, - CEPH_OSD_OP_WRITE, flags, snapc, - ci->i_truncate_seq, ci->i_truncate_size, - false); - if (IS_ERR(req)) - return PTR_ERR(req); + iov_iter_init(&i, iov, nr_segs, count, 0); + + while (iov_iter_count(&i) > 0) { + void __user *data = i.iov->iov_base + i.iov_offset; + u64 len = i.iov->iov_len - i.iov_offset; + + page_align = (unsigned long)data & ~PAGE_MASK; + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, pos, &len, + 2,/*include a 'startsync' command*/ + CEPH_OSD_OP_WRITE, flags, snapc, + ci->i_truncate_seq, + ci->i_truncate_size, + false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } - /* write from beginning of first page, regardless of io alignment */ - page_align = file->f_flags & O_DIRECT ? buf_align : io_align; - num_pages = calc_pages_for(page_align, len); - if (file->f_flags & O_DIRECT) { + num_pages = calc_pages_for(page_align, len); pages = ceph_get_direct_page_vector(data, num_pages, false); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -577,60 +573,175 @@ more: * may block. */ truncate_inode_pages_range(inode->i_mapping, pos, - (pos+len) | (PAGE_CACHE_SIZE-1)); - } else { + (pos+len) | (PAGE_CACHE_SIZE-1)); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, + false, false); + + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); + + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + + ceph_put_page_vector(pages, num_pages, false); + +out: + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + iov_iter_advance(&i, (size_t)len); + + if (pos > i_size_read(inode)) { + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } + } else + break; + } + + if (ret != -EOLDSNAPC && written > 0) { + iocb->ki_pos = pos; + ret = written; + } + return ret; +} + + +/* + * Synchronous write, straight from __user pointer or user pages. + * + * If write spans object boundary, just do multiple writes. (For a + * correct atomic write, we should e.g. take write locks on all + * objects, rollback on failure, etc.) + */ +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, size_t count) +{ + struct file *file = iocb->ki_filp; + struct inode *inode = file_inode(file); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_snap_context *snapc; + struct ceph_vino vino; + struct ceph_osd_request *req; + struct page **pages; + u64 len; + int num_pages; + int written = 0; + int flags; + int check_caps = 0; + int ret; + struct timespec mtime = CURRENT_TIME; + loff_t pos = iocb->ki_pos; + struct iov_iter i; + + if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) + return -EROFS; + + dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count); + + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); + if (ret < 0) + return ret; + + ret = invalidate_inode_pages2_range(inode->i_mapping, + pos >> PAGE_CACHE_SHIFT, + (pos + count) >> PAGE_CACHE_SHIFT); + if (ret < 0) + dout("invalidate_inode_pages2_range returned %d\n", ret); + + flags = CEPH_OSD_FLAG_ORDERSNAP | + CEPH_OSD_FLAG_ONDISK | + CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ACK; + + iov_iter_init(&i, iov, nr_segs, count, 0); + + while ((len = iov_iter_count(&i)) > 0) { + size_t left; + int n; + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, pos, &len, 1, + CEPH_OSD_OP_WRITE, flags, snapc, + ci->i_truncate_seq, + ci->i_truncate_size, + false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } + + /* + * write from beginning of first page, + * regardless of io alignment + */ + num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; + pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out; } - ret = ceph_copy_user_to_page_vector(pages, data, pos, len); + + left = len; + for (n = 0; n < num_pages; n++) { + size_t plen = min(left, PAGE_SIZE); + ret = iov_iter_copy_from_user(pages[n], &i, 0, plen); + if (ret != plen) { + ret = -EFAULT; + break; + } + left -= ret; + iov_iter_advance(&i, ret); + } + if (ret < 0) { ceph_release_page_vector(pages, num_pages); goto out; } - if ((file->f_flags & O_SYNC) == 0) { - /* get a second commit callback */ - req->r_unsafe_callback = ceph_sync_write_unsafe; - req->r_inode = inode; - own_pages = true; - } - } - osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, - false, own_pages); + /* get a second commit callback */ + req->r_unsafe_callback = ceph_sync_write_unsafe; + req->r_inode = inode; - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ - ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, + false, true); - ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); - if (!ret) - ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); - if (file->f_flags & O_DIRECT) - ceph_put_page_vector(pages, num_pages, false); - else if (file->f_flags & O_SYNC) - ceph_release_page_vector(pages, num_pages); + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); out: - ceph_osdc_put_request(req); - if (ret == 0) { - pos += len; - written += len; - left -= len; - data += len; - if (left) - goto more; + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + + if (pos > i_size_read(inode)) { + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } + } else + break; + } + if (ret != -EOLDSNAPC && written > 0) { ret = written; - *ppos = pos; - if (pos > i_size_read(inode)) - check_caps = ceph_inode_set_size(inode, pos); - if (check_caps) - ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, - NULL); - } else if (ret != -EOLDSNAPC && written > 0) { - ret = written; + iocb->ki_pos = pos; } return ret; } @@ -772,11 +883,13 @@ retry_snap: inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || - (iocb->ki_filp->f_flags & O_DIRECT) || - (fi->flags & CEPH_F_SYNC)) { + (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) { mutex_unlock(&inode->i_mutex); - written = ceph_sync_write(file, iov->iov_base, count, - pos, &iocb->ki_pos); + if (file->f_flags & O_DIRECT) + written = ceph_sync_direct_write(iocb, iov, + nr_segs, count); + else + written = ceph_sync_write(iocb, iov, nr_segs, count); if (written == -EOLDSNAPC) { dout("aio_write %p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n", -- cgit v1.1 From 8eb4efb091c8d8f70a0e6822288b043f8691ec51 Mon Sep 17 00:00:00 2001 From: majianpeng Date: Thu, 26 Sep 2013 14:42:17 +0800 Subject: ceph: implement readv/preadv for sync operation For readv/preadv sync-operatoin, ceph only do the first iov. Now implement this. Signed-off-by: Jianpeng Ma Reviewed-by: Yan, Zheng --- fs/ceph/file.c | 162 +++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 116 insertions(+), 46 deletions(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 5cf034e..c4419e8 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -408,51 +408,92 @@ more: * * If the read spans object boundary, just do multiple reads. */ -static ssize_t ceph_sync_read(struct file *file, char __user *data, - unsigned len, loff_t *poff, int *checkeof) +static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, + int *checkeof) { + struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct page **pages; - u64 off = *poff; + u64 off = iocb->ki_pos; int num_pages, ret; + size_t len = i->count; - dout("sync_read on file %p %llu~%u %s\n", file, off, len, + dout("sync_read on file %p %llu~%u %s\n", file, off, + (unsigned)len, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); - - if (file->f_flags & O_DIRECT) { - num_pages = calc_pages_for((unsigned long)data, len); - pages = ceph_get_direct_page_vector(data, num_pages, true); - } else { - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); - } - if (IS_ERR(pages)) - return PTR_ERR(pages); - /* * flush any page cache pages in this range. this * will make concurrent normal and sync io slow, * but it will at least behave sensibly when they are * in sequence. */ - ret = filemap_write_and_wait(inode->i_mapping); + ret = filemap_write_and_wait_range(inode->i_mapping, off, + off + len); if (ret < 0) - goto done; + return ret; - ret = striped_read(inode, off, len, pages, num_pages, checkeof, - file->f_flags & O_DIRECT, - (unsigned long)data & ~PAGE_MASK); + if (file->f_flags & O_DIRECT) { + while (iov_iter_count(i)) { + void __user *data = i->iov[0].iov_base + i->iov_offset; + size_t len = i->iov[0].iov_len - i->iov_offset; + + num_pages = calc_pages_for((unsigned long)data, len); + pages = ceph_get_direct_page_vector(data, + num_pages, true); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + ret = striped_read(inode, off, len, + pages, num_pages, checkeof, + 1, (unsigned long)data & ~PAGE_MASK); + ceph_put_page_vector(pages, num_pages, true); + + if (ret <= 0) + break; + off += ret; + iov_iter_advance(i, ret); + if (ret < len) + break; + } + } else { + num_pages = calc_pages_for(off, len); + pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); + if (IS_ERR(pages)) + return PTR_ERR(pages); + ret = striped_read(inode, off, len, pages, + num_pages, checkeof, 0, 0); + if (ret > 0) { + int l, k = 0; + size_t left = len = ret; + + while (left) { + void __user *data = i->iov[0].iov_base + + i->iov_offset; + l = min(i->iov[0].iov_len - i->iov_offset, + left); + + ret = ceph_copy_page_vector_to_user(&pages[k], + data, off, + l); + if (ret > 0) { + iov_iter_advance(i, ret); + left -= ret; + off += ret; + k = calc_pages_for(iocb->ki_pos, + len - left + 1) - 1; + BUG_ON(k >= num_pages && left); + } else + break; + } + } + ceph_release_page_vector(pages, num_pages); + } - if (ret >= 0 && (file->f_flags & O_DIRECT) == 0) - ret = ceph_copy_page_vector_to_user(pages, data, off, ret); - if (ret >= 0) - *poff = off + ret; + if (off > iocb->ki_pos) { + ret = off - iocb->ki_pos; + iocb->ki_pos = off; + } -done: - if (file->f_flags & O_DIRECT) - ceph_put_page_vector(pages, num_pages, true); - else - ceph_release_page_vector(pages, num_pages); dout("sync_read result %d\n", ret); return ret; } @@ -758,55 +799,84 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov, { struct file *filp = iocb->ki_filp; struct ceph_file_info *fi = filp->private_data; - loff_t *ppos = &iocb->ki_pos; - size_t len = iov->iov_len; + size_t len = iocb->ki_nbytes; struct inode *inode = file_inode(filp); struct ceph_inode_info *ci = ceph_inode(inode); - void __user *base = iov->iov_base; ssize_t ret; int want, got = 0; int checkeof = 0, read = 0; - dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", - inode, ceph_vinop(inode), pos, (unsigned)len, inode); again: + dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", + inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode); + if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; else want = CEPH_CAP_FILE_CACHE; ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); if (ret < 0) - goto out; - dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", - inode, ceph_vinop(inode), pos, (unsigned)len, - ceph_cap_string(got)); + return ret; if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (fi->flags & CEPH_F_SYNC)) + (fi->flags & CEPH_F_SYNC)) { + struct iov_iter i; + + dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n", + inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, + ceph_cap_string(got)); + + if (!read) { + ret = generic_segment_checks(iov, &nr_segs, + &len, VERIFY_WRITE); + if (ret) + goto out; + } + + iov_iter_init(&i, iov, nr_segs, len, read); + /* hmm, this isn't really async... */ - ret = ceph_sync_read(filp, base, len, ppos, &checkeof); - else - ret = generic_file_aio_read(iocb, iov, nr_segs, pos); + ret = ceph_sync_read(iocb, &i, &checkeof); + } else { + /* + * We can't modify the content of iov, + * so we only read from beginning. + */ + if (read) { + iocb->ki_pos = pos; + len = iocb->ki_nbytes; + read = 0; + } + dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", + inode, ceph_vinop(inode), pos, (unsigned)len, + ceph_cap_string(got)); + ret = generic_file_aio_read(iocb, iov, nr_segs, pos); + } out: dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); ceph_put_cap_refs(ci, got); if (checkeof && ret >= 0) { - int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); + int statret = ceph_do_getattr(inode, + CEPH_STAT_CAP_SIZE); /* hit EOF or hole? */ - if (statret == 0 && *ppos < inode->i_size) { - dout("aio_read sync_read hit hole, ppos %lld < size %lld, reading more\n", *ppos, inode->i_size); + if (statret == 0 && iocb->ki_pos < inode->i_size && + ret < len) { + dout("sync_read hit hole, ppos %lld < size %lld" + ", reading more\n", iocb->ki_pos, + inode->i_size); + read += ret; - base += ret; len -= ret; checkeof = 0; goto again; } } + if (ret >= 0) ret += read; -- cgit v1.1 From f36132a75aafd0086aeb0eacf348654138d56b49 Mon Sep 17 00:00:00 2001 From: Li Wang Date: Wed, 27 Nov 2013 22:28:13 +0800 Subject: ceph: Clean up if error occurred in finish_read() Clean up if error occurred rather than going through normal process Signed-off-by: Li Wang Signed-off-by: Yunchuan Wen Signed-off-by: Sage Weil --- fs/ceph/addr.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index ec3ba43..c346b84 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -256,6 +256,8 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) for (i = 0; i < num_pages; i++) { struct page *page = osd_data->pages[i]; + if (rc < 0) + goto unlock; if (bytes < (int)PAGE_CACHE_SIZE) { /* zero (remainder of) page */ int s = bytes < 0 ? 0 : bytes; @@ -266,6 +268,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) flush_dcache_page(page); SetPageUptodate(page); ceph_readpage_to_fscache(inode, page); +unlock: unlock_page(page); page_cache_release(page); bytes -= PAGE_CACHE_SIZE; -- cgit v1.1 From 37c89bde5d402c25211a9e31e3166067f85aa31b Mon Sep 17 00:00:00 2001 From: Li Wang Date: Wed, 27 Nov 2013 22:28:14 +0800 Subject: ceph: Add necessary clean up if invalid reply received in handle_reply() Wake up possible waiters, invoke the call back if any, unregister the request Signed-off-by: Li Wang Signed-off-by: Yunchuan Wen Signed-off-by: Sage Weil --- net/ceph/osd_client.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 2b4b32a..a17eaae 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1581,6 +1581,13 @@ done: return; bad_put: + req->r_result = -EIO; + __unregister_request(osdc, req); + if (req->r_callback) + req->r_callback(req, msg); + else + complete_all(&req->r_completion); + complete_request(req); ceph_osdc_put_request(req); bad_mutex: mutex_unlock(&osdc->request_mutex); -- cgit v1.1 From aa8b60e077fa2a4383e79b092b65cd5455ea5ab2 Mon Sep 17 00:00:00 2001 From: Libo Chen Date: Wed, 11 Dec 2013 13:49:11 +0800 Subject: fs: ceph: new helper: file_inode(file) Signed-off-by: Libo Chen Signed-off-by: Sage Weil --- fs/ceph/file.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index c4419e8..d10510a 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -1201,7 +1201,7 @@ static long ceph_fallocate(struct file *file, int mode, loff_t offset, loff_t length) { struct ceph_file_info *fi = file->private_data; - struct inode *inode = file->f_dentry->d_inode; + struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; -- cgit v1.1 From d29adb34a94715174c88ca93e8aba955850c9bde Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 2 Dec 2013 19:11:48 -0800 Subject: libceph: block I/O when PAUSE or FULL osd map flags are set The PAUSEWR and PAUSERD flags are meant to stop the cluster from processing writes and reads, respectively. The FULL flag is set when the cluster determines that it is out of space, and will no longer process writes. PAUSEWR and PAUSERD are purely client-side settings already implemented in userspace clients. The osd does nothing special with these flags. When the FULL flag is set, however, the osd responds to all writes with -ENOSPC. For cephfs, this makes sense, but for rbd the block layer translates this into EIO. If a cluster goes from full to non-full quickly, a filesystem on top of rbd will not behave well, since some writes succeed while others get EIO. Fix this by blocking any writes when the FULL flag is set in the osd client. This is the same strategy used by userspace, so apply it by default. A follow-on patch makes this configurable. __map_request() is called to re-target osd requests in case the available osds changed. Add a paused field to a ceph_osd_request, and set it whenever an appropriate osd map flag is set. Avoid queueing paused requests in __map_request(), but force them to be resent if they become unpaused. Also subscribe to the next osd map from the monitor if any of these flags are set, so paused requests can be unblocked as soon as possible. Fixes: http://tracker.ceph.com/issues/6079 Reviewed-by: Sage Weil Signed-off-by: Josh Durgin --- include/linux/ceph/osd_client.h | 1 + net/ceph/osd_client.c | 29 +++++++++++++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 8f47625..4fb6a89 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -138,6 +138,7 @@ struct ceph_osd_request { __le64 *r_request_pool; void *r_request_pgid; __le32 *r_request_attempts; + bool r_paused; struct ceph_eversion *r_request_reassert_version; int r_result; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a17eaae..1ad9866 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1232,6 +1232,22 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, EXPORT_SYMBOL(ceph_osdc_set_request_linger); /* + * Returns whether a request should be blocked from being sent + * based on the current osdmap and osd_client settings. + * + * Caller should hold map_sem for read. + */ +static bool __req_should_be_paused(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD); + bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); + return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) || + (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr); +} + +/* * Pick an osd (the first 'up' osd in the pg), allocate the osd struct * (as needed), and set the request r_osd appropriately. If there is * no up osd, set r_osd to NULL. Move the request to the appropriate list @@ -1248,6 +1264,7 @@ static int __map_request(struct ceph_osd_client *osdc, int acting[CEPH_PG_MAX_SIZE]; int o = -1, num = 0; int err; + bool was_paused; dout("map_request %p tid %lld\n", req, req->r_tid); err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, @@ -1264,12 +1281,18 @@ static int __map_request(struct ceph_osd_client *osdc, num = err; } + was_paused = req->r_paused; + req->r_paused = __req_should_be_paused(osdc, req); + if (was_paused && !req->r_paused) + force_resend = 1; + if ((!force_resend && req->r_osd && req->r_osd->o_osd == o && req->r_sent >= req->r_osd->o_incarnation && req->r_num_pg_osds == num && memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || - (req->r_osd == NULL && o == -1)) + (req->r_osd == NULL && o == -1) || + req->r_paused) return 0; /* no change */ dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", @@ -1811,7 +1834,9 @@ done: * we find out when we are no longer full and stop returning * ENOSPC. */ - if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) ceph_monc_request_next_osdmap(&osdc->client->monc); mutex_lock(&osdc->request_mutex); -- cgit v1.1 From 9a1ea2dbff11547a8e664f143c1ffefc586a577a Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Tue, 10 Dec 2013 09:35:13 -0800 Subject: libceph: resend all writes after the osdmap loses the full flag With the current full handling, there is a race between osds and clients getting the first map marked full. If the osd wins, it will return -ENOSPC to any writes, but the client may already have writes in flight. This results in the client getting the error and propagating it up the stack. For rbd, the block layer turns this into EIO, which can cause corruption in filesystems above it. To avoid this race, osds are being changed to drop writes that came from clients with an osdmap older than the last osdmap marked full. In order for this to work, clients must resend all writes after they encounter a full -> not full transition in the osdmap. osds will wait for an updated map instead of processing a request from a client with a newer map, so resent writes will not be dropped by the osd unless there is another not full -> full transition. This approach requires both osds and clients to be fixed to avoid the race. Old clients talking to osds with this fix may hang instead of returning EIO and potentially corrupting an fs. New clients talking to old osds have the same behavior as before if they encounter this race. Fixes: http://tracker.ceph.com/issues/6938 Reviewed-by: Sage Weil Signed-off-by: Josh Durgin --- net/ceph/osd_client.c | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 1ad9866..9f19935 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1643,14 +1643,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc) * * Caller should hold map_sem for read. */ -static void kick_requests(struct ceph_osd_client *osdc, int force_resend) +static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, + bool force_resend_writes) { struct ceph_osd_request *req, *nreq; struct rb_node *p; int needmap = 0; int err; + bool force_resend_req; - dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); + dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "", + force_resend_writes ? " (force resend writes)" : ""); mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; ) { req = rb_entry(p, struct ceph_osd_request, r_node); @@ -1675,7 +1678,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) continue; } - err = __map_request(osdc, req, force_resend); + force_resend_req = force_resend || + (force_resend_writes && + req->r_flags & CEPH_OSD_FLAG_WRITE); + err = __map_request(osdc, req, force_resend_req); if (err < 0) continue; /* error */ if (req->r_osd == NULL) { @@ -1695,7 +1701,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) r_linger_item) { dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); - err = __map_request(osdc, req, force_resend); + err = __map_request(osdc, req, + force_resend || force_resend_writes); dout("__map_request returned %d\n", err); if (err == 0) continue; /* no change and no osd was specified */ @@ -1737,6 +1744,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) struct ceph_osdmap *newmap = NULL, *oldmap; int err; struct ceph_fsid fsid; + bool was_full; dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); p = msg->front.iov_base; @@ -1750,6 +1758,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) down_write(&osdc->map_sem); + was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); + /* incremental maps */ ceph_decode_32_safe(&p, end, nr_maps, bad); dout(" %d inc maps\n", nr_maps); @@ -1774,7 +1784,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ceph_osdmap_destroy(osdc->osdmap); osdc->osdmap = newmap; } - kick_requests(osdc, 0); + was_full = was_full || + ceph_osdmap_flag(osdc->osdmap, + CEPH_OSDMAP_FULL); + kick_requests(osdc, 0, was_full); } else { dout("ignoring incremental map %u len %d\n", epoch, maplen); @@ -1817,7 +1830,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) skipped_map = 1; ceph_osdmap_destroy(oldmap); } - kick_requests(osdc, skipped_map); + was_full = was_full || + ceph_osdmap_flag(osdc->osdmap, + CEPH_OSDMAP_FULL); + kick_requests(osdc, skipped_map, was_full); } p += maplen; nr_maps--; -- cgit v1.1 From 70eebd200696aea897bfb596053d0c688ec1894b Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 13 Dec 2013 15:28:56 +0200 Subject: rbd: rbd_device::dev_id is an int, format it as such rbd_device::dev_id is an int, format it as such. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cb1db29..60536e4 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4385,8 +4385,7 @@ static void rbd_dev_id_get(struct rbd_device *rbd_dev) spin_lock(&rbd_dev_list_lock); list_add_tail(&rbd_dev->node, &rbd_dev_list); spin_unlock(&rbd_dev_list_lock); - dout("rbd_dev %p given dev id %llu\n", rbd_dev, - (unsigned long long) rbd_dev->dev_id); + dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id); } /* @@ -4401,8 +4400,7 @@ static void rbd_dev_id_put(struct rbd_device *rbd_dev) rbd_assert(rbd_id > 0); - dout("rbd_dev %p released dev id %llu\n", rbd_dev, - (unsigned long long) rbd_dev->dev_id); + dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id); spin_lock(&rbd_dev_list_lock); list_del_init(&rbd_dev->node); -- cgit v1.1 From 90da258b887538ed3a2f904fa593173f26a1adbc Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 13 Dec 2013 15:28:56 +0200 Subject: rbd: tweak "loaded" message and module description Tweak "loaded" message, so that it looks like [ 30.184235] rbd: loaded instead of [ 38.056564] rbd: loaded rbd (rados block device) Also move (and slightly tweak) MODULE_DESCRIPTION so that all authors are next to each other in modinfo output. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 60536e4..d743900 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -89,7 +89,6 @@ static int atomic_dec_return_safe(atomic_t *v) } #define RBD_DRV_NAME "rbd" -#define RBD_DRV_NAME_LONG "rbd (rados block device)" #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ @@ -5303,7 +5302,7 @@ static int __init rbd_init(void) if (rc) rbd_slab_exit(); else - pr_info("loaded " RBD_DRV_NAME_LONG "\n"); + pr_info("loaded\n"); return rc; } @@ -5320,9 +5319,8 @@ module_exit(rbd_exit); MODULE_AUTHOR("Alex Elder "); MODULE_AUTHOR("Sage Weil "); MODULE_AUTHOR("Yehuda Sadeh "); -MODULE_DESCRIPTION("rados block device"); - /* following authorship retained from original osdblk.c */ MODULE_AUTHOR("Jeff Garzik "); +MODULE_DESCRIPTION("RADOS Block Device (RBD) driver"); MODULE_LICENSE("GPL"); -- cgit v1.1 From e1b4d96dea61c3078775090e8b121f571aab8fda Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 13 Dec 2013 15:28:57 +0200 Subject: rbd: refactor rbd_init() a bit Refactor rbd_init() a bit to make it more clear what's going on. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index d743900..8b78a08 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -5292,18 +5292,22 @@ static int __init rbd_init(void) if (!libceph_compatible(NULL)) { rbd_warn(NULL, "libceph incompatibility (quitting)"); - return -EINVAL; } + rc = rbd_slab_init(); if (rc) return rc; + rc = rbd_sysfs_init(); if (rc) - rbd_slab_exit(); - else - pr_info("loaded\n"); + goto err_out_slab; + pr_info("loaded\n"); + return 0; + +err_out_slab: + rbd_slab_exit(); return rc; } -- cgit v1.1 From f8a22fc238a449ff982bfb40e30c3f3c9c90a08a Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 13 Dec 2013 15:28:57 +0200 Subject: rbd: switch to ida for rbd id assignments Currently rbd ids are allocated using an atomic variable that keeps track of the highest id currently in use and each new id is simply one more than the value of that variable. That's nice and cheap, but it does mean that rbd ids are allowed to grow boundlessly, and, more importantly, it's completely unpredictable. So, in preparation for single-major device number allocation scheme, which is going to establish and rely on a constant mapping between rbd ids and device numbers, switch to ida for rbd id assignments. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 69 ++++++++++++++++++----------------------------------- 1 file changed, 23 insertions(+), 46 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 8b78a08..d250549 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -41,6 +41,7 @@ #include #include #include +#include #include "rbd_types.h" @@ -385,6 +386,8 @@ static struct kmem_cache *rbd_img_request_cache; static struct kmem_cache *rbd_obj_request_cache; static struct kmem_cache *rbd_segment_name_cache; +static DEFINE_IDA(rbd_dev_id_ida); + static int rbd_img_request_submit(struct rbd_img_request *img_request); static void rbd_dev_device_release(struct device *dev); @@ -4371,20 +4374,27 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev) device_unregister(&rbd_dev->dev); } -static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0); - /* * Get a unique rbd identifier for the given new rbd_dev, and add - * the rbd_dev to the global list. The minimum rbd id is 1. + * the rbd_dev to the global list. */ -static void rbd_dev_id_get(struct rbd_device *rbd_dev) +static int rbd_dev_id_get(struct rbd_device *rbd_dev) { - rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max); + int new_dev_id; + + new_dev_id = ida_simple_get(&rbd_dev_id_ida, 0, 0, GFP_KERNEL); + if (new_dev_id < 0) + return new_dev_id; + + rbd_dev->dev_id = new_dev_id; spin_lock(&rbd_dev_list_lock); list_add_tail(&rbd_dev->node, &rbd_dev_list); spin_unlock(&rbd_dev_list_lock); + dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id); + + return 0; } /* @@ -4393,48 +4403,13 @@ static void rbd_dev_id_get(struct rbd_device *rbd_dev) */ static void rbd_dev_id_put(struct rbd_device *rbd_dev) { - struct list_head *tmp; - int rbd_id = rbd_dev->dev_id; - int max_id; - - rbd_assert(rbd_id > 0); - - dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id); spin_lock(&rbd_dev_list_lock); list_del_init(&rbd_dev->node); - - /* - * If the id being "put" is not the current maximum, there - * is nothing special we need to do. - */ - if (rbd_id != atomic64_read(&rbd_dev_id_max)) { - spin_unlock(&rbd_dev_list_lock); - return; - } - - /* - * We need to update the current maximum id. Search the - * list to find out what it is. We're more likely to find - * the maximum at the end, so search the list backward. - */ - max_id = 0; - list_for_each_prev(tmp, &rbd_dev_list) { - struct rbd_device *rbd_dev; - - rbd_dev = list_entry(tmp, struct rbd_device, node); - if (rbd_dev->dev_id > max_id) - max_id = rbd_dev->dev_id; - } spin_unlock(&rbd_dev_list_lock); - /* - * The max id could have been updated by rbd_dev_id_get(), in - * which case it now accurately reflects the new maximum. - * Be careful not to overwrite the maximum value in that - * case. - */ - atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id); - dout(" max dev id has been reset\n"); + ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); + + dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id); } /* @@ -4857,10 +4832,12 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) { int ret; - /* generate unique id: find highest unique id, add one */ - rbd_dev_id_get(rbd_dev); + /* Get an id and fill in device name. */ + + ret = rbd_dev_id_get(rbd_dev); + if (ret) + return ret; - /* Fill in the device name, now that we have its id. */ BUILD_BUG_ON(DEV_NAME_LEN < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); -- cgit v1.1 From dd82fff1e8e7b486887dd88981776bb44e370848 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 13 Dec 2013 15:28:57 +0200 Subject: rbd: add 'minor' sysfs rbd device attribute Introduce /sys/bus/rbd/devices//minor sysfs attribute for exporting rbd whole disk minor numbers. This is a step towards single-major device number allocation scheme, but also a good thing on its own. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: Josh Durgin --- Documentation/ABI/testing/sysfs-bus-rbd | 4 ++++ drivers/block/rbd.c | 13 ++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 0a30647..17b119c 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -33,6 +33,10 @@ major The block device major number. +minor + + The block device minor number. (December 2013, since 3.14.) + name The name of the rbd image. diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index d250549..01ed767 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -323,6 +323,7 @@ struct rbd_device { int dev_id; /* blkdev unique id */ int major; /* blkdev assigned major */ + int minor; struct gendisk *disk; /* blkdev's gendisk and rq */ u32 image_format; /* Either 1 or 2 */ @@ -3397,7 +3398,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", rbd_dev->dev_id); disk->major = rbd_dev->major; - disk->first_minor = 0; + disk->first_minor = rbd_dev->minor; disk->fops = &rbd_bd_ops; disk->private_data = rbd_dev; @@ -3469,7 +3470,14 @@ static ssize_t rbd_major_show(struct device *dev, return sprintf(buf, "%d\n", rbd_dev->major); return sprintf(buf, "(none)\n"); +} + +static ssize_t rbd_minor_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + return sprintf(buf, "%d\n", rbd_dev->minor); } static ssize_t rbd_client_id_show(struct device *dev, @@ -3591,6 +3599,7 @@ static ssize_t rbd_image_refresh(struct device *dev, static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); +static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); @@ -3604,6 +3613,7 @@ static struct attribute *rbd_attrs[] = { &dev_attr_size.attr, &dev_attr_features.attr, &dev_attr_major.attr, + &dev_attr_minor.attr, &dev_attr_client_id.attr, &dev_attr_pool.attr, &dev_attr_pool_id.attr, @@ -4848,6 +4858,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) if (ret < 0) goto err_out_id; rbd_dev->major = ret; + rbd_dev->minor = 0; /* Set up the blkdev mapping. */ -- cgit v1.1 From 92c76dc036e2139226e90851864d3e01e1db5dd8 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 13 Dec 2013 15:28:57 +0200 Subject: rbd: wire up is_visible() sysfs callback for rbd bus In preparation for single-major device number allocation scheme, wire up attribute_group::is_visible() callback for rbd bus. This allows us to make the new single-major attributes conditional. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 01ed767..3fa18b0 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -408,7 +408,18 @@ static struct attribute *rbd_bus_attrs[] = { &bus_attr_remove.attr, NULL, }; -ATTRIBUTE_GROUPS(rbd_bus); + +static umode_t rbd_bus_is_visible(struct kobject *kobj, + struct attribute *attr, int index) +{ + return attr->mode; +} + +static const struct attribute_group rbd_bus_group = { + .attrs = rbd_bus_attrs, + .is_visible = rbd_bus_is_visible, +}; +__ATTRIBUTE_GROUPS(rbd_bus); static struct bus_type rbd_bus_type = { .name = "rbd", -- cgit v1.1 From 9b60e70b3b6a8e4bc2d1b6d9f858a30e1cec496b Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 13 Dec 2013 15:28:57 +0200 Subject: rbd: add support for single-major device number allocation scheme Currently each rbd device is allocated its own major number, which leads to a hard limit of 230-250 images mapped at once. This commit adds support for a new single-major device number allocation scheme, which is hidden behind a new single_major boolean module parameter and is disabled by default for backwards compatibility reasons. (Old userspace cannot correctly unmap images mapped under single-major scheme and would essentially just unmap a random image, if that.) $ rbd showmapped id pool image snap device 0 rbd b100 - /dev/rbd0 1 rbd b101 - /dev/rbd1 2 rbd b102 - /dev/rbd2 3 rbd b103 - /dev/rbd3 Old scheme (modprobe rbd): $ ls -l /dev/rbd* brw-rw---- 1 root disk 253, 0 Dec 10 12:24 /dev/rbd0 brw-rw---- 1 root disk 252, 0 Dec 10 12:28 /dev/rbd1 brw-rw---- 1 root disk 252, 1 Dec 10 12:28 /dev/rbd1p1 brw-rw---- 1 root disk 252, 2 Dec 10 12:28 /dev/rbd1p2 brw-rw---- 1 root disk 252, 3 Dec 10 12:28 /dev/rbd1p3 brw-rw---- 1 root disk 251, 0 Dec 10 12:28 /dev/rbd2 brw-rw---- 1 root disk 251, 1 Dec 10 12:28 /dev/rbd2p1 brw-rw---- 1 root disk 250, 0 Dec 10 12:24 /dev/rbd3 New scheme (modprobe rbd single_major=Y): $ ls -l /dev/rbd* brw-rw---- 1 root disk 253, 0 Dec 10 12:30 /dev/rbd0 brw-rw---- 1 root disk 253, 256 Dec 10 12:30 /dev/rbd1 brw-rw---- 1 root disk 253, 257 Dec 10 12:30 /dev/rbd1p1 brw-rw---- 1 root disk 253, 258 Dec 10 12:30 /dev/rbd1p2 brw-rw---- 1 root disk 253, 259 Dec 10 12:30 /dev/rbd1p3 brw-rw---- 1 root disk 253, 512 Dec 10 12:30 /dev/rbd2 brw-rw---- 1 root disk 253, 513 Dec 10 12:30 /dev/rbd2p1 brw-rw---- 1 root disk 253, 768 Dec 10 12:30 /dev/rbd3 (major 253 was assigned dynamically at module load time) The new limit is 4096 images mapped at once, and it comes from the fact that, as before, 256 minor numbers are reserved for each mapping. (A follow-up commit changes the number of minors reserved and the way we deal with partitions over that number.) If single_major is set to true, two new sysfs interfaces show up: /sys/bus/rbd/{add,remove}_single_major. These are to be used instead of /sys/bus/rbd/{add,remove}, which are disabled for backwards compatibility reasons outlined above. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: Josh Durgin --- Documentation/ABI/testing/sysfs-bus-rbd | 22 ++++++ drivers/block/rbd.c | 132 +++++++++++++++++++++++++++----- 2 files changed, 134 insertions(+), 20 deletions(-) diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 17b119c..501adc2 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -18,6 +18,28 @@ Removal of a device: $ echo > /sys/bus/rbd/remove +What: /sys/bus/rbd/add_single_major +Date: December 2013 +KernelVersion: 3.14 +Contact: Sage Weil +Description: Available only if rbd module is inserted with single_major + parameter set to true. + Usage is the same as for /sys/bus/rbd/add. If present, + should be used instead of the latter: any attempts to use + /sys/bus/rbd/add if /sys/bus/rbd/add_single_major is + available will fail for backwards compatibility reasons. + +What: /sys/bus/rbd/remove_single_major +Date: December 2013 +KernelVersion: 3.14 +Contact: Sage Weil +Description: Available only if rbd module is inserted with single_major + parameter set to true. + Usage is the same as for /sys/bus/rbd/remove. If present, + should be used instead of the latter: any attempts to use + /sys/bus/rbd/remove if /sys/bus/rbd/remove_single_major is + available will fail for backwards compatibility reasons. + Entries under /sys/bus/rbd/devices// -------------------------------------------- diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 3fa18b0..e5ddcb5 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -91,7 +91,7 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_DRV_NAME "rbd" -#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ +#define RBD_PART_SHIFT 8 #define RBD_SNAP_DEV_NAME_PREFIX "snap_" #define RBD_MAX_SNAP_NAME_LEN \ @@ -387,8 +387,17 @@ static struct kmem_cache *rbd_img_request_cache; static struct kmem_cache *rbd_obj_request_cache; static struct kmem_cache *rbd_segment_name_cache; +static int rbd_major; static DEFINE_IDA(rbd_dev_id_ida); +/* + * Default to false for now, as single-major requires >= 0.75 version of + * userspace rbd utility. + */ +static bool single_major = false; +module_param(single_major, bool, S_IRUGO); +MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)"); + static int rbd_img_request_submit(struct rbd_img_request *img_request); static void rbd_dev_device_release(struct device *dev); @@ -397,21 +406,44 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count); static ssize_t rbd_remove(struct bus_type *bus, const char *buf, size_t count); +static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf, + size_t count); +static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf, + size_t count); static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping); static void rbd_spec_put(struct rbd_spec *spec); +static int rbd_dev_id_to_minor(int dev_id) +{ + return dev_id << RBD_PART_SHIFT; +} + +static int minor_to_rbd_dev_id(int minor) +{ + return minor >> RBD_PART_SHIFT; +} + static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); +static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major); +static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major); static struct attribute *rbd_bus_attrs[] = { &bus_attr_add.attr, &bus_attr_remove.attr, + &bus_attr_add_single_major.attr, + &bus_attr_remove_single_major.attr, NULL, }; static umode_t rbd_bus_is_visible(struct kobject *kobj, struct attribute *attr, int index) { + if (!single_major && + (attr == &bus_attr_add_single_major.attr || + attr == &bus_attr_remove_single_major.attr)) + return 0; + return attr->mode; } @@ -3402,7 +3434,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) u64 segment_size; /* create gendisk info */ - disk = alloc_disk(RBD_MINORS_PER_MAJOR); + disk = alloc_disk(1 << RBD_PART_SHIFT); if (!disk) return -ENOMEM; @@ -4403,7 +4435,9 @@ static int rbd_dev_id_get(struct rbd_device *rbd_dev) { int new_dev_id; - new_dev_id = ida_simple_get(&rbd_dev_id_ida, 0, 0, GFP_KERNEL); + new_dev_id = ida_simple_get(&rbd_dev_id_ida, + 0, minor_to_rbd_dev_id(1 << MINORBITS), + GFP_KERNEL); if (new_dev_id < 0) return new_dev_id; @@ -4863,13 +4897,19 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); - /* Get our block major device number. */ + /* Record our major and minor device numbers. */ - ret = register_blkdev(0, rbd_dev->name); - if (ret < 0) - goto err_out_id; - rbd_dev->major = ret; - rbd_dev->minor = 0; + if (!single_major) { + ret = register_blkdev(0, rbd_dev->name); + if (ret < 0) + goto err_out_id; + + rbd_dev->major = ret; + rbd_dev->minor = 0; + } else { + rbd_dev->major = rbd_major; + rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id); + } /* Set up the blkdev mapping. */ @@ -4901,7 +4941,8 @@ err_out_mapping: err_out_disk: rbd_free_disk(rbd_dev); err_out_blkdev: - unregister_blkdev(rbd_dev->major, rbd_dev->name); + if (!single_major) + unregister_blkdev(rbd_dev->major, rbd_dev->name); err_out_id: rbd_dev_id_put(rbd_dev); rbd_dev_mapping_clear(rbd_dev); @@ -5022,9 +5063,9 @@ err_out_format: return ret; } -static ssize_t rbd_add(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t do_rbd_add(struct bus_type *bus, + const char *buf, + size_t count) { struct rbd_device *rbd_dev = NULL; struct ceph_options *ceph_opts = NULL; @@ -5106,6 +5147,23 @@ err_out_module: return (ssize_t)rc; } +static ssize_t rbd_add(struct bus_type *bus, + const char *buf, + size_t count) +{ + if (single_major) + return -EINVAL; + + return do_rbd_add(bus, buf, count); +} + +static ssize_t rbd_add_single_major(struct bus_type *bus, + const char *buf, + size_t count) +{ + return do_rbd_add(bus, buf, count); +} + static void rbd_dev_device_release(struct device *dev) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); @@ -5113,8 +5171,8 @@ static void rbd_dev_device_release(struct device *dev) rbd_free_disk(rbd_dev); clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); rbd_dev_mapping_clear(rbd_dev); - unregister_blkdev(rbd_dev->major, rbd_dev->name); - rbd_dev->major = 0; + if (!single_major) + unregister_blkdev(rbd_dev->major, rbd_dev->name); rbd_dev_id_put(rbd_dev); rbd_dev_mapping_clear(rbd_dev); } @@ -5145,9 +5203,9 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) } } -static ssize_t rbd_remove(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t do_rbd_remove(struct bus_type *bus, + const char *buf, + size_t count) { struct rbd_device *rbd_dev = NULL; struct list_head *tmp; @@ -5210,6 +5268,23 @@ static ssize_t rbd_remove(struct bus_type *bus, return count; } +static ssize_t rbd_remove(struct bus_type *bus, + const char *buf, + size_t count) +{ + if (single_major) + return -EINVAL; + + return do_rbd_remove(bus, buf, count); +} + +static ssize_t rbd_remove_single_major(struct bus_type *bus, + const char *buf, + size_t count) +{ + return do_rbd_remove(bus, buf, count); +} + /* * create control files in sysfs * /sys/bus/rbd/... @@ -5298,13 +5373,28 @@ static int __init rbd_init(void) if (rc) return rc; + if (single_major) { + rbd_major = register_blkdev(0, RBD_DRV_NAME); + if (rbd_major < 0) { + rc = rbd_major; + goto err_out_slab; + } + } + rc = rbd_sysfs_init(); if (rc) - goto err_out_slab; + goto err_out_blkdev; + + if (single_major) + pr_info("loaded (major %d)\n", rbd_major); + else + pr_info("loaded\n"); - pr_info("loaded\n"); return 0; +err_out_blkdev: + if (single_major) + unregister_blkdev(rbd_major, RBD_DRV_NAME); err_out_slab: rbd_slab_exit(); return rc; @@ -5313,6 +5403,8 @@ err_out_slab: static void __exit rbd_exit(void) { rbd_sysfs_cleanup(); + if (single_major) + unregister_blkdev(rbd_major, RBD_DRV_NAME); rbd_slab_exit(); } -- cgit v1.1 From 61f68816211ee4b884dc0dda8dd4d977548f4865 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 28 Nov 2013 14:28:14 +0800 Subject: ceph: check caps in filemap_fault and page_mkwrite Adds cap check to the page fault handler. The check prevents page fault handler from adding new page to the page cache while Fcb caps are being revoked. This solves Fc revoking hang in multiple clients mmap IO workload. Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil --- fs/ceph/addr.c | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 77 insertions(+), 12 deletions(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index c346b84..ebda329 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1210,6 +1210,41 @@ const struct address_space_operations ceph_aops = { /* * vm ops */ +static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) +{ + struct inode *inode = file_inode(vma->vm_file); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_file_info *fi = vma->vm_file->private_data; + loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT; + int want, got, ret; + + dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n", + inode, ceph_vinop(inode), off, PAGE_CACHE_SIZE); + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_CACHE; + while (1) { + got = 0; + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); + if (ret == 0) + break; + if (ret != -ERESTARTSYS) { + WARN_ON(1); + return VM_FAULT_SIGBUS; + } + } + dout("filemap_fault %p %llu~%zd got cap refs on %s\n", + inode, off, PAGE_CACHE_SIZE, ceph_cap_string(got)); + + ret = filemap_fault(vma, vmf); + + dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", + inode, off, PAGE_CACHE_SIZE, ceph_cap_string(got), ret); + ceph_put_cap_refs(ci, got); + + return ret; +} /* * Reuse write_begin here for simplicity. @@ -1217,23 +1252,41 @@ const struct address_space_operations ceph_aops = { static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) { struct inode *inode = file_inode(vma->vm_file); - struct page *page = vmf->page; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_file_info *fi = vma->vm_file->private_data; struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct page *page = vmf->page; loff_t off = page_offset(page); - loff_t size, len; - int ret; - - /* Update time before taking page lock */ - file_update_time(vma->vm_file); + loff_t size = i_size_read(inode); + size_t len; + int want, got, ret; - size = i_size_read(inode); if (off + PAGE_CACHE_SIZE <= size) len = PAGE_CACHE_SIZE; else len = size & ~PAGE_CACHE_MASK; - dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode, - off, len, page, page->index); + dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n", + inode, ceph_vinop(inode), off, len, size); + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_BUFFER; + while (1) { + got = 0; + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len); + if (ret == 0) + break; + if (ret != -ERESTARTSYS) { + WARN_ON(1); + return VM_FAULT_SIGBUS; + } + } + dout("page_mkwrite %p %llu~%zd got cap refs on %s\n", + inode, off, len, ceph_cap_string(got)); + + /* Update time before taking page lock */ + file_update_time(vma->vm_file); lock_page(page); @@ -1255,14 +1308,26 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = VM_FAULT_SIGBUS; } out: - dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret); - if (ret != VM_FAULT_LOCKED) + if (ret != VM_FAULT_LOCKED) { unlock_page(page); + } else { + int dirty; + spin_lock(&ci->i_ceph_lock); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + spin_unlock(&ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(inode, dirty); + } + + dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n", + inode, off, len, ceph_cap_string(got), ret); + ceph_put_cap_refs(ci, got); + return ret; } static struct vm_operations_struct ceph_vmops = { - .fault = filemap_fault, + .fault = ceph_filemap_fault, .page_mkwrite = ceph_page_mkwrite, .remap_pages = generic_file_remap_pages, }; -- cgit v1.1 From 7221fe4c2ed72804b28633c8e0217d65abb0023f Mon Sep 17 00:00:00 2001 From: Guangliang Zhao Date: Mon, 11 Nov 2013 15:18:03 +0800 Subject: ceph: add acl for cephfs Signed-off-by: Guangliang Zhao Reviewed-by: Li Wang Reviewed-by: Zheng Yan --- fs/ceph/Kconfig | 13 +++ fs/ceph/Makefile | 1 + fs/ceph/acl.c | 332 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ fs/ceph/caps.c | 1 + fs/ceph/dir.c | 5 + fs/ceph/inode.c | 11 ++ fs/ceph/super.c | 4 + fs/ceph/super.h | 37 ++++++- fs/ceph/xattr.c | 60 ++++++++-- 9 files changed, 451 insertions(+), 13 deletions(-) create mode 100644 fs/ceph/acl.c diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig index ac9a2ef..264e9bf 100644 --- a/fs/ceph/Kconfig +++ b/fs/ceph/Kconfig @@ -25,3 +25,16 @@ config CEPH_FSCACHE caching support for Ceph clients using FS-Cache endif + +config CEPH_FS_POSIX_ACL + bool "Ceph POSIX Access Control Lists" + depends on CEPH_FS + select FS_POSIX_ACL + help + POSIX Access Control Lists (ACLs) support permissions for users and + groups beyond the owner/group/world scheme. + + To learn more about Access Control Lists, visit the POSIX ACLs for + Linux website . + + If you don't know what Access Control Lists are, say N diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile index 32e3010..85a4230 100644 --- a/fs/ceph/Makefile +++ b/fs/ceph/Makefile @@ -10,3 +10,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \ debugfs.o ceph-$(CONFIG_CEPH_FSCACHE) += cache.o +ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c new file mode 100644 index 0000000..64fddbc --- /dev/null +++ b/fs/ceph/acl.c @@ -0,0 +1,332 @@ +/* + * linux/fs/ceph/acl.c + * + * Copyright (C) 2013 Guangliang Zhao, + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 021110-1307, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "super.h" + +static inline void ceph_set_cached_acl(struct inode *inode, + int type, struct posix_acl *acl) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + + spin_lock(&ci->i_ceph_lock); + if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0)) + set_cached_acl(inode, type, acl); + spin_unlock(&ci->i_ceph_lock); +} + +static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode, + int type) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct posix_acl *acl = ACL_NOT_CACHED; + + spin_lock(&ci->i_ceph_lock); + if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0)) + acl = get_cached_acl(inode, type); + spin_unlock(&ci->i_ceph_lock); + + return acl; +} + +void ceph_forget_all_cached_acls(struct inode *inode) +{ + forget_all_cached_acls(inode); +} + +struct posix_acl *ceph_get_acl(struct inode *inode, int type) +{ + int size; + const char *name; + char *value = NULL; + struct posix_acl *acl; + + if (!IS_POSIXACL(inode)) + return NULL; + + acl = ceph_get_cached_acl(inode, type); + if (acl != ACL_NOT_CACHED) + return acl; + + switch (type) { + case ACL_TYPE_ACCESS: + name = POSIX_ACL_XATTR_ACCESS; + break; + case ACL_TYPE_DEFAULT: + name = POSIX_ACL_XATTR_DEFAULT; + break; + default: + BUG(); + } + + size = __ceph_getxattr(inode, name, "", 0); + if (size > 0) { + value = kzalloc(size, GFP_NOFS); + if (!value) + return ERR_PTR(-ENOMEM); + size = __ceph_getxattr(inode, name, value, size); + } + + if (size > 0) + acl = posix_acl_from_xattr(&init_user_ns, value, size); + else if (size == -ERANGE || size == -ENODATA || size == 0) + acl = NULL; + else + acl = ERR_PTR(-EIO); + + kfree(value); + + if (!IS_ERR(acl)) + ceph_set_cached_acl(inode, type, acl); + + return acl; +} + +static int ceph_set_acl(struct dentry *dentry, struct inode *inode, + struct posix_acl *acl, int type) +{ + int ret = 0, size = 0; + const char *name = NULL; + char *value = NULL; + struct iattr newattrs; + umode_t new_mode = inode->i_mode, old_mode = inode->i_mode; + + if (acl) { + ret = posix_acl_valid(acl); + if (ret < 0) + goto out; + } + + switch (type) { + case ACL_TYPE_ACCESS: + name = POSIX_ACL_XATTR_ACCESS; + if (acl) { + ret = posix_acl_equiv_mode(acl, &new_mode); + if (ret < 0) + goto out; + if (ret == 0) + acl = NULL; + } + break; + case ACL_TYPE_DEFAULT: + if (!S_ISDIR(inode->i_mode)) { + ret = acl ? -EINVAL : 0; + goto out; + } + name = POSIX_ACL_XATTR_DEFAULT; + break; + default: + ret = -EINVAL; + goto out; + } + + if (acl) { + size = posix_acl_xattr_size(acl->a_count); + value = kmalloc(size, GFP_NOFS); + if (!value) { + ret = -ENOMEM; + goto out; + } + + ret = posix_acl_to_xattr(&init_user_ns, acl, value, size); + if (ret < 0) + goto out_free; + } + + if (new_mode != old_mode) { + newattrs.ia_mode = new_mode; + newattrs.ia_valid = ATTR_MODE; + ret = ceph_setattr(dentry, &newattrs); + if (ret) + goto out_free; + } + + if (value) + ret = __ceph_setxattr(dentry, name, value, size, 0); + else + ret = __ceph_removexattr(dentry, name); + + if (ret) { + if (new_mode != old_mode) { + newattrs.ia_mode = old_mode; + newattrs.ia_valid = ATTR_MODE; + ceph_setattr(dentry, &newattrs); + } + goto out_free; + } + + ceph_set_cached_acl(inode, type, acl); + +out_free: + kfree(value); +out: + return ret; +} + +int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir) +{ + struct posix_acl *acl = NULL; + int ret = 0; + + if (!S_ISLNK(inode->i_mode)) { + if (IS_POSIXACL(dir)) { + acl = ceph_get_acl(dir, ACL_TYPE_DEFAULT); + if (IS_ERR(acl)) { + ret = PTR_ERR(acl); + goto out; + } + } + + if (!acl) + inode->i_mode &= ~current_umask(); + } + + if (IS_POSIXACL(dir) && acl) { + if (S_ISDIR(inode->i_mode)) { + ret = ceph_set_acl(dentry, inode, acl, + ACL_TYPE_DEFAULT); + if (ret) + goto out_release; + } + ret = posix_acl_create(&acl, GFP_NOFS, &inode->i_mode); + if (ret < 0) + goto out; + else if (ret > 0) + ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS); + else + cache_no_acl(inode); + } else { + cache_no_acl(inode); + } + +out_release: + posix_acl_release(acl); +out: + return ret; +} + +int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) +{ + struct posix_acl *acl; + int ret = 0; + + if (S_ISLNK(inode->i_mode)) { + ret = -EOPNOTSUPP; + goto out; + } + + if (!IS_POSIXACL(inode)) + goto out; + + acl = ceph_get_acl(inode, ACL_TYPE_ACCESS); + if (IS_ERR_OR_NULL(acl)) { + ret = PTR_ERR(acl); + goto out; + } + + ret = posix_acl_chmod(&acl, GFP_KERNEL, inode->i_mode); + if (ret) + goto out; + ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS); + posix_acl_release(acl); +out: + return ret; +} + +static int ceph_xattr_acl_get(struct dentry *dentry, const char *name, + void *value, size_t size, int type) +{ + struct posix_acl *acl; + int ret = 0; + + if (!IS_POSIXACL(dentry->d_inode)) + return -EOPNOTSUPP; + + acl = ceph_get_acl(dentry->d_inode, type); + if (IS_ERR(acl)) + return PTR_ERR(acl); + if (acl == NULL) + return -ENODATA; + + ret = posix_acl_to_xattr(&init_user_ns, acl, value, size); + posix_acl_release(acl); + + return ret; +} + +static int ceph_xattr_acl_set(struct dentry *dentry, const char *name, + const void *value, size_t size, int flags, int type) +{ + int ret = 0; + struct posix_acl *acl = NULL; + + if (!inode_owner_or_capable(dentry->d_inode)) { + ret = -EPERM; + goto out; + } + + if (!IS_POSIXACL(dentry->d_inode)) { + ret = -EOPNOTSUPP; + goto out; + } + + if (value) { + acl = posix_acl_from_xattr(&init_user_ns, value, size); + if (IS_ERR(acl)) { + ret = PTR_ERR(acl); + goto out; + } + + if (acl) { + ret = posix_acl_valid(acl); + if (ret) + goto out_release; + } + } + + ret = ceph_set_acl(dentry, dentry->d_inode, acl, type); + +out_release: + posix_acl_release(acl); +out: + return ret; +} + +const struct xattr_handler ceph_xattr_acl_default_handler = { + .prefix = POSIX_ACL_XATTR_DEFAULT, + .flags = ACL_TYPE_DEFAULT, + .get = ceph_xattr_acl_get, + .set = ceph_xattr_acl_set, +}; + +const struct xattr_handler ceph_xattr_acl_access_handler = { + .prefix = POSIX_ACL_XATTR_ACCESS, + .flags = ACL_TYPE_ACCESS, + .get = ceph_xattr_acl_get, + .set = ceph_xattr_acl_set, +}; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 3c0a4bd..9289c6b2 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2464,6 +2464,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ceph_buffer_put(ci->i_xattrs.blob); ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); ci->i_xattrs.version = version; + ceph_forget_all_cached_acls(inode); } } diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 2a0bcae..b629e9d 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -693,6 +693,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); ceph_mdsc_put_request(req); + + if (!err) + err = ceph_init_acl(dentry, dentry->d_inode, dir); + if (err) d_drop(dentry); return err; @@ -1293,6 +1297,7 @@ const struct inode_operations ceph_dir_iops = { .getxattr = ceph_getxattr, .listxattr = ceph_listxattr, .removexattr = ceph_removexattr, + .get_acl = ceph_get_acl, .mknod = ceph_mknod, .symlink = ceph_symlink, .mkdir = ceph_mkdir, diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index d37b2dc..a808bfb 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -95,6 +95,7 @@ const struct inode_operations ceph_file_iops = { .getxattr = ceph_getxattr, .listxattr = ceph_listxattr, .removexattr = ceph_removexattr, + .get_acl = ceph_get_acl, }; @@ -680,6 +681,7 @@ static int fill_inode(struct inode *inode, memcpy(ci->i_xattrs.blob->vec.iov_base, iinfo->xattr_data, iinfo->xattr_len); ci->i_xattrs.version = le64_to_cpu(info->xattr_version); + ceph_forget_all_cached_acls(inode); xattr_blob = NULL; } @@ -1612,6 +1614,7 @@ static const struct inode_operations ceph_symlink_iops = { .getxattr = ceph_getxattr, .listxattr = ceph_listxattr, .removexattr = ceph_removexattr, + .get_acl = ceph_get_acl, }; /* @@ -1685,6 +1688,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) dirtied |= CEPH_CAP_AUTH_EXCL; } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 || attr->ia_mode != inode->i_mode) { + inode->i_mode = attr->ia_mode; req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode); mask |= CEPH_SETATTR_MODE; release |= CEPH_CAP_AUTH_SHARED; @@ -1800,6 +1804,12 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (inode_dirty_flags) __mark_inode_dirty(inode, inode_dirty_flags); + if (ia_valid & ATTR_MODE) { + err = ceph_acl_chmod(dentry, inode); + if (err) + goto out_put; + } + if (mask) { req->r_inode = inode; ihold(inode); @@ -1819,6 +1829,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) return err; out: spin_unlock(&ci->i_ceph_lock); +out_put: ceph_mdsc_put_request(req); return err; } diff --git a/fs/ceph/super.c b/fs/ceph/super.c index e58bd4a..c6740e4 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -819,7 +819,11 @@ static int ceph_set_super(struct super_block *s, void *data) s->s_flags = fsc->mount_options->sb_flags; s->s_maxbytes = 1ULL << 40; /* temp value until we get mdsmap */ +#ifdef CONFIG_CEPH_FS_POSIX_ACL + s->s_flags |= MS_POSIXACL; +#endif + s->s_xattr = ceph_xattr_handlers; s->s_fs_info = fsc; fsc->sb = s; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 8de94b5..7fa78a7 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -335,7 +335,6 @@ struct ceph_inode_info { u32 i_fscache_gen; /* sequence, for delayed fscache validate */ struct work_struct i_revalidate_work; #endif - struct inode vfs_inode; /* at end */ }; @@ -725,6 +724,9 @@ extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, /* xattr.c */ extern int ceph_setxattr(struct dentry *, const char *, const void *, size_t, int); +int __ceph_setxattr(struct dentry *, const char *, const void *, size_t, int); +ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t); +int __ceph_removexattr(struct dentry *, const char *); extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t); extern ssize_t ceph_listxattr(struct dentry *, char *, size_t); extern int ceph_removexattr(struct dentry *, const char *); @@ -733,6 +735,39 @@ extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci); extern void __init ceph_xattr_init(void); extern void ceph_xattr_exit(void); +/* acl.c */ +extern const struct xattr_handler ceph_xattr_acl_access_handler; +extern const struct xattr_handler ceph_xattr_acl_default_handler; +extern const struct xattr_handler *ceph_xattr_handlers[]; + +#ifdef CONFIG_CEPH_FS_POSIX_ACL + +struct posix_acl *ceph_get_acl(struct inode *, int); +int ceph_init_acl(struct dentry *, struct inode *, struct inode *); +int ceph_acl_chmod(struct dentry *, struct inode *); +void ceph_forget_all_cached_acls(struct inode *inode); + +#else + +#define ceph_get_acl NULL + +static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode, + struct inode *dir) +{ + return 0; +} + +static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) +{ + return 0; +} + +static inline void ceph_forget_all_cached_acls(struct inode *inode) +{ +} + +#endif + /* caps.c */ extern const char *ceph_cap_string(int c); extern void ceph_handle_caps(struct ceph_mds_session *session, diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index be661d8..c7581f37 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -11,11 +11,24 @@ #define XATTR_CEPH_PREFIX "ceph." #define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1) +/* + * List of handlers for synthetic system.* attributes. Other + * attributes are handled directly. + */ +const struct xattr_handler *ceph_xattr_handlers[] = { +#ifdef CONFIG_CEPH_FS_POSIX_ACL + &ceph_xattr_acl_access_handler, + &ceph_xattr_acl_default_handler, +#endif + NULL, +}; + static bool ceph_is_valid_xattr(const char *name) { return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) || !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN) || + !strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN) || !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) || !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN); } @@ -663,10 +676,9 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci) } } -ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, +ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, size_t size) { - struct inode *inode = dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); int err; struct ceph_inode_xattr *xattr; @@ -675,7 +687,6 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, if (!ceph_is_valid_xattr(name)) return -ENODATA; - /* let's see if a virtual xattr was requested */ vxattr = ceph_match_vxattr(inode, name); if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) { @@ -725,6 +736,15 @@ out: return err; } +ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, + size_t size) +{ + if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) + return generic_getxattr(dentry, name, value, size); + + return __ceph_getxattr(dentry->d_inode, name, value, size); +} + ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) { struct inode *inode = dentry->d_inode; @@ -863,8 +883,8 @@ out: return err; } -int ceph_setxattr(struct dentry *dentry, const char *name, - const void *value, size_t size, int flags) +int __ceph_setxattr(struct dentry *dentry, const char *name, + const void *value, size_t size, int flags) { struct inode *inode = dentry->d_inode; struct ceph_vxattr *vxattr; @@ -879,9 +899,6 @@ int ceph_setxattr(struct dentry *dentry, const char *name, struct ceph_inode_xattr *xattr = NULL; int required_blob_size; - if (ceph_snap(inode) != CEPH_NOSNAP) - return -EROFS; - if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -958,6 +975,18 @@ out: return err; } +int ceph_setxattr(struct dentry *dentry, const char *name, + const void *value, size_t size, int flags) +{ + if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP) + return -EROFS; + + if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) + return generic_setxattr(dentry, name, value, size, flags); + + return __ceph_setxattr(dentry, name, value, size, flags); +} + static int ceph_send_removexattr(struct dentry *dentry, const char *name) { struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); @@ -984,7 +1013,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name) return err; } -int ceph_removexattr(struct dentry *dentry, const char *name) +int __ceph_removexattr(struct dentry *dentry, const char *name) { struct inode *inode = dentry->d_inode; struct ceph_vxattr *vxattr; @@ -994,9 +1023,6 @@ int ceph_removexattr(struct dentry *dentry, const char *name) int required_blob_size; int dirty; - if (ceph_snap(inode) != CEPH_NOSNAP) - return -EROFS; - if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -1053,3 +1079,13 @@ out: return err; } +int ceph_removexattr(struct dentry *dentry, const char *name) +{ + if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP) + return -EROFS; + + if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) + return generic_removexattr(dentry, name); + + return __ceph_removexattr(dentry, name); +} -- cgit v1.1 From 3f42bc4beadef554fd0d4e6408e9142da268613b Mon Sep 17 00:00:00 2001 From: Li Wang Date: Thu, 19 Dec 2013 06:03:48 -0800 Subject: ceph fscache: Introduce a routine for uncaching single no data page from fscache Signed-off-by: Li Wang Reviewed-by: Milosz Tanski --- fs/ceph/cache.h | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h index ba94940..da95f61 100644 --- a/fs/ceph/cache.h +++ b/fs/ceph/cache.h @@ -67,6 +67,14 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp) return fscache_maybe_release_page(ci->fscache, page, gfp); } +static inline void ceph_fscache_readpage_cancel(struct inode *inode, + struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + if (fscache_cookie_valid(ci->fscache) && PageFsCache(page)) + __fscache_uncache_page(ci->fscache, page); +} + static inline void ceph_fscache_readpages_cancel(struct inode *inode, struct list_head *pages) { @@ -145,6 +153,11 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp) return 1; } +static inline void ceph_fscache_readpage_cancel(struct inode *inode, + struct page *page) +{ +} + static inline void ceph_fscache_readpages_cancel(struct inode *inode, struct list_head *pages) { -- cgit v1.1 From 183028052b48db2b34c09fd54f0bc465eaa305eb Mon Sep 17 00:00:00 2001 From: Li Wang Date: Thu, 19 Dec 2013 06:03:49 -0800 Subject: ceph fscache: Uncaching no data page from fscache in readpage() Currently, if one new page allocated into fscache in readpage(), however, with no data read into due to error encountered during reading from OSDs, the slot in fscache is not uncached. This patch fixes this. Signed-off-by: Li Wang Reviewed-by: Milosz Tanski --- fs/ceph/addr.c | 1 + 1 file changed, 1 insertion(+) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index ebda329..791a9a2 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -209,6 +209,7 @@ static int readpage_nounlock(struct file *filp, struct page *page) err = 0; if (err < 0) { SetPageError(page); + ceph_fscache_readpage_cancel(inode, page); goto out; } else { if (err < PAGE_CACHE_SIZE) { -- cgit v1.1 From 7e513d43669a0505ee3b122344176147a674bcbf Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 16 Dec 2013 19:26:32 +0200 Subject: rbd: enable extended devt in single-major mode If single-major device number allocation scheme is turned on, instead of reserving 256 minors per device, which imposes a limit of 4096 images mapped at once, reserve 16 minors per device and enable extended devt feature. This results in a theoretical limit of 65536 images mapped at once, and still allows to have more than 15 partititions: partitions starting with 16th are mapped under major 259 (Block Extended Major): $ rbd showmapped id pool image snap device 0 rbd b5 - /dev/rbd0 # no partitions 1 rbd b2 - /dev/rbd1 # 40 partitions 2 rbd b3 - /dev/rbd2 # 2 partitions $ cat /proc/partitions 251 0 1024 rbd0 251 16 1024 rbd1 251 17 0 rbd1p1 251 18 0 rbd1p2 ... 251 30 0 rbd1p14 251 31 0 rbd1p15 259 0 0 rbd1p16 259 1 0 rbd1p17 ... 259 23 0 rbd1p39 259 24 0 rbd1p40 251 32 1024 rbd2 251 33 0 rbd2p1 251 34 0 rbd2p2 (major 251 was assigned dynamically at module load time) Signed-off-by: Ilya Dryomov Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e5ddcb5..11ae4c1 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -91,7 +91,8 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_DRV_NAME "rbd" -#define RBD_PART_SHIFT 8 +#define RBD_MINORS_PER_MAJOR 256 +#define RBD_SINGLE_MAJOR_PART_SHIFT 4 #define RBD_SNAP_DEV_NAME_PREFIX "snap_" #define RBD_MAX_SNAP_NAME_LEN \ @@ -415,12 +416,12 @@ static void rbd_spec_put(struct rbd_spec *spec); static int rbd_dev_id_to_minor(int dev_id) { - return dev_id << RBD_PART_SHIFT; + return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT; } static int minor_to_rbd_dev_id(int minor) { - return minor >> RBD_PART_SHIFT; + return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; } static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); @@ -3434,7 +3435,9 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) u64 segment_size; /* create gendisk info */ - disk = alloc_disk(1 << RBD_PART_SHIFT); + disk = alloc_disk(single_major ? + (1 << RBD_SINGLE_MAJOR_PART_SHIFT) : + RBD_MINORS_PER_MAJOR); if (!disk) return -ENOMEM; @@ -3442,6 +3445,8 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) rbd_dev->dev_id); disk->major = rbd_dev->major; disk->first_minor = rbd_dev->minor; + if (single_major) + disk->flags |= GENHD_FL_EXT_DEVT; disk->fops = &rbd_bd_ops; disk->private_data = rbd_dev; -- cgit v1.1 From 527a88b9ef287a5c77f016a0090e0bd5338325b1 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 26 Dec 2013 08:37:43 -0600 Subject: MAINTAINERS: update an e-mail address I no longer have direct access to my Inktank e-mail. I still pay attention to rbd, so update its entry in MAINTAINERS accordingly. Signed-off-by: Alex Elder Signed-off-by: Sage Weil --- MAINTAINERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MAINTAINERS b/MAINTAINERS index 13c15c8..8ade3c9 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -6962,7 +6962,7 @@ F: drivers/media/parport/*-qcam* RADOS BLOCK DEVICE (RBD) M: Yehuda Sadeh M: Sage Weil -M: Alex Elder +M: Alex Elder M: ceph-devel@vger.kernel.org W: http://ceph.com/ T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git -- cgit v1.1 From fca270653909404112ea5f6eed274ed5272d5252 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 16 Dec 2013 18:02:40 +0200 Subject: rbd: introduce rbd_dev_header_unwatch_sync() and switch to it Rename rbd_dev_header_watch_sync() to __rbd_dev_header_watch_sync() and introduce two helpers: rbd_dev_header_{,un}watch_sync() to make it more clear what is going on. Signed-off-by: Ilya Dryomov Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 11ae4c1..d078d23 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2913,7 +2913,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) * Request sync osd watch/unwatch. The value of "start" determines * whether a watch request is being initiated or torn down. */ -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) +static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; @@ -2988,6 +2988,22 @@ out_cancel: return ret; } +static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) +{ + return __rbd_dev_header_watch_sync(rbd_dev, true); +} + +static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +{ + int ret; + + ret = __rbd_dev_header_watch_sync(rbd_dev, false); + if (ret) { + rbd_warn(rbd_dev, "unable to tear down watch request: %d\n", + ret); + } +} + /* * Synchronous osd object method call. Returns the number of bytes * returned in the outbound buffer, or a negative error code. @@ -5003,7 +5019,6 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev) static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping) { int ret; - int tmp; /* * Get the id from the image id object. Unless there's an @@ -5022,7 +5037,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping) goto err_out_format; if (mapping) { - ret = rbd_dev_header_watch_sync(rbd_dev, true); + ret = rbd_dev_header_watch_sync(rbd_dev); if (ret) goto out_header_name; } @@ -5049,12 +5064,8 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping) err_out_probe: rbd_dev_unprobe(rbd_dev); err_out_watch: - if (mapping) { - tmp = rbd_dev_header_watch_sync(rbd_dev, false); - if (tmp) - rbd_warn(rbd_dev, "unable to tear down " - "watch request (%d)\n", tmp); - } + if (mapping) + rbd_dev_header_unwatch_sync(rbd_dev); out_header_name: kfree(rbd_dev->header_name); rbd_dev->header_name = NULL; @@ -5250,16 +5261,14 @@ static ssize_t do_rbd_remove(struct bus_type *bus, if (ret < 0 || already) return ret; - ret = rbd_dev_header_watch_sync(rbd_dev, false); - if (ret) - rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret); - + rbd_dev_header_unwatch_sync(rbd_dev); /* * flush remaining watch callbacks - these must be complete * before the osd_client is shutdown */ dout("%s: flushing notifies", __func__); ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); + /* * Don't free anything from rbd_dev->disk until after all * notifies are completely processed. Otherwise -- cgit v1.1 From e37180c0f2f0c5b21e9295d5b19874ff4a659be1 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 16 Dec 2013 18:02:41 +0200 Subject: rbd: tear down watch request if rbd_dev_device_setup() fails Tear down watch request if rbd_dev_device_setup() fails. Signed-off-by: Ilya Dryomov Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index d078d23..72a7eec 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -5143,6 +5143,12 @@ static ssize_t do_rbd_add(struct bus_type *bus, rc = rbd_dev_device_setup(rbd_dev); if (rc) { + /* + * rbd_dev_header_unwatch_sync() can't be moved into + * rbd_dev_image_release() without refactoring, see + * commit 1f3ef78861ac. + */ + rbd_dev_header_unwatch_sync(rbd_dev); rbd_dev_image_release(rbd_dev); goto err_out_module; } -- cgit v1.1 From 12b4629a9fb80fecaebadc217b13b8776ed8dbef Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:23 +0200 Subject: libceph: all features fields must be u64 In preparation for ceph_features.h update, change all features fields from unsigned int/u32 to u64. (ceph.git has ~40 feature bits at this point.) Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- fs/ceph/mds_client.c | 14 +++++++------- fs/ceph/super.c | 4 ++-- include/linux/ceph/libceph.h | 8 ++++---- include/linux/ceph/messenger.h | 10 +++++----- net/ceph/ceph_common.c | 4 ++-- net/ceph/messenger.c | 4 ++-- 6 files changed, 22 insertions(+), 22 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index d90861f..4a13f6e 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -63,7 +63,7 @@ static const struct ceph_connection_operations mds_con_ops; */ static int parse_reply_info_in(void **p, void *end, struct ceph_mds_reply_info_in *info, - int features) + u64 features) { int err = -EIO; @@ -98,7 +98,7 @@ bad: */ static int parse_reply_info_trace(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { int err; @@ -145,7 +145,7 @@ out_bad: */ static int parse_reply_info_dir(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { u32 num, i = 0; int err; @@ -217,7 +217,7 @@ out_bad: */ static int parse_reply_info_filelock(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (*p + sizeof(*info->filelock_reply) > end) goto bad; @@ -238,7 +238,7 @@ bad: */ static int parse_reply_info_create(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { if (*p == end) { @@ -262,7 +262,7 @@ bad: */ static int parse_reply_info_extra(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (info->head->op == CEPH_MDS_OP_GETFILELOCK) return parse_reply_info_filelock(p, end, info, features); @@ -280,7 +280,7 @@ static int parse_reply_info_extra(void **p, void *end, */ static int parse_reply_info(struct ceph_msg *msg, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { void *p, *end; u32 len; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index c6740e4..2df963f 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -490,10 +490,10 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, struct ceph_options *opt) { struct ceph_fs_client *fsc; - const unsigned supported_features = + const u64 supported_features = CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH; - const unsigned required_features = 0; + const u64 required_features = 0; int page_count; size_t size; int err = -ENOMEM; diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 2e30248..7d704db 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -122,8 +122,8 @@ struct ceph_client { int (*extra_mon_dispatch)(struct ceph_client *, struct ceph_msg *); - u32 supported_features; - u32 required_features; + u64 supported_features; + u64 required_features; struct ceph_messenger msgr; /* messenger instance */ struct ceph_mon_client monc; @@ -192,8 +192,8 @@ extern int ceph_compare_options(struct ceph_options *new_opt, struct ceph_client *client); extern struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, - unsigned supported_features, - unsigned required_features); + u64 supported_features, + u64 required_features); extern u64 ceph_client_id(struct ceph_client *client); extern void ceph_destroy_client(struct ceph_client *client); extern int __ceph_open_session(struct ceph_client *client, diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 7c1420b..c1d3f5a 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -60,8 +60,8 @@ struct ceph_messenger { u32 global_seq; spinlock_t global_seq_lock; - u32 supported_features; - u32 required_features; + u64 supported_features; + u64 required_features; }; enum ceph_msg_data_type { @@ -192,7 +192,7 @@ struct ceph_connection { struct ceph_entity_name peer_name; /* peer name */ - unsigned peer_features; + u64 peer_features; u32 connect_seq; /* identify the most recent connection attempt for this connection, client */ u32 peer_global_seq; /* peer's global seq for this connection */ @@ -256,8 +256,8 @@ extern void ceph_msgr_flush(void); extern void ceph_messenger_init(struct ceph_messenger *msgr, struct ceph_entity_addr *myaddr, - u32 supported_features, - u32 required_features, + u64 supported_features, + u64 required_features, bool nocrc); extern void ceph_con_init(struct ceph_connection *con, void *private, diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 34b11ee..43d8177 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -461,8 +461,8 @@ EXPORT_SYMBOL(ceph_client_id); * create a fresh client instance */ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, - unsigned int supported_features, - unsigned int required_features) + u64 supported_features, + u64 required_features) { struct ceph_client *client; struct ceph_entity_addr *myaddr = NULL; diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 4a5df7b..2606705 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2853,8 +2853,8 @@ static void con_fault(struct ceph_connection *con) */ void ceph_messenger_init(struct ceph_messenger *msgr, struct ceph_entity_addr *myaddr, - u32 supported_features, - u32 required_features, + u64 supported_features, + u64 required_features, bool nocrc) { msgr->supported_features = supported_features; -- cgit v1.1 From 2b3e0c905af43cfe402a2ef3f800be5dc1684005 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:24 +0200 Subject: libceph: update ceph_features.h This updates ceph_features.h so that it has all feature bits defined in ceph.git. In the interim since the last update, ceph.git crossed the "32 feature bits" point, and, the addition of the 33rd bit wasn't handled correctly. The work-around is squashed into this commit and reflects ceph.git commit 053659d05e0349053ef703b414f44965f368b9f0. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/ceph_features.h | 96 +++++++++++++++++++++++++------------- net/ceph/messenger.c | 4 +- 2 files changed, 67 insertions(+), 33 deletions(-) diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index 4c420803..003ea71 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -4,42 +4,73 @@ /* * feature bits */ -#define CEPH_FEATURE_UID (1<<0) -#define CEPH_FEATURE_NOSRCADDR (1<<1) -#define CEPH_FEATURE_MONCLOCKCHECK (1<<2) -#define CEPH_FEATURE_FLOCK (1<<3) -#define CEPH_FEATURE_SUBSCRIBE2 (1<<4) -#define CEPH_FEATURE_MONNAMES (1<<5) -#define CEPH_FEATURE_RECONNECT_SEQ (1<<6) -#define CEPH_FEATURE_DIRLAYOUTHASH (1<<7) -#define CEPH_FEATURE_OBJECTLOCATOR (1<<8) -#define CEPH_FEATURE_PGID64 (1<<9) -#define CEPH_FEATURE_INCSUBOSDMAP (1<<10) -#define CEPH_FEATURE_PGPOOL3 (1<<11) -#define CEPH_FEATURE_OSDREPLYMUX (1<<12) -#define CEPH_FEATURE_OSDENC (1<<13) -#define CEPH_FEATURE_OMAP (1<<14) -#define CEPH_FEATURE_MONENC (1<<15) -#define CEPH_FEATURE_QUERY_T (1<<16) -#define CEPH_FEATURE_INDEP_PG_MAP (1<<17) -#define CEPH_FEATURE_CRUSH_TUNABLES (1<<18) -#define CEPH_FEATURE_CHUNKY_SCRUB (1<<19) -#define CEPH_FEATURE_MON_NULLROUTE (1<<20) -#define CEPH_FEATURE_MON_GV (1<<21) -#define CEPH_FEATURE_BACKFILL_RESERVATION (1<<22) -#define CEPH_FEATURE_MSG_AUTH (1<<23) -#define CEPH_FEATURE_RECOVERY_RESERVATION (1<<24) -#define CEPH_FEATURE_CRUSH_TUNABLES2 (1<<25) -#define CEPH_FEATURE_CREATEPOOLID (1<<26) -#define CEPH_FEATURE_REPLY_CREATE_INODE (1<<27) -#define CEPH_FEATURE_OSD_HBMSGS (1<<28) -#define CEPH_FEATURE_MDSENC (1<<29) -#define CEPH_FEATURE_OSDHASHPSPOOL (1<<30) +#define CEPH_FEATURE_UID (1ULL<<0) +#define CEPH_FEATURE_NOSRCADDR (1ULL<<1) +#define CEPH_FEATURE_MONCLOCKCHECK (1ULL<<2) +#define CEPH_FEATURE_FLOCK (1ULL<<3) +#define CEPH_FEATURE_SUBSCRIBE2 (1ULL<<4) +#define CEPH_FEATURE_MONNAMES (1ULL<<5) +#define CEPH_FEATURE_RECONNECT_SEQ (1ULL<<6) +#define CEPH_FEATURE_DIRLAYOUTHASH (1ULL<<7) +#define CEPH_FEATURE_OBJECTLOCATOR (1ULL<<8) +#define CEPH_FEATURE_PGID64 (1ULL<<9) +#define CEPH_FEATURE_INCSUBOSDMAP (1ULL<<10) +#define CEPH_FEATURE_PGPOOL3 (1ULL<<11) +#define CEPH_FEATURE_OSDREPLYMUX (1ULL<<12) +#define CEPH_FEATURE_OSDENC (1ULL<<13) +#define CEPH_FEATURE_OMAP (1ULL<<14) +#define CEPH_FEATURE_MONENC (1ULL<<15) +#define CEPH_FEATURE_QUERY_T (1ULL<<16) +#define CEPH_FEATURE_INDEP_PG_MAP (1ULL<<17) +#define CEPH_FEATURE_CRUSH_TUNABLES (1ULL<<18) +#define CEPH_FEATURE_CHUNKY_SCRUB (1ULL<<19) +#define CEPH_FEATURE_MON_NULLROUTE (1ULL<<20) +#define CEPH_FEATURE_MON_GV (1ULL<<21) +#define CEPH_FEATURE_BACKFILL_RESERVATION (1ULL<<22) +#define CEPH_FEATURE_MSG_AUTH (1ULL<<23) +#define CEPH_FEATURE_RECOVERY_RESERVATION (1ULL<<24) +#define CEPH_FEATURE_CRUSH_TUNABLES2 (1ULL<<25) +#define CEPH_FEATURE_CREATEPOOLID (1ULL<<26) +#define CEPH_FEATURE_REPLY_CREATE_INODE (1ULL<<27) +#define CEPH_FEATURE_OSD_HBMSGS (1ULL<<28) +#define CEPH_FEATURE_MDSENC (1ULL<<29) +#define CEPH_FEATURE_OSDHASHPSPOOL (1ULL<<30) +#define CEPH_FEATURE_MON_SINGLE_PAXOS (1ULL<<31) +#define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32) +#define CEPH_FEATURE_MON_SCRUB (1ULL<<33) +#define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<34) +#define CEPH_FEATURE_OSD_CACHEPOOL (1ULL<<35) +#define CEPH_FEATURE_CRUSH_V2 (1ULL<<36) /* new indep; SET_* steps */ +#define CEPH_FEATURE_EXPORT_PEER (1ULL<<37) +#define CEPH_FEATURE_OSD_ERASURE_CODES (1ULL<<38) + +/* + * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature + * vector to evaluate to 64 bit ~0. To cope, we designate 1ULL << 63 + * to mean 33 bit ~0, and introduce a helper below to do the + * translation. + * + * This was introduced by ceph.git commit + * 9ea02b84104045c2ffd7e7f4e7af512953855ecd v0.58-657-g9ea02b8 + * and fixed by ceph.git commit + * 4255b5c2fb54ae40c53284b3ab700fdfc7e61748 v0.65-263-g4255b5c + */ +#define CEPH_FEATURE_RESERVED (1ULL<<63) + +static inline u64 ceph_sanitize_features(u64 features) +{ + if (features & CEPH_FEATURE_RESERVED) { + /* everything through OSD_SNAPMAPPER */ + return 0x1ffffffffull; + } else { + return features; + } +} /* * Features supported. */ -#define CEPH_FEATURES_SUPPORTED_DEFAULT \ +#define CEPH_FEATURES_SUPPORTED_DEFAULT \ (CEPH_FEATURE_NOSRCADDR | \ CEPH_FEATURE_RECONNECT_SEQ | \ CEPH_FEATURE_PGID64 | \ @@ -56,4 +87,5 @@ CEPH_FEATURE_PGID64 | \ CEPH_FEATURE_PGPOOL3 | \ CEPH_FEATURE_OSDENC) + #endif diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 2606705..bd172e1 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -1945,7 +1946,8 @@ static int process_connect(struct ceph_connection *con) { u64 sup_feat = con->msgr->supported_features; u64 req_feat = con->msgr->required_features; - u64 server_feat = le64_to_cpu(con->in_reply.features); + u64 server_feat = ceph_sanitize_features( + le64_to_cpu(con->in_reply.features)); int ret; dout("process_connect on %p tag %d\n", con, (int)con->in_tag); -- cgit v1.1 From b3b33b0e43323af4fb697f4378218d3c268d02cd Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:24 +0200 Subject: crush: pass weight vector size to map function Pass the size of the weight vector into crush_do_rule() to ensure that we don't access values past the end. This can happen if the caller misbehaves and passes a weight vector that is smaller than max_devices. Currently the monitor tries to prevent that from happening, but this will gracefully tolerate previous bad osdmaps that got into this state. It's also a bit more defensive. Reflects ceph.git commit 5922e2c2b8335b5e46c9504349c3a55b7434c01a. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/crush/mapper.h | 2 +- net/ceph/crush/mapper.c | 17 ++++++++++++----- net/ceph/osdmap.c | 2 +- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h index 5772dee..69310b0 100644 --- a/include/linux/crush/mapper.h +++ b/include/linux/crush/mapper.h @@ -14,6 +14,6 @@ extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, i extern int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - const __u32 *weights); + const __u32 *weights, int weight_max); #endif diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index cbd06a9..18d2cf6 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -264,8 +264,12 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r) * true if device is marked "out" (failed, fully offloaded) * of the cluster */ -static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x) +static int is_out(const struct crush_map *map, + const __u32 *weight, int weight_max, + int item, int x) { + if (item >= weight_max) + return 1; if (weight[item] >= 0x10000) return 0; if (weight[item] == 0) @@ -292,7 +296,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in */ static int crush_choose(const struct crush_map *map, struct crush_bucket *bucket, - const __u32 *weight, + const __u32 *weight, int weight_max, int x, int numrep, int type, int *out, int outpos, int firstn, int recurse_to_leaf, @@ -396,7 +400,7 @@ static int crush_choose(const struct crush_map *map, if (item < 0) { if (crush_choose(map, map->buckets[-1-item], - weight, + weight, weight_max, x, outpos+1, 0, out2, outpos, firstn, 0, @@ -414,6 +418,7 @@ static int crush_choose(const struct crush_map *map, /* out? */ if (itemtype == 0) reject = is_out(map, weight, + weight_max, item, x); else reject = 0; @@ -470,10 +475,12 @@ reject: * @x: hash input * @result: pointer to result vector * @result_max: maximum result size + * @weight: weight vector (for map leaves) + * @weight_max: size of weight vector */ int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - const __u32 *weight) + const __u32 *weight, int weight_max) { int result_len; int a[CRUSH_MAX_SET]; @@ -545,7 +552,7 @@ int crush_do_rule(const struct crush_map *map, j = 0; osize += crush_choose(map, map->buckets[-1-w[i]], - weight, + weight, weight_max, x, numrep, curstep->arg2, o+osize, j, diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index dbd9a47..6477a68 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1165,7 +1165,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, } r = crush_do_rule(osdmap->crush, ruleno, pps, osds, min_t(int, pool->size, *num), - osdmap->osd_weight); + osdmap->osd_weight, osdmap->max_osd); if (r < 0) { pr_err("error %d from crush rule: pool %lld ruleset %d type %d" " size %d\n", r, pgid.pool, pool->crush_ruleset, -- cgit v1.1 From bfb16d7d69f0272451ad85a6e50aab3c4262fbc0 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:24 +0200 Subject: crush: factor out (trivial) crush_destroy_rule() Reflects ceph.git commit 43a01c9973c4b83f2eaa98be87429941a227ddde. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/crush/crush.h | 1 + net/ceph/crush/crush.c | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 6a1101f..09561a0 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -174,6 +174,7 @@ extern void crush_destroy_bucket_list(struct crush_bucket_list *b); extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b); extern void crush_destroy_bucket_straw(struct crush_bucket_straw *b); extern void crush_destroy_bucket(struct crush_bucket *b); +extern void crush_destroy_rule(struct crush_rule *r); extern void crush_destroy(struct crush_map *map); static inline int crush_calc_tree_node(int i) diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index 0896132..16bc199d 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -116,11 +116,14 @@ void crush_destroy(struct crush_map *map) if (map->rules) { __u32 b; for (b = 0; b < map->max_rules; b++) - kfree(map->rules[b]); + crush_destroy_rule(map->rules[b]); kfree(map->rules); } kfree(map); } - +void crush_destroy_rule(struct crush_rule *rule) +{ + kfree(rule); +} -- cgit v1.1 From 8f99c85b7ad55faecaab9d29fd7d12f1601c2eea Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:24 +0200 Subject: crush: reduce scope of some local variables Reflects ceph.git commit e7d47827f0333c96ad43d257607fb92ed4176550. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/crush/mapper.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 18d2cf6..71192b1 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -189,7 +189,7 @@ static int terminal(int x) static int bucket_tree_choose(struct crush_bucket_tree *bucket, int x, int r) { - int n, l; + int n; __u32 w; __u64 t; @@ -197,6 +197,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket, n = bucket->num_nodes >> 1; while (!terminal(n)) { + int l; /* pick point in [0, w) */ w = bucket->node_weights[n]; t = (__u64)crush_hash32_4(bucket->h.hash, x, n, r, @@ -496,7 +497,6 @@ int crush_do_rule(const struct crush_map *map, __u32 step; int i, j; int numrep; - int firstn; const int descend_once = 0; if ((__u32)ruleno >= map->max_rules) { @@ -510,9 +510,9 @@ int crush_do_rule(const struct crush_map *map, o = b; for (step = 0; step < rule->len; step++) { + int firstn = 0; struct crush_rule_step *curstep = &rule->steps[step]; - firstn = 0; switch (curstep->op) { case CRUSH_RULE_TAKE: w[0] = curstep->arg1; -- cgit v1.1 From 2a4ba74ef67ad3a1645d78487ed7ccd0f40063c0 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:24 +0200 Subject: crush: fix some comments Reflects ceph.git commit 3cef755428761f2481b1dd0e0fbd0464ac483fc5. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/crush/mapper.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 71192b1..82cab7d 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -565,7 +565,7 @@ int crush_do_rule(const struct crush_map *map, /* copy final _leaf_ values to output set */ memcpy(o, c, osize*sizeof(*o)); - /* swap t and w arrays */ + /* swap o and w arrays */ tmp = o; o = w; w = tmp; -- cgit v1.1 From e8ef19c4ad161768e1d8309d5ae18481c098eb81 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:24 +0200 Subject: crush: eliminate CRUSH_MAX_SET result size limitation This is only present to size the temporary scratch arrays that we put on the stack. Let the caller allocate them as they wish and remove the limitation. Reflects ceph.git commit 1cfe140bf2dab99517589a82a916f4c75b9492d1. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/crush/crush.h | 1 - include/linux/crush/mapper.h | 3 ++- net/ceph/crush/mapper.c | 10 ++++++---- net/ceph/osdmap.c | 16 +++++++++++++--- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 09561a0..83543c5 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -21,7 +21,6 @@ #define CRUSH_MAX_DEPTH 10 /* max crush hierarchy depth */ -#define CRUSH_MAX_SET 10 /* max size of a mapping result */ /* diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h index 69310b0..eab3674 100644 --- a/include/linux/crush/mapper.h +++ b/include/linux/crush/mapper.h @@ -14,6 +14,7 @@ extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, i extern int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - const __u32 *weights, int weight_max); + const __u32 *weights, int weight_max, + int *scratch); #endif diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 82cab7d..dcf48bc 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -478,15 +478,17 @@ reject: * @result_max: maximum result size * @weight: weight vector (for map leaves) * @weight_max: size of weight vector + * @scratch: scratch vector for private use; must be >= 3 * result_max */ int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - const __u32 *weight, int weight_max) + const __u32 *weight, int weight_max, + int *scratch) { int result_len; - int a[CRUSH_MAX_SET]; - int b[CRUSH_MAX_SET]; - int c[CRUSH_MAX_SET]; + int *a = scratch; + int *b = scratch + result_max; + int *c = scratch + result_max*2; int recurse_to_leaf; int *w; int wsize = 0; diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 6477a68..8b1a6b4 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1110,6 +1110,16 @@ int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid, } EXPORT_SYMBOL(ceph_calc_ceph_pg); +static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x, + int *result, int result_max, + const __u32 *weight, int weight_max) +{ + int scratch[result_max * 3]; + + return crush_do_rule(map, ruleno, x, result, result_max, + weight, weight_max, scratch); +} + /* * Calculate raw osd vector for the given pgid. Return pointer to osd * array, or NULL on failure. @@ -1163,9 +1173,9 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, pool->pgp_num_mask) + (unsigned)pgid.pool; } - r = crush_do_rule(osdmap->crush, ruleno, pps, osds, - min_t(int, pool->size, *num), - osdmap->osd_weight, osdmap->max_osd); + r = crush_do_rule_ary(osdmap->crush, ruleno, pps, + osds, min_t(int, pool->size, *num), + osdmap->osd_weight, osdmap->max_osd); if (r < 0) { pr_err("error %d from crush rule: pool %lld ruleset %d type %d" " size %d\n", r, pgid.pool, pool->crush_ruleset, -- cgit v1.1 From c6d98a603a02594f6ecf16d0a0af989ae9fa7abd Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:25 +0200 Subject: crush: return CRUSH_ITEM_UNDEF for failed placements with indep For firstn mode, if we fail to make a valid placement choice, we just continue and return a short result to the caller. For indep mode, however, we need to make the position stable, and return an undefined value on failed placements to avoid shifting later results to the left. Reflects ceph.git commit b1d4dd4eb044875874a1d01c01c7d766db5d0a80. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/crush/crush.h | 3 ++- net/ceph/crush/mapper.c | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 83543c5..3d6a129 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -19,10 +19,11 @@ #define CRUSH_MAGIC 0x00010000ul /* for detecting algorithm revisions */ - #define CRUSH_MAX_DEPTH 10 /* max crush hierarchy depth */ +#define CRUSH_ITEM_UNDEF 0x7fffffff /* undefined result */ + /* * CRUSH uses user-defined "rules" to describe how inputs should be * mapped to devices. A rule consists of sequence of steps to perform diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index dcf48bc..a860524 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -455,8 +455,12 @@ reject: } while (retry_descent); if (skip_rep) { - dprintk("skip rep\n"); - continue; + if (firstn) { + dprintk("skip rep\n"); + continue; + } + dprintk("undef rep, continuing\n"); + item = CRUSH_ITEM_UNDEF; } dprintk("CHOOSE got %d\n", item); -- cgit v1.1 From 9a3b490a20e06368c81d7a81506e99388e733379 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:25 +0200 Subject: crush: use breadth-first search for indep mode Reflects ceph.git commit 86e978036a4ecbac4c875e7c00f6c5bbe37282d3. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/crush/crush.h | 3 +- net/ceph/crush/mapper.c | 172 +++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 165 insertions(+), 10 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 3d6a129..4023b1b 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -22,7 +22,8 @@ #define CRUSH_MAX_DEPTH 10 /* max crush hierarchy depth */ -#define CRUSH_ITEM_UNDEF 0x7fffffff /* undefined result */ +#define CRUSH_ITEM_UNDEF 0x7ffffffe /* undefined result (internal use only) */ +#define CRUSH_ITEM_NONE 0x7fffffff /* no result */ /* * CRUSH uses user-defined "rules" to describe how inputs should be diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index a860524..caeb106 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -474,6 +474,148 @@ reject: /** + * choose indep: alternative breadth-first positionally stable mapping + * + */ +static void crush_choose_indep(const struct crush_map *map, + struct crush_bucket *bucket, + const __u32 *weight, int weight_max, + int x, int numrep, int type, + int *out, int outpos, + int recurse_to_leaf, + int *out2) +{ + struct crush_bucket *in = bucket; + int left = numrep - outpos; + int rep; + unsigned int ftotal; + int r; + int i; + int item = 0; + int itemtype; + int collide; + + dprintk("CHOOSE%s INDEP bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", + bucket->id, x, outpos, numrep); + + /* initially my result is undefined */ + for (rep = outpos; rep < numrep; rep++) { + out[rep] = CRUSH_ITEM_UNDEF; + if (out2) + out2[rep] = CRUSH_ITEM_UNDEF; + } + + for (ftotal = 0; left > 0 && ftotal < map->choose_total_tries; ftotal++) { + for (rep = outpos; rep < numrep; rep++) { + if (out[rep] != CRUSH_ITEM_UNDEF) + continue; + + in = bucket; /* initial bucket */ + + /* choose through intervening buckets */ + for (;;) { + r = rep; + + /* be careful */ + if (in->alg == CRUSH_BUCKET_UNIFORM && + in->size % numrep == 0) + /* r'=r+(n+1)*f_total */ + r += (numrep+1) * ftotal; + else + /* r' = r + n*f_total */ + r += numrep * ftotal; + + /* bucket choose */ + if (in->size == 0) { + dprintk(" empty bucket\n"); + break; + } + + item = crush_bucket_choose(in, x, r); + if (item >= map->max_devices) { + dprintk(" bad item %d\n", item); + out[rep] = CRUSH_ITEM_NONE; + if (out2) + out2[rep] = CRUSH_ITEM_NONE; + left--; + break; + } + + /* desired type? */ + if (item < 0) + itemtype = map->buckets[-1-item]->type; + else + itemtype = 0; + dprintk(" item %d type %d\n", item, itemtype); + + /* keep going? */ + if (itemtype != type) { + if (item >= 0 || + (-1-item) >= map->max_buckets) { + dprintk(" bad item type %d\n", type); + out[rep] = CRUSH_ITEM_NONE; + if (out2) + out2[rep] = + CRUSH_ITEM_NONE; + left--; + break; + } + in = map->buckets[-1-item]; + continue; + } + + /* collision? */ + collide = 0; + for (i = outpos; i < numrep; i++) { + if (out[i] == item) { + collide = 1; + break; + } + } + if (collide) + break; + + if (recurse_to_leaf) { + if (item < 0) { + crush_choose_indep(map, + map->buckets[-1-item], + weight, weight_max, + x, rep+1, 0, + out2, rep, + 0, NULL); + if (out2[rep] == CRUSH_ITEM_NONE) { + /* placed nothing; no leaf */ + break; + } + } else { + /* we already have a leaf! */ + out2[rep] = item; + } + } + + /* out? */ + if (itemtype == 0 && + is_out(map, weight, weight_max, item, x)) + break; + + /* yay! */ + out[rep] = item; + left--; + break; + } + } + } + for (rep = outpos; rep < numrep; rep++) { + if (out[rep] == CRUSH_ITEM_UNDEF) { + out[rep] = CRUSH_ITEM_NONE; + } + if (out2 && out2[rep] == CRUSH_ITEM_UNDEF) { + out2[rep] = CRUSH_ITEM_NONE; + } + } +} + +/** * crush_do_rule - calculate a mapping with the given input and rule * @map: the crush_map * @ruleno: the rule id @@ -556,15 +698,27 @@ int crush_do_rule(const struct crush_map *map, continue; } j = 0; - osize += crush_choose(map, - map->buckets[-1-w[i]], - weight, weight_max, - x, numrep, - curstep->arg2, - o+osize, j, - firstn, - recurse_to_leaf, - descend_once, c+osize); + if (firstn) { + osize += crush_choose(map, + map->buckets[-1-w[i]], + weight, weight_max, + x, numrep, + curstep->arg2, + o+osize, j, + firstn, + recurse_to_leaf, + descend_once, c+osize); + } else { + crush_choose_indep(map, + map->buckets[-1-w[i]], + weight, weight_max, + x, numrep, + curstep->arg2, + o+osize, j, + recurse_to_leaf, + c+osize); + osize += numrep; + } } if (recurse_to_leaf) -- cgit v1.1 From 3102b0a5b4cd63acccac78ca541b73fe28b4faa6 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:25 +0200 Subject: crush: add note about r in recursive choose Reflects ceph.git commit 4551fee9ad89d0427ed865d766d0d44004d3e3e1. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/crush/mapper.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index caeb106..77b7a73 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -514,6 +514,14 @@ static void crush_choose_indep(const struct crush_map *map, /* choose through intervening buckets */ for (;;) { + /* note: we base the choice on the position + * even in the nested call. that means that + * if the first layer chooses the same bucket + * in a different position, we will tend to + * choose a different item in that bucket. + * this will involve more devices in data + * movement and tend to distribute the load. + */ r = rep; /* be careful */ -- cgit v1.1 From 9fe07182827d9913daf85f3e6a950578a3fd4c5d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:25 +0200 Subject: crush: strip firstn conditionals out of crush_choose, rename Now that indep is handled by crush_choose_indep, rename crush_choose to crush_choose_firstn and remove all the conditionals. This ends up stripping out *lots* of code. Note that it *also* makes it obvious that the shenanigans we were playing with r' for uniform buckets were broken for firstn mode. This appears to have happened waaaay back in commit dae8bec9 (or earlier)... 2007. Reflects ceph.git commit 94350996cb2035850bcbece6a77a9b0394177ec9. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/crush/mapper.c | 88 +++++++++++++++++++------------------------------ 1 file changed, 33 insertions(+), 55 deletions(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 77b7a73..a71f8c4 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -282,7 +282,7 @@ static int is_out(const struct crush_map *map, } /** - * crush_choose - choose numrep distinct items of given type + * crush_choose_firstn - choose numrep distinct items of given type * @map: the crush_map * @bucket: the bucket we are choose an item from * @x: crush input value @@ -290,18 +290,17 @@ static int is_out(const struct crush_map *map, * @type: the type of item to choose * @out: pointer to output vector * @outpos: our position in that vector - * @firstn: true if choosing "first n" items, false if choosing "indep" * @recurse_to_leaf: true if we want one device under each item of given type * @descend_once: true if we should only try one descent before giving up * @out2: second output vector for leaf items (if @recurse_to_leaf) */ -static int crush_choose(const struct crush_map *map, - struct crush_bucket *bucket, - const __u32 *weight, int weight_max, - int x, int numrep, int type, - int *out, int outpos, - int firstn, int recurse_to_leaf, - int descend_once, int *out2) +static int crush_choose_firstn(const struct crush_map *map, + struct crush_bucket *bucket, + const __u32 *weight, int weight_max, + int x, int numrep, int type, + int *out, int outpos, + int recurse_to_leaf, + int descend_once, int *out2) { int rep; unsigned int ftotal, flocal; @@ -330,26 +329,8 @@ static int crush_choose(const struct crush_map *map, collide = 0; retry_bucket = 0; r = rep; - if (in->alg == CRUSH_BUCKET_UNIFORM) { - /* be careful */ - if (firstn || (__u32)numrep >= in->size) - /* r' = r + f_total */ - r += ftotal; - else if (in->size % numrep == 0) - /* r'=r+(n+1)*f_local */ - r += (numrep+1) * - (flocal+ftotal); - else - /* r' = r + n*f_local */ - r += numrep * (flocal+ftotal); - } else { - if (firstn) - /* r' = r + f_total */ - r += ftotal; - else - /* r' = r + n*f_local */ - r += numrep * (flocal+ftotal); - } + /* r' = r + f_total */ + r += ftotal; /* bucket choose */ if (in->size == 0) { @@ -399,12 +380,12 @@ static int crush_choose(const struct crush_map *map, reject = 0; if (!collide && recurse_to_leaf) { if (item < 0) { - if (crush_choose(map, + if (crush_choose_firstn(map, map->buckets[-1-item], weight, weight_max, x, outpos+1, 0, out2, outpos, - firstn, 0, + 0, map->chooseleaf_descend_once, NULL) <= outpos) /* didn't get leaf */ @@ -455,12 +436,8 @@ reject: } while (retry_descent); if (skip_rep) { - if (firstn) { - dprintk("skip rep\n"); - continue; - } - dprintk("undef rep, continuing\n"); - item = CRUSH_ITEM_UNDEF; + dprintk("skip rep\n"); + continue; } dprintk("CHOOSE got %d\n", item); @@ -474,7 +451,7 @@ reject: /** - * choose indep: alternative breadth-first positionally stable mapping + * crush_choose_indep: alternative breadth-first positionally stable mapping * */ static void crush_choose_indep(const struct crush_map *map, @@ -707,24 +684,25 @@ int crush_do_rule(const struct crush_map *map, } j = 0; if (firstn) { - osize += crush_choose(map, - map->buckets[-1-w[i]], - weight, weight_max, - x, numrep, - curstep->arg2, - o+osize, j, - firstn, - recurse_to_leaf, - descend_once, c+osize); + osize += crush_choose_firstn( + map, + map->buckets[-1-w[i]], + weight, weight_max, + x, numrep, + curstep->arg2, + o+osize, j, + recurse_to_leaf, + descend_once, c+osize); } else { - crush_choose_indep(map, - map->buckets[-1-w[i]], - weight, weight_max, - x, numrep, - curstep->arg2, - o+osize, j, - recurse_to_leaf, - c+osize); + crush_choose_indep( + map, + map->buckets[-1-w[i]], + weight, weight_max, + x, numrep, + curstep->arg2, + o+osize, j, + recurse_to_leaf, + c+osize); osize += numrep; } } -- cgit v1.1 From ab4ce2b5bdb5ca416756df3df6f5c63667a05065 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:25 +0200 Subject: crush: clarify numrep vs endpos Pass numrep (the width of the result) separately from the number of results we want *this* iteration. This makes things less awkward when we do a recursive call (for chooseleaf) and want only one item. Reflects ceph.git commit 1b567ee08972f268c11b43fc881e57b5984dd08b. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/crush/mapper.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index a71f8c4..125dbd0 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -457,13 +457,13 @@ reject: static void crush_choose_indep(const struct crush_map *map, struct crush_bucket *bucket, const __u32 *weight, int weight_max, - int x, int numrep, int type, + int x, int left, int numrep, int type, int *out, int outpos, int recurse_to_leaf, int *out2) { struct crush_bucket *in = bucket; - int left = numrep - outpos; + int endpos = outpos + left; int rep; unsigned int ftotal; int r; @@ -476,14 +476,14 @@ static void crush_choose_indep(const struct crush_map *map, bucket->id, x, outpos, numrep); /* initially my result is undefined */ - for (rep = outpos; rep < numrep; rep++) { + for (rep = outpos; rep < endpos; rep++) { out[rep] = CRUSH_ITEM_UNDEF; if (out2) out2[rep] = CRUSH_ITEM_UNDEF; } for (ftotal = 0; left > 0 && ftotal < map->choose_total_tries; ftotal++) { - for (rep = outpos; rep < numrep; rep++) { + for (rep = outpos; rep < endpos; rep++) { if (out[rep] != CRUSH_ITEM_UNDEF) continue; @@ -551,7 +551,7 @@ static void crush_choose_indep(const struct crush_map *map, /* collision? */ collide = 0; - for (i = outpos; i < numrep; i++) { + for (i = outpos; i < endpos; i++) { if (out[i] == item) { collide = 1; break; @@ -565,7 +565,7 @@ static void crush_choose_indep(const struct crush_map *map, crush_choose_indep(map, map->buckets[-1-item], weight, weight_max, - x, rep+1, 0, + x, 1, numrep, 0, out2, rep, 0, NULL); if (out2[rep] == CRUSH_ITEM_NONE) { @@ -590,7 +590,7 @@ static void crush_choose_indep(const struct crush_map *map, } } } - for (rep = outpos; rep < numrep; rep++) { + for (rep = outpos; rep < endpos; rep++) { if (out[rep] == CRUSH_ITEM_UNDEF) { out[rep] = CRUSH_ITEM_NONE; } @@ -698,7 +698,7 @@ int crush_do_rule(const struct crush_map *map, map, map->buckets[-1-w[i]], weight, weight_max, - x, numrep, + x, numrep, numrep, curstep->arg2, o+osize, j, recurse_to_leaf, -- cgit v1.1 From 4158608139de02f5265ec4f41774af7418911016 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:25 +0200 Subject: crush: pass parent r value for indep call Pass down the parent's 'r' value so that we will sample different values in the recursive call when the parent tries multiple times. This avoids doing useless work (calling multiple times and trying the same values). Reflects ceph.git commit 2731d3030d7a3e80922b7f1b7756f9a4a124bac5. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/crush/mapper.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 125dbd0..c727836 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -460,7 +460,8 @@ static void crush_choose_indep(const struct crush_map *map, int x, int left, int numrep, int type, int *out, int outpos, int recurse_to_leaf, - int *out2) + int *out2, + int parent_r) { struct crush_bucket *in = bucket; int endpos = outpos + left; @@ -499,7 +500,7 @@ static void crush_choose_indep(const struct crush_map *map, * this will involve more devices in data * movement and tend to distribute the load. */ - r = rep; + r = rep + parent_r; /* be careful */ if (in->alg == CRUSH_BUCKET_UNIFORM && @@ -567,7 +568,7 @@ static void crush_choose_indep(const struct crush_map *map, weight, weight_max, x, 1, numrep, 0, out2, rep, - 0, NULL); + 0, NULL, r); if (out2[rep] == CRUSH_ITEM_NONE) { /* placed nothing; no leaf */ break; @@ -702,7 +703,8 @@ int crush_do_rule(const struct crush_map *map, curstep->arg2, o+osize, j, recurse_to_leaf, - c+osize); + c+osize, + 0); osize += numrep; } } -- cgit v1.1 From be3226acc5544bcc91e756eb3ee6ca7b74f6f0a8 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:26 +0200 Subject: crush: new SET_CHOOSE_LEAF_TRIES command Explicitly control the number of sample attempts, and allow the number of tries in the recursive call to be explicitly controlled via the rule. This is important because the amount of time we want to spend looking for a solution may be rule dependent (e.g., higher for the wide indep pool than the rep pools). (We should do the same for the other tunables, by the way!) Reflects ceph.git commit c43c893be872f709c787bc57f46c0e97876ff681. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/crush/crush.h | 2 ++ net/ceph/crush/mapper.c | 31 +++++++++++++++++++++---------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 4023b1b..2e50bab 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -46,6 +46,8 @@ enum { CRUSH_RULE_EMIT = 4, /* no args */ CRUSH_RULE_CHOOSE_LEAF_FIRSTN = 6, CRUSH_RULE_CHOOSE_LEAF_INDEP = 7, + + CRUSH_RULE_SET_CHOOSE_LEAF_TRIES = 9, }; /* diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index c727836..e3ade07 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -455,11 +455,13 @@ reject: * */ static void crush_choose_indep(const struct crush_map *map, - struct crush_bucket *bucket, - const __u32 *weight, int weight_max, + struct crush_bucket *bucket, + const __u32 *weight, int weight_max, int x, int left, int numrep, int type, - int *out, int outpos, - int recurse_to_leaf, + int *out, int outpos, + unsigned int attempts, + unsigned int recurse_attempts, + int recurse_to_leaf, int *out2, int parent_r) { @@ -483,7 +485,7 @@ static void crush_choose_indep(const struct crush_map *map, out2[rep] = CRUSH_ITEM_UNDEF; } - for (ftotal = 0; left > 0 && ftotal < map->choose_total_tries; ftotal++) { + for (ftotal = 0; left > 0 && ftotal < attempts; ftotal++) { for (rep = outpos; rep < endpos; rep++) { if (out[rep] != CRUSH_ITEM_UNDEF) continue; @@ -564,11 +566,12 @@ static void crush_choose_indep(const struct crush_map *map, if (recurse_to_leaf) { if (item < 0) { crush_choose_indep(map, - map->buckets[-1-item], - weight, weight_max, - x, 1, numrep, 0, - out2, rep, - 0, NULL, r); + map->buckets[-1-item], + weight, weight_max, + x, 1, numrep, 0, + out2, rep, + recurse_attempts, 0, + 0, NULL, r); if (out2[rep] == CRUSH_ITEM_NONE) { /* placed nothing; no leaf */ break; @@ -631,6 +634,7 @@ int crush_do_rule(const struct crush_map *map, __u32 step; int i, j; int numrep; + int choose_leaf_tries = 1; const int descend_once = 0; if ((__u32)ruleno >= map->max_rules) { @@ -653,6 +657,11 @@ int crush_do_rule(const struct crush_map *map, wsize = 1; break; + case CRUSH_RULE_SET_CHOOSE_LEAF_TRIES: + if (curstep->arg1 > 0) + choose_leaf_tries = curstep->arg1; + break; + case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: case CRUSH_RULE_CHOOSE_FIRSTN: firstn = 1; @@ -702,6 +711,8 @@ int crush_do_rule(const struct crush_map *map, x, numrep, numrep, curstep->arg2, o+osize, j, + map->choose_total_tries, + choose_leaf_tries, recurse_to_leaf, c+osize, 0); -- cgit v1.1 From f18650ace38ef200dd1578257c75e9407297953c Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:26 +0200 Subject: crush: apply chooseleaf_tries to firstn mode too Parameterize the attempts for the _firstn choose method, and apply the rule-specified tries count to firstn mode as well. Note that we have slightly different behavior here than with indep: If the firstn value is not specified for firstn, we pass through the normal attempt count. This maintains compatibility with legacy behavior. Note that this is usually *not* actually N^2 work, though, because of the descend_once tunable. However, descend_once is unfortunately *not* the same thing as 1 chooseleaf try because it is only checked on a reject but not on a collision. Sigh. In contrast, for indep, if tries is not specified we default to 1 recursive attempt, because that is simply more sane, and we have the option to do so. The descend_once tunable has no effect for indep. Reflects ceph.git commit 64aeded50d80942d66a5ec7b604ff2fcbf5d7b63. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/crush/crush.h | 5 ++++- net/ceph/crush/mapper.c | 14 ++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 2e50bab..07b8fd4 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -165,7 +165,10 @@ struct crush_map { __u32 choose_local_fallback_tries; /* choose attempts before giving up */ __u32 choose_total_tries; - /* attempt chooseleaf inner descent once; on failure retry outer descent */ + /* attempt chooseleaf inner descent once for firstn mode; on + * reject retry outer descent. Note that this does *not* + * apply to a collision: in that case we will retry as we used + * to. */ __u32 chooseleaf_descend_once; }; diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index e3ade07..c343205 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -299,6 +299,8 @@ static int crush_choose_firstn(const struct crush_map *map, const __u32 *weight, int weight_max, int x, int numrep, int type, int *out, int outpos, + unsigned int attempts, + unsigned int recurse_attempts, int recurse_to_leaf, int descend_once, int *out2) { @@ -385,6 +387,7 @@ static int crush_choose_firstn(const struct crush_map *map, weight, weight_max, x, outpos+1, 0, out2, outpos, + recurse_attempts, 0, 0, map->chooseleaf_descend_once, NULL) <= outpos) @@ -421,7 +424,7 @@ reject: flocal <= in->size + map->choose_local_fallback_tries) /* exhaustive bucket search */ retry_bucket = 1; - else if (ftotal <= map->choose_total_tries) + else if (ftotal <= attempts) /* then retry descent */ retry_descent = 1; else @@ -634,7 +637,8 @@ int crush_do_rule(const struct crush_map *map, __u32 step; int i, j; int numrep; - int choose_leaf_tries = 1; + int choose_tries = map->choose_total_tries; + int choose_leaf_tries = 0; const int descend_once = 0; if ((__u32)ruleno >= map->max_rules) { @@ -701,6 +705,8 @@ int crush_do_rule(const struct crush_map *map, x, numrep, curstep->arg2, o+osize, j, + choose_tries, + choose_leaf_tries ? choose_leaf_tries : choose_tries, recurse_to_leaf, descend_once, c+osize); } else { @@ -711,8 +717,8 @@ int crush_do_rule(const struct crush_map *map, x, numrep, numrep, curstep->arg2, o+osize, j, - map->choose_total_tries, - choose_leaf_tries, + choose_tries, + choose_leaf_tries ? choose_leaf_tries : 1, recurse_to_leaf, c+osize, 0); -- cgit v1.1 From cc10df4a3a5c34cb1d5b635ac70dd1fc406153ce Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:26 +0200 Subject: crush: add SET_CHOOSE_TRIES rule step Since we can specify the recursive retries in a rule, we may as well also specify the non-recursive tries too for completeness. Reflects ceph.git commit d1b97462cffccc871914859eaee562f2786abfd1. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/crush/crush.h | 3 ++- net/ceph/crush/mapper.c | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 07b8fd4..5f95969 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -47,7 +47,8 @@ enum { CRUSH_RULE_CHOOSE_LEAF_FIRSTN = 6, CRUSH_RULE_CHOOSE_LEAF_INDEP = 7, - CRUSH_RULE_SET_CHOOSE_LEAF_TRIES = 9, + CRUSH_RULE_SET_CHOOSE_TRIES = 8, /* override choose_total_tries */ + CRUSH_RULE_SET_CHOOSE_LEAF_TRIES = 9, /* override chooseleaf_descend_once */ }; /* diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index c343205..a1acdea 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -661,6 +661,11 @@ int crush_do_rule(const struct crush_map *map, wsize = 1; break; + case CRUSH_RULE_SET_CHOOSE_TRIES: + if (curstep->arg1 > 0) + choose_tries = curstep->arg1; + break; + case CRUSH_RULE_SET_CHOOSE_LEAF_TRIES: if (curstep->arg1 > 0) choose_leaf_tries = curstep->arg1; -- cgit v1.1 From 917edad5d1d62070436b74ecbf5ea019b651ff69 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:26 +0200 Subject: crush: CHOOSE_LEAF -> CHOOSELEAF throughout This aligns the internal identifier names with the user-visible names in the decompiled crush map language. Reflects ceph.git commit caa0e22e15e4226c3671318ba1f61314bf6da2a6. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/crush/crush.h | 6 +++--- net/ceph/crush/mapper.c | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 5f95969..7b0fc4a 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -44,11 +44,11 @@ enum { /* arg2 = type */ CRUSH_RULE_CHOOSE_INDEP = 3, /* same */ CRUSH_RULE_EMIT = 4, /* no args */ - CRUSH_RULE_CHOOSE_LEAF_FIRSTN = 6, - CRUSH_RULE_CHOOSE_LEAF_INDEP = 7, + CRUSH_RULE_CHOOSELEAF_FIRSTN = 6, + CRUSH_RULE_CHOOSELEAF_INDEP = 7, CRUSH_RULE_SET_CHOOSE_TRIES = 8, /* override choose_total_tries */ - CRUSH_RULE_SET_CHOOSE_LEAF_TRIES = 9, /* override chooseleaf_descend_once */ + CRUSH_RULE_SET_CHOOSELEAF_TRIES = 9, /* override chooseleaf_descend_once */ }; /* diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index a1acdea..e9256a3 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -666,25 +666,25 @@ int crush_do_rule(const struct crush_map *map, choose_tries = curstep->arg1; break; - case CRUSH_RULE_SET_CHOOSE_LEAF_TRIES: + case CRUSH_RULE_SET_CHOOSELEAF_TRIES: if (curstep->arg1 > 0) choose_leaf_tries = curstep->arg1; break; - case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: + case CRUSH_RULE_CHOOSELEAF_FIRSTN: case CRUSH_RULE_CHOOSE_FIRSTN: firstn = 1; /* fall through */ - case CRUSH_RULE_CHOOSE_LEAF_INDEP: + case CRUSH_RULE_CHOOSELEAF_INDEP: case CRUSH_RULE_CHOOSE_INDEP: if (wsize == 0) break; recurse_to_leaf = curstep->op == - CRUSH_RULE_CHOOSE_LEAF_FIRSTN || + CRUSH_RULE_CHOOSELEAF_FIRSTN || curstep->op == - CRUSH_RULE_CHOOSE_LEAF_INDEP; + CRUSH_RULE_CHOOSELEAF_INDEP; /* reset output */ osize = 0; -- cgit v1.1 From d390bb2a83086f2b79c152e2c1734813bd257d9b Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:26 +0200 Subject: crush: generalize descend_once The legacy behavior is to make the normal number of tries for the recursive chooseleaf call. The descend_once tunable changed this to making a single try and bail if we get a reject (note that it is impossible to collide in the recursive case). The new set_chooseleaf_tries lets you select the number of recursive chooseleaf attempts for indep mode, or default to 1. Use the same behavior for firstn, except default to total_tries when the legacy tunables are set (for compatibility). This makes the rule step override the (new) default of 1 recursive attempt, keeping behavior consistent with indep mode. Reflects ceph.git commit 685c6950ef3df325ef04ce7c986e36ca2514c5f1. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/crush/mapper.c | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index e9256a3..0613dd2 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -291,7 +291,6 @@ static int is_out(const struct crush_map *map, * @out: pointer to output vector * @outpos: our position in that vector * @recurse_to_leaf: true if we want one device under each item of given type - * @descend_once: true if we should only try one descent before giving up * @out2: second output vector for leaf items (if @recurse_to_leaf) */ static int crush_choose_firstn(const struct crush_map *map, @@ -302,7 +301,7 @@ static int crush_choose_firstn(const struct crush_map *map, unsigned int attempts, unsigned int recurse_attempts, int recurse_to_leaf, - int descend_once, int *out2) + int *out2) { int rep; unsigned int ftotal, flocal; @@ -389,7 +388,6 @@ static int crush_choose_firstn(const struct crush_map *map, out2, outpos, recurse_attempts, 0, 0, - map->chooseleaf_descend_once, NULL) <= outpos) /* didn't get leaf */ reject = 1; @@ -414,10 +412,7 @@ reject: ftotal++; flocal++; - if (reject && descend_once) - /* let outer call try again */ - skip_rep = 1; - else if (collide && flocal <= map->choose_local_tries) + if (collide && flocal <= map->choose_local_tries) /* retry locally a few times */ retry_bucket = 1; else if (map->choose_local_fallback_tries > 0 && @@ -639,7 +634,6 @@ int crush_do_rule(const struct crush_map *map, int numrep; int choose_tries = map->choose_total_tries; int choose_leaf_tries = 0; - const int descend_once = 0; if ((__u32)ruleno >= map->max_rules) { dprintk(" bad ruleno %d\n", ruleno); @@ -703,6 +697,14 @@ int crush_do_rule(const struct crush_map *map, } j = 0; if (firstn) { + int recurse_tries; + if (choose_leaf_tries) + recurse_tries = + choose_leaf_tries; + else if (map->chooseleaf_descend_once) + recurse_tries = 1; + else + recurse_tries = choose_tries; osize += crush_choose_firstn( map, map->buckets[-1-w[i]], @@ -711,9 +713,9 @@ int crush_do_rule(const struct crush_map *map, curstep->arg2, o+osize, j, choose_tries, - choose_leaf_tries ? choose_leaf_tries : choose_tries, + recurse_tries, recurse_to_leaf, - descend_once, c+osize); + c+osize); } else { crush_choose_indep( map, @@ -723,7 +725,8 @@ int crush_do_rule(const struct crush_map *map, curstep->arg2, o+osize, j, choose_tries, - choose_leaf_tries ? choose_leaf_tries : 1, + choose_leaf_tries ? + choose_leaf_tries : 1, recurse_to_leaf, c+osize, 0); -- cgit v1.1 From f046bf92080cbdc4a94c6e86698c5a3f10716445 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:27 +0200 Subject: crush: add set_choose_local_[fallback_]tries steps This allows all of the tunables to be overridden by a specific rule. Reflects ceph.git commits d129e09e57fbc61cfd4f492e3ee77d0750c9d292, 0497db49e5973b50df26251ed0e3f4ac7578e66e. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/crush/crush.h | 2 ++ net/ceph/crush/mapper.c | 28 +++++++++++++++++++++++----- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 7b0fc4a..acaa561 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -49,6 +49,8 @@ enum { CRUSH_RULE_SET_CHOOSE_TRIES = 8, /* override choose_total_tries */ CRUSH_RULE_SET_CHOOSELEAF_TRIES = 9, /* override chooseleaf_descend_once */ + CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES = 10, + CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES = 11, }; /* diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 0613dd2..8cde481 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -300,6 +300,8 @@ static int crush_choose_firstn(const struct crush_map *map, int *out, int outpos, unsigned int attempts, unsigned int recurse_attempts, + unsigned int local_tries, + unsigned int local_fallback_tries, int recurse_to_leaf, int *out2) { @@ -338,9 +340,9 @@ static int crush_choose_firstn(const struct crush_map *map, reject = 1; goto reject; } - if (map->choose_local_fallback_tries > 0 && + if (local_fallback_tries > 0 && flocal >= (in->size>>1) && - flocal > map->choose_local_fallback_tries) + flocal > local_fallback_tries) item = bucket_perm_choose(in, x, r); else item = crush_bucket_choose(in, x, r); @@ -387,6 +389,8 @@ static int crush_choose_firstn(const struct crush_map *map, x, outpos+1, 0, out2, outpos, recurse_attempts, 0, + local_tries, + local_fallback_tries, 0, NULL) <= outpos) /* didn't get leaf */ @@ -412,11 +416,11 @@ reject: ftotal++; flocal++; - if (collide && flocal <= map->choose_local_tries) + if (collide && flocal <= local_tries) /* retry locally a few times */ retry_bucket = 1; - else if (map->choose_local_fallback_tries > 0 && - flocal <= in->size + map->choose_local_fallback_tries) + else if (local_fallback_tries > 0 && + flocal <= in->size + local_fallback_tries) /* exhaustive bucket search */ retry_bucket = 1; else if (ftotal <= attempts) @@ -633,6 +637,8 @@ int crush_do_rule(const struct crush_map *map, int i, j; int numrep; int choose_tries = map->choose_total_tries; + int choose_local_tries = map->choose_local_tries; + int choose_local_fallback_tries = map->choose_local_fallback_tries; int choose_leaf_tries = 0; if ((__u32)ruleno >= map->max_rules) { @@ -665,6 +671,16 @@ int crush_do_rule(const struct crush_map *map, choose_leaf_tries = curstep->arg1; break; + case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES: + if (curstep->arg1 > 0) + choose_local_tries = curstep->arg1; + break; + + case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES: + if (curstep->arg1 > 0) + choose_local_fallback_tries = curstep->arg1; + break; + case CRUSH_RULE_CHOOSELEAF_FIRSTN: case CRUSH_RULE_CHOOSE_FIRSTN: firstn = 1; @@ -714,6 +730,8 @@ int crush_do_rule(const struct crush_map *map, o+osize, j, choose_tries, recurse_tries, + choose_local_tries, + choose_local_fallback_tries, recurse_to_leaf, c+osize); } else { -- cgit v1.1 From 2d8be0bc8bc2dc7d3fc3d322c4db3fdf6c924660 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:27 +0200 Subject: crush: attempts -> tries Reflects ceph.git commit ea3a0bb8b773360d73b8b77fa32115ef091c9857. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/crush/mapper.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 8cde481..71ce4f1 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -298,8 +298,8 @@ static int crush_choose_firstn(const struct crush_map *map, const __u32 *weight, int weight_max, int x, int numrep, int type, int *out, int outpos, - unsigned int attempts, - unsigned int recurse_attempts, + unsigned int tries, + unsigned int recurse_tries, unsigned int local_tries, unsigned int local_fallback_tries, int recurse_to_leaf, @@ -388,7 +388,7 @@ static int crush_choose_firstn(const struct crush_map *map, weight, weight_max, x, outpos+1, 0, out2, outpos, - recurse_attempts, 0, + recurse_tries, 0, local_tries, local_fallback_tries, 0, @@ -423,7 +423,7 @@ reject: flocal <= in->size + local_fallback_tries) /* exhaustive bucket search */ retry_bucket = 1; - else if (ftotal <= attempts) + else if (ftotal <= tries) /* then retry descent */ retry_descent = 1; else @@ -461,8 +461,8 @@ static void crush_choose_indep(const struct crush_map *map, const __u32 *weight, int weight_max, int x, int left, int numrep, int type, int *out, int outpos, - unsigned int attempts, - unsigned int recurse_attempts, + unsigned int tries, + unsigned int recurse_tries, int recurse_to_leaf, int *out2, int parent_r) @@ -487,7 +487,7 @@ static void crush_choose_indep(const struct crush_map *map, out2[rep] = CRUSH_ITEM_UNDEF; } - for (ftotal = 0; left > 0 && ftotal < attempts; ftotal++) { + for (ftotal = 0; left > 0 && ftotal < tries; ftotal++) { for (rep = outpos; rep < endpos; rep++) { if (out[rep] != CRUSH_ITEM_UNDEF) continue; @@ -572,7 +572,7 @@ static void crush_choose_indep(const struct crush_map *map, weight, weight_max, x, 1, numrep, 0, out2, rep, - recurse_attempts, 0, + recurse_tries, 0, 0, NULL, r); if (out2[rep] == CRUSH_ITEM_NONE) { /* placed nothing; no leaf */ -- cgit v1.1 From 0e32d7126cdf30b610bc131c81f131c717bd0d77 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:27 +0200 Subject: crush: fix crush_choose_firstn comment Reflects ceph.git commit 8b38f10bc2ee3643a33ea5f9545ad5c00e4ac5b4. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/crush/mapper.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 71ce4f1..b703790 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -290,7 +290,11 @@ static int is_out(const struct crush_map *map, * @type: the type of item to choose * @out: pointer to output vector * @outpos: our position in that vector - * @recurse_to_leaf: true if we want one device under each item of given type + * @tries: number of attempts to make + * @recurse_tries: number of attempts to have recursive chooseleaf make + * @local_tries: localized retries + * @local_fallback_tries: localized fallback retries + * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose) * @out2: second output vector for leaf items (if @recurse_to_leaf) */ static int crush_choose_firstn(const struct crush_map *map, -- cgit v1.1 From cdff49918c8286ac18593e742ead25242c76c81d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 24 Dec 2013 21:19:27 +0200 Subject: crush: support new indep mode and SET_* steps (crush v2) by default Add CRUSH_V2 feature (new indep mode and SET_* steps) to a set of features supported by default. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/ceph_features.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index 003ea71..5f42e44 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -79,7 +79,8 @@ static inline u64 ceph_sanitize_features(u64 features) CEPH_FEATURE_CRUSH_TUNABLES | \ CEPH_FEATURE_CRUSH_TUNABLES2 | \ CEPH_FEATURE_REPLY_CREATE_INODE | \ - CEPH_FEATURE_OSDHASHPSPOOL) + CEPH_FEATURE_OSDHASHPSPOOL | \ + CEPH_FEATURE_CRUSH_V2) #define CEPH_FEATURES_REQUIRED_DEFAULT \ (CEPH_FEATURE_NOSRCADDR | \ -- cgit v1.1 From f48db1e9ac6f1578ab7efef9f66c70279e2f0cb5 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 30 Dec 2013 19:21:29 +0200 Subject: libceph: use CEPH_MON_PORT when the specified port is 0 Similar to userspace, don't bail with "parse_ips bad ip ..." if the specified port is port 0, instead use port CEPH_MON_PORT (6789, the default monitor port). Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/messenger.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index bd172e1..d2cadb5 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1866,7 +1866,9 @@ int ceph_parse_ips(const char *c, const char *end, port = (port * 10) + (*p - '0'); p++; } - if (port > 65535 || port == 0) + if (port == 0) + port = CEPH_MON_PORT; + else if (port > 65535) goto bad; } else { port = CEPH_MON_PORT; -- cgit v1.1 From 3cea4c3071d4e55e9d7356efe9d0ebf92f0c2204 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 9 Jan 2014 20:08:21 +0200 Subject: libceph: rename ceph_msg::front_max to front_alloc_len Rename front_max field of struct ceph_msg to front_alloc_len to make its purpose more clear. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/messenger.h | 2 +- net/ceph/messenger.c | 6 +++--- net/ceph/mon_client.c | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index c1d3f5a..861138f 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -157,7 +157,7 @@ struct ceph_msg { bool front_is_vmalloc; bool more_to_follow; bool needs_out_seq; - int front_max; + int front_alloc_len; unsigned long ack_stamp; /* tx: when we were acked */ struct ceph_msgpool *pool; diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index d2cadb5..f4d411c 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -3130,7 +3130,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, INIT_LIST_HEAD(&m->data); /* front */ - m->front_max = front_len; + m->front_alloc_len = front_len; if (front_len) { if (front_len > PAGE_CACHE_SIZE) { m->front.iov_base = __vmalloc(front_len, flags, @@ -3305,8 +3305,8 @@ EXPORT_SYMBOL(ceph_msg_last_put); void ceph_msg_dump(struct ceph_msg *msg) { - pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, - msg->front_max, msg->data_length); + pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg, + msg->front_alloc_len, msg->data_length); print_hex_dump(KERN_DEBUG, "header: ", DUMP_PREFIX_OFFSET, 16, 1, &msg->hdr, sizeof(msg->hdr), true); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 1fe25cd..2ac9ef3 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -152,7 +152,7 @@ static int __open_session(struct ceph_mon_client *monc) /* initiatiate authentication handshake */ ret = ceph_auth_build_hello(monc->auth, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); __send_prepared_auth_request(monc, ret); } else { dout("open_session mon%d already open\n", monc->cur_mon); @@ -196,7 +196,7 @@ static void __send_subscribe(struct ceph_mon_client *monc) int num; p = msg->front.iov_base; - end = p + msg->front_max; + end = p + msg->front_alloc_len; num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; ceph_encode_32(&p, num); @@ -897,7 +897,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc, ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, msg->front.iov_len, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); if (ret < 0) { monc->client->auth_err = ret; wake_up_all(&monc->client->auth_wq); @@ -939,7 +939,7 @@ static int __validate_auth(struct ceph_mon_client *monc) return 0; ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); if (ret <= 0) return ret; /* either an error, or no need to authenticate */ __send_prepared_auth_request(monc, ret); -- cgit v1.1 From 3f0a4ac55fe036902e3666be740da63528ad8639 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 9 Jan 2014 20:08:21 +0200 Subject: libceph: rename front to front_len in get_reply() Rename front local variable to front_len in get_reply() to make its purpose more clear. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/osd_client.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 9f19935..7619c37 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2502,7 +2502,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, struct ceph_osd_client *osdc = osd->o_osdc; struct ceph_msg *m; struct ceph_osd_request *req; - int front = le32_to_cpu(hdr->front_len); + int front_len = le32_to_cpu(hdr->front_len); int data_len = le32_to_cpu(hdr->data_len); u64 tid; @@ -2522,12 +2522,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply, req->r_reply->con); ceph_msg_revoke_incoming(req->r_reply); - if (front > req->r_reply->front.iov_len) { + if (front_len > req->r_reply->front.iov_len) { pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", - front, (int)req->r_reply->front.iov_len, + front_len, (int)req->r_reply->front.iov_len, (unsigned int)con->peer_name.type, le64_to_cpu(con->peer_name.num)); - m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); + m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, + false); if (!m) goto out; ceph_msg_put(req->r_reply); -- cgit v1.1 From f2be82b0058e90b5d9ac2cb896b4914276fb50ef Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 9 Jan 2014 20:08:21 +0200 Subject: libceph: fix preallocation check in get_reply() The check that makes sure that we have enough memory allocated to read in the entire header of the message in question is currently busted. It compares front_len of the incoming message with iov_len field of ceph_msg::front structure, which is used primarily to indicate the amount of data already read in, and not the size of the allocated buffer. Under certain conditions (e.g. a short read from a socket followed by that socket's shutdown and owning ceph_connection reset) this results in a warning similar to [85688.975866] libceph: get_reply front 198 > preallocated 122 (4#0) and, through another bug, leads to forever hung tasks and forced reboots. Fix this by comparing front_len with front_alloc_len field of struct ceph_msg, which stores the actual size of the buffer. Fixes: http://tracker.ceph.com/issues/5425 Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/messenger.c | 3 +-- net/ceph/osd_client.c | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index f4d411c..252ad4e 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -3130,7 +3130,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, INIT_LIST_HEAD(&m->data); /* front */ - m->front_alloc_len = front_len; if (front_len) { if (front_len > PAGE_CACHE_SIZE) { m->front.iov_base = __vmalloc(front_len, flags, @@ -3147,7 +3146,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, } else { m->front.iov_base = NULL; } - m->front.iov_len = front_len; + m->front_alloc_len = m->front.iov_len = front_len; dout("ceph_msg_new %p front %d\n", m, front_len); return m; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 7619c37..7331951 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2522,9 +2522,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply, req->r_reply->con); ceph_msg_revoke_incoming(req->r_reply); - if (front_len > req->r_reply->front.iov_len) { + if (front_len > req->r_reply->front_alloc_len) { pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", - front_len, (int)req->r_reply->front.iov_len, + front_len, req->r_reply->front_alloc_len, (unsigned int)con->peer_name.type, le64_to_cpu(con->peer_name.num)); m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, -- cgit v1.1 From fc12c80aa57ee90385dc90e4263ec1a66200ba76 Mon Sep 17 00:00:00 2001 From: "J. Bruce Fields" Date: Thu, 16 Jan 2014 17:42:53 -0500 Subject: ceph: trivial comment fix "disconnected" is too easily confused with "DCACHE_DISCONNECTED". I think "unhashed" is the more precise term here. Signed-off-by: J. Bruce Fields Reviewed-by: Sage Weil --- fs/ceph/caps.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 9289c6b2..80dad0d 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2350,11 +2350,11 @@ static void invalidate_aliases(struct inode *inode) d_prune_aliases(inode); /* * For non-directory inode, d_find_alias() only returns - * connected dentry. After calling d_invalidate(), the - * dentry become disconnected. + * hashed dentry. After calling d_invalidate(), the + * dentry becomes unhashed. * * For directory inode, d_find_alias() can return - * disconnected dentry. But directory inode should have + * unhashed dentry. But directory inode should have * one alias at most. */ while ((dn = d_find_alias(inode))) { -- cgit v1.1 From 4fe59789adc93b2573b0417ac93136805521902e Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 31 Oct 2013 16:44:14 +0800 Subject: ceph: handle cap export race in try_flush_caps() auth cap may change after releasing the i_ceph_lock Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 80dad0d..1012099 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1735,13 +1735,12 @@ ack: /* * Try to flush dirty caps back to the auth mds. */ -static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, - unsigned *flush_tid) +static int try_flush_caps(struct inode *inode, unsigned *flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - int unlock_session = session ? 0 : 1; int flushing = 0; + struct ceph_mds_session *session = NULL; retry: spin_lock(&ci->i_ceph_lock); @@ -1755,13 +1754,14 @@ retry: int want = __ceph_caps_wanted(ci); int delayed; - if (!session) { + if (!session || session != cap->session) { spin_unlock(&ci->i_ceph_lock); + if (session) + mutex_unlock(&session->s_mutex); session = cap->session; mutex_lock(&session->s_mutex); goto retry; } - BUG_ON(session != cap->session); if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; @@ -1780,7 +1780,7 @@ retry: out: spin_unlock(&ci->i_ceph_lock); out_unlocked: - if (session && unlock_session) + if (session) mutex_unlock(&session->s_mutex); return flushing; } @@ -1865,7 +1865,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) return ret; mutex_lock(&inode->i_mutex); - dirty = try_flush_caps(inode, NULL, &flush_tid); + dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); /* @@ -1900,7 +1900,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) dout("write_inode %p wait=%d\n", inode, wait); if (wait) { - dirty = try_flush_caps(inode, NULL, &flush_tid); + dirty = try_flush_caps(inode, &flush_tid); if (dirty) err = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); -- cgit v1.1 From d1b87809fba3e07a261080837d1ae58d790b51a6 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 13 Nov 2013 14:47:19 +0800 Subject: ceph: use ceph_seq_cmp() to compare migrate_seq Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 1012099..2c39d9f 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -628,7 +628,7 @@ retry: cap->cap_id = cap_id; cap->issued = issued; cap->implemented |= issued; - if (mseq > cap->mseq) + if (ceph_seq_cmp(mseq, cap->mseq) > 0) cap->mds_wanted = wanted; else cap->mds_wanted |= wanted; -- cgit v1.1 From 9563f88c1fa01341d125e396edc654a8dbcab2d2 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 22 Nov 2013 13:50:45 +0800 Subject: ceph: fix cache revoke race handle following sequence of events: - non-auth MDS revokes Fc cap. queue invalidate work - auth MDS issues Fc cap through request reply. i_rdcache_gen gets increased. - invalidate work runs. it finds i_rdcache_revoking != i_rdcache_gen, so it does nothing. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 2 +- fs/ceph/inode.c | 8 +++++--- fs/ceph/super.h | 2 ++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 2c39d9f..d2154d6 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -816,7 +816,7 @@ int __ceph_caps_revoking_other(struct ceph_inode_info *ci, for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); - if (cap != ocap && __cap_is_valid(cap) && + if (cap != ocap && (cap->implemented & ~cap->issued & mask)) return 1; } diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index a808bfb..3db97ba 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1466,7 +1466,8 @@ static void ceph_invalidate_work(struct work_struct *work) dout("invalidate_pages %p gen %d revoking %d\n", inode, ci->i_rdcache_gen, ci->i_rdcache_revoking); if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { - /* nevermind! */ + if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE)) + check = 1; spin_unlock(&ci->i_ceph_lock); mutex_unlock(&ci->i_truncate_mutex); goto out; @@ -1487,13 +1488,14 @@ static void ceph_invalidate_work(struct work_struct *work) dout("invalidate_pages %p gen %d raced, now %d revoking %d\n", inode, orig_gen, ci->i_rdcache_gen, ci->i_rdcache_revoking); + if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE)) + check = 1; } spin_unlock(&ci->i_ceph_lock); mutex_unlock(&ci->i_truncate_mutex); - +out: if (check) ceph_check_caps(ci, 0, NULL); -out: iput(inode); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 7fa78a7..891cda8 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -528,6 +528,8 @@ static inline int __ceph_caps_dirty(struct ceph_inode_info *ci) } extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask); +extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci, + struct ceph_cap *ocap, int mask); extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask); extern int __ceph_caps_used(struct ceph_inode_info *ci); -- cgit v1.1 From 979abfdd5c7ca4abe3f0157a6ea9bfef41114c89 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 22 Nov 2013 13:56:24 +0800 Subject: ceph: fix trim caps - don't trim auth cap if there are flusing caps - don't trim auth cap if any 'write' cap is wanted - allow trimming non-auth cap even if the inode is dirty Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 4a13f6e..73c7943 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1214,7 +1214,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) { struct ceph_mds_session *session = arg; struct ceph_inode_info *ci = ceph_inode(inode); - int used, oissued, mine; + int used, wanted, oissued, mine; if (session->s_trim_caps <= 0) return -1; @@ -1222,14 +1222,19 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) spin_lock(&ci->i_ceph_lock); mine = cap->issued | cap->implemented; used = __ceph_caps_used(ci); + wanted = __ceph_caps_file_wanted(ci); oissued = __ceph_caps_issued_other(ci, cap); - dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", + dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), - ceph_cap_string(used)); - if (ci->i_dirty_caps) - goto out; /* dirty caps */ - if ((used & ~oissued) & mine) + ceph_cap_string(used), ceph_cap_string(wanted)); + if (cap == ci->i_auth_cap) { + if (ci->i_dirty_caps | ci->i_flushing_caps) + goto out; + if ((used | wanted) & CEPH_CAP_ANY_WR) + goto out; + } + if ((used | wanted) & ~oissued & mine) goto out; /* we need these caps */ session->s_trim_caps--; -- cgit v1.1 From ca18bede048e95a749d13410ce1da4ad0ffa7938 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 22 Nov 2013 14:21:44 +0800 Subject: ceph: handle -ESTALE reply Send requests that operate on path to directory's auth MDS if mode == USE_AUTH_MDS. Always retry using the auth MDS if got -ESTALE reply from non-auth MDS. Also clean up the code that handles auth MDS change. Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 73c7943..1fd655a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -713,14 +713,15 @@ static int __choose_mds(struct ceph_mds_client *mdsc, struct dentry *dn = get_nonsnap_parent(parent); inode = dn->d_inode; dout("__choose_mds using nonsnap parent %p\n", inode); - } else if (req->r_dentry->d_inode) { + } else { /* dentry target */ inode = req->r_dentry->d_inode; - } else { - /* dir + name */ - inode = dir; - hash = ceph_dentry_hash(dir, req->r_dentry); - is_hash = true; + if (!inode || mode == USE_AUTH_MDS) { + /* dir + name */ + inode = dir; + hash = ceph_dentry_hash(dir, req->r_dentry); + is_hash = true; + } } } @@ -2161,26 +2162,16 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) */ if (result == -ESTALE) { dout("got ESTALE on request %llu", req->r_tid); - if (!req->r_inode) { - /* do nothing; not an authority problem */ - } else if (req->r_direct_mode != USE_AUTH_MDS) { + if (req->r_direct_mode != USE_AUTH_MDS) { dout("not using auth, setting for that now"); req->r_direct_mode = USE_AUTH_MDS; __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); goto out; } else { - struct ceph_inode_info *ci = ceph_inode(req->r_inode); - struct ceph_cap *cap = NULL; - - if (req->r_session) - cap = ceph_get_cap_for_mds(ci, - req->r_session->s_mds); - - dout("already using auth"); - if ((!cap || cap != ci->i_auth_cap) || - (cap->mseq != req->r_sent_on_mseq)) { - dout("but cap changed, so resending"); + int mds = __choose_mds(mdsc, req); + if (mds >= 0 && mds != req->r_session->s_mds) { + dout("but auth changed, so resending"); __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); goto out; -- cgit v1.1 From 9215aeea622fec7ca8123c6bd6f03a1753e2b0b3 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 30 Nov 2013 12:47:41 +0800 Subject: ceph: check inode caps in ceph_d_revalidate Some inodes in readdir reply may have no caps. Getattr mds request for these inodes can return -ESTALE. The fix is consider dentry that links to inode with no caps as invalid. Invalid dentry causes a lookup request to send to the mds, the MDS will send caps back. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 12 ++++++++++++ fs/ceph/dir.c | 11 ++++++++--- fs/ceph/super.h | 1 + 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index d2154d6..d65ff33 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -891,6 +891,18 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci) return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0; } +int ceph_is_any_caps(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + spin_lock(&ci->i_ceph_lock); + ret = __ceph_is_any_caps(ci); + spin_unlock(&ci->i_ceph_lock); + + return ret; +} + /* * Remove a cap. Take steps to deal with a racing iterate_session_caps. * diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index b629e9d..619616d 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -1041,14 +1041,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) valid = 1; } else if (dentry_lease_is_valid(dentry) || dir_lease_is_valid(dir, dentry)) { - valid = 1; + if (dentry->d_inode) + valid = ceph_is_any_caps(dentry->d_inode); + else + valid = 1; } dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); - if (valid) + if (valid) { ceph_dentry_lru_touch(dentry); - else + } else { + ceph_dir_clear_complete(dir); d_drop(dentry); + } iput(dir); return valid; } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 891cda8..a6ba32f 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -782,6 +782,7 @@ extern int ceph_add_cap(struct inode *inode, extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); extern void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap); +extern int ceph_is_any_caps(struct inode *inode); extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino, u64 cap_id, u32 migrate_seq, u32 issue_seq); -- cgit v1.1 From 186e4f7a4b1883f3f46aa15366c0bcebc28fdda7 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 22 Nov 2013 14:48:37 +0800 Subject: ceph: handle session flush message Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 19 +++++++++++++++++++ fs/ceph/strings.c | 2 ++ include/linux/ceph/ceph_fs.h | 2 ++ 3 files changed, 23 insertions(+) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 1fd655a..7c00dd5 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1137,6 +1137,21 @@ static int send_renew_caps(struct ceph_mds_client *mdsc, return 0; } +static int send_flushmsg_ack(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, u64 seq) +{ + struct ceph_msg *msg; + + dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", + session->s_mds, session_state_name(session->s_state), seq); + msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); + if (!msg) + return -ENOMEM; + ceph_con_send(&session->s_con, msg); + return 0; +} + + /* * Note new cap ttl, and any transition from stale -> not stale (fresh?). * @@ -2396,6 +2411,10 @@ static void handle_session(struct ceph_mds_session *session, trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); break; + case CEPH_SESSION_FLUSHMSG: + send_flushmsg_ack(mdsc, session, seq); + break; + default: pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); WARN_ON(1); diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c index 89fa4a9..4440f44 100644 --- a/fs/ceph/strings.c +++ b/fs/ceph/strings.c @@ -41,6 +41,8 @@ const char *ceph_session_op_name(int op) case CEPH_SESSION_RENEWCAPS: return "renewcaps"; case CEPH_SESSION_STALE: return "stale"; case CEPH_SESSION_RECALL_STATE: return "recall_state"; + case CEPH_SESSION_FLUSHMSG: return "flushmsg"; + case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack"; } return "???"; } diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 2ad7b86..26bb587 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -282,6 +282,8 @@ enum { CEPH_SESSION_RENEWCAPS, CEPH_SESSION_STALE, CEPH_SESSION_RECALL_STATE, + CEPH_SESSION_FLUSHMSG, + CEPH_SESSION_FLUSHMSG_ACK, }; extern const char *ceph_session_op_name(int op); -- cgit v1.1 From 4ee6a914edbbd2543884f0ad7d58ea471136be32 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sun, 24 Nov 2013 14:43:46 +0800 Subject: ceph: remove exported caps when handling cap import message Version 3 cap import message includes the ID of the exported caps. It allow us to remove the exported caps if we still haven't received the corresponding cap export message. We remove the exported caps because they are stale, keeping them can compromise consistence. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 79 +++++++++++++++++++++++++++++--------------- include/linux/ceph/ceph_fs.h | 11 +++++- 2 files changed, 62 insertions(+), 28 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index d65ff33..98f3ca4 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -611,6 +611,7 @@ retry: if (ci->i_auth_cap == NULL || ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) ci->i_auth_cap = cap; + ci->i_cap_exporting_issued = 0; } else if (ci->i_auth_cap == cap) { ci->i_auth_cap = NULL; spin_lock(&mdsc->cap_dirty_lock); @@ -2823,10 +2824,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, */ static void handle_cap_import(struct ceph_mds_client *mdsc, struct inode *inode, struct ceph_mds_caps *im, + struct ceph_mds_cap_peer *ph, struct ceph_mds_session *session, void *snaptrace, int snaptrace_len) { struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_cap *cap; int mds = session->s_mds; unsigned issued = le32_to_cpu(im->caps); unsigned wanted = le32_to_cpu(im->wanted); @@ -2834,28 +2837,44 @@ static void handle_cap_import(struct ceph_mds_client *mdsc, unsigned mseq = le32_to_cpu(im->migrate_seq); u64 realmino = le64_to_cpu(im->realm); u64 cap_id = le64_to_cpu(im->cap_id); + u64 p_cap_id; + int peer; - if (ci->i_cap_exporting_mds >= 0 && - ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) { - dout("handle_cap_import inode %p ci %p mds%d mseq %d" - " - cleared exporting from mds%d\n", - inode, ci, mds, mseq, - ci->i_cap_exporting_mds); - ci->i_cap_exporting_issued = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_mds = -1; + if (ph) { + p_cap_id = le64_to_cpu(ph->cap_id); + peer = le32_to_cpu(ph->mds); + } else { + p_cap_id = 0; + peer = -1; + } - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p back to cap_dirty\n", inode); - list_move(&ci->i_dirty_item, &mdsc->cap_dirty); + dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", + inode, ci, mds, mseq, peer); + + spin_lock(&ci->i_ceph_lock); + cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL; + if (cap && cap->cap_id == p_cap_id) { + dout(" remove export cap %p mds%d flags %d\n", + cap, peer, ph->flags); + if ((ph->flags & CEPH_CAP_FLAG_AUTH) && + (cap->seq != le32_to_cpu(ph->seq) || + cap->mseq != le32_to_cpu(ph->mseq))) { + pr_err("handle_cap_import: mismatched seq/mseq: " + "ino (%llx.%llx) mds%d seq %d mseq %d " + "importer mds%d has peer seq %d mseq %d\n", + ceph_vinop(inode), peer, cap->seq, + cap->mseq, mds, le32_to_cpu(ph->seq), + le32_to_cpu(ph->mseq)); } - spin_unlock(&mdsc->cap_dirty_lock); - } else { - dout("handle_cap_import inode %p ci %p mds%d mseq %d\n", - inode, ci, mds, mseq); + ci->i_cap_exporting_issued = cap->issued; + __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); } + /* make sure we re-request max_size, if necessary */ + ci->i_wanted_max_size = 0; + ci->i_requested_max_size = 0; + spin_unlock(&ci->i_ceph_lock); + down_write(&mdsc->snap_rwsem); ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, false); @@ -2866,11 +2885,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc, kick_flushing_inode_caps(mdsc, session, inode); up_read(&mdsc->snap_rwsem); - /* make sure we re-request max_size, if necessary */ - spin_lock(&ci->i_ceph_lock); - ci->i_wanted_max_size = 0; /* reset */ - ci->i_requested_max_size = 0; - spin_unlock(&ci->i_ceph_lock); } /* @@ -2888,6 +2902,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_inode_info *ci; struct ceph_cap *cap; struct ceph_mds_caps *h; + struct ceph_mds_cap_peer *peer = NULL; int mds = session->s_mds; int op; u32 seq, mseq; @@ -2898,12 +2913,14 @@ void ceph_handle_caps(struct ceph_mds_session *session, void *snaptrace; size_t snaptrace_len; void *flock; + void *end; u32 flock_len; int open_target_sessions = 0; dout("handle_caps from mds%d\n", mds); /* decode */ + end = msg->front.iov_base + msg->front.iov_len; tid = le64_to_cpu(msg->hdr.tid); if (msg->front.iov_len < sizeof(*h)) goto bad; @@ -2921,17 +2938,25 @@ void ceph_handle_caps(struct ceph_mds_session *session, snaptrace_len = le32_to_cpu(h->snap_trace_len); if (le16_to_cpu(msg->hdr.version) >= 2) { - void *p, *end; - - p = snaptrace + snaptrace_len; - end = msg->front.iov_base + msg->front.iov_len; + void *p = snaptrace + snaptrace_len; ceph_decode_32_safe(&p, end, flock_len, bad); + if (p + flock_len > end) + goto bad; flock = p; } else { flock = NULL; flock_len = 0; } + if (le16_to_cpu(msg->hdr.version) >= 3) { + if (op == CEPH_CAP_OP_IMPORT) { + void *p = flock + flock_len; + if (p + sizeof(*peer) > end) + goto bad; + peer = p; + } + } + mutex_lock(&session->s_mutex); session->s_seq++; dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, @@ -2968,7 +2993,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, goto done; case CEPH_CAP_OP_IMPORT: - handle_cap_import(mdsc, inode, h, session, + handle_cap_import(mdsc, inode, h, peer, session, snaptrace, snaptrace_len); } diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 26bb587..0a37b98 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -459,7 +459,8 @@ struct ceph_mds_reply_cap { __u8 flags; /* CEPH_CAP_FLAG_* */ } __attribute__ ((packed)); -#define CEPH_CAP_FLAG_AUTH 1 /* cap is issued by auth mds */ +#define CEPH_CAP_FLAG_AUTH (1 << 0) /* cap is issued by auth mds */ +#define CEPH_CAP_FLAG_RELEASE (1 << 1) /* release the cap */ /* inode record, for bundling with mds reply */ struct ceph_mds_reply_inode { @@ -660,6 +661,14 @@ struct ceph_mds_caps { __le32 time_warp_seq; } __attribute__ ((packed)); +struct ceph_mds_cap_peer { + __le64 cap_id; + __le32 seq; + __le32 mseq; + __le32 mds; + __u8 flags; +} __attribute__ ((packed)); + /* cap release msg head */ struct ceph_mds_cap_release { __le32 num; /* number of cap_items that follow */ -- cgit v1.1 From 5d72d13c425bb41f7752962f168fb402b86b7ac0 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sun, 24 Nov 2013 14:33:01 +0800 Subject: ceph: add open export target session helper Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 51 ++++++++++++++++++++++++++++++++++++--------------- fs/ceph/mds_client.h | 2 ++ 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 7c00dd5..f4f050a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -847,35 +847,56 @@ static int __open_session(struct ceph_mds_client *mdsc, * * called under mdsc->mutex */ +static struct ceph_mds_session * +__open_export_target_session(struct ceph_mds_client *mdsc, int target) +{ + struct ceph_mds_session *session; + + session = __ceph_lookup_mds_session(mdsc, target); + if (!session) { + session = register_session(mdsc, target); + if (IS_ERR(session)) + return session; + } + if (session->s_state == CEPH_MDS_SESSION_NEW || + session->s_state == CEPH_MDS_SESSION_CLOSING) + __open_session(mdsc, session); + + return session; +} + +struct ceph_mds_session * +ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) +{ + struct ceph_mds_session *session; + + dout("open_export_target_session to mds%d\n", target); + + mutex_lock(&mdsc->mutex); + session = __open_export_target_session(mdsc, target); + mutex_unlock(&mdsc->mutex); + + return session; +} + static void __open_export_target_sessions(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_mds_info *mi; struct ceph_mds_session *ts; int i, mds = session->s_mds; - int target; if (mds >= mdsc->mdsmap->m_max_mds) return; + mi = &mdsc->mdsmap->m_info[mds]; dout("open_export_target_sessions for mds%d (%d targets)\n", session->s_mds, mi->num_export_targets); for (i = 0; i < mi->num_export_targets; i++) { - target = mi->export_targets[i]; - ts = __ceph_lookup_mds_session(mdsc, target); - if (!ts) { - ts = register_session(mdsc, target); - if (IS_ERR(ts)) - return; - } - if (session->s_state == CEPH_MDS_SESSION_NEW || - session->s_state == CEPH_MDS_SESSION_CLOSING) - __open_session(mdsc, session); - else - dout(" mds%d target mds%d %p is %s\n", session->s_mds, - i, ts, session_state_name(ts->s_state)); - ceph_put_mds_session(ts); + ts = __open_export_target_session(mdsc, mi->export_targets[i]); + if (!IS_ERR(ts)) + ceph_put_mds_session(ts); } } diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 4c053d0..6828891 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -383,6 +383,8 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg); +extern struct ceph_mds_session * +ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target); extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); -- cgit v1.1 From 11df2dfb610d68e8050c2183c344b1002351a99d Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sun, 24 Nov 2013 14:44:38 +0800 Subject: ceph: add imported caps when handling cap export message Version 3 cap export message includes information about the imported caps. It allows us to add the imported caps if the corresponding cap import message still hasn't been received. This allow us to handle situation that the importer MDS crashes and the cap import message is missing. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 220 ++++++++++++++++++++++++++++++++++++-------------------- fs/ceph/inode.c | 4 +- fs/ceph/super.h | 4 +- 3 files changed, 146 insertions(+), 82 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 98f3ca4..1754338 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -555,21 +555,34 @@ retry: cap->ci = ci; __insert_cap_node(ci, cap); - /* clear out old exporting info? (i.e. on cap import) */ - if (ci->i_cap_exporting_mds == mds) { - ci->i_cap_exporting_issued = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_mds = -1; - } - /* add to session cap list */ cap->session = session; spin_lock(&session->s_cap_lock); list_add_tail(&cap->session_caps, &session->s_caps); session->s_nr_caps++; spin_unlock(&session->s_cap_lock); - } else if (new_cap) - ceph_put_cap(mdsc, new_cap); + } else { + if (new_cap) + ceph_put_cap(mdsc, new_cap); + + /* + * auth mds of the inode changed. we received the cap export + * message, but still haven't received the cap import message. + * handle_cap_export() updated the new auth MDS' cap. + * + * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing + * a message that was send before the cap import message. So + * don't remove caps. + */ + if (ceph_seq_cmp(seq, cap->seq) <= 0) { + WARN_ON(cap != ci->i_auth_cap); + WARN_ON(cap->cap_id != cap_id); + seq = cap->seq; + mseq = cap->mseq; + issued |= cap->issued; + flags |= CEPH_CAP_FLAG_AUTH; + } + } if (!ci->i_snap_realm) { /* @@ -612,15 +625,8 @@ retry: ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) ci->i_auth_cap = cap; ci->i_cap_exporting_issued = 0; - } else if (ci->i_auth_cap == cap) { - ci->i_auth_cap = NULL; - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p to cap_dirty_migrating\n", inode); - list_move(&ci->i_dirty_item, - &mdsc->cap_dirty_migrating); - } - spin_unlock(&mdsc->cap_dirty_lock); + } else { + WARN_ON(ci->i_auth_cap == cap); } dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", @@ -889,7 +895,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci) */ static int __ceph_is_any_caps(struct ceph_inode_info *ci) { - return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0; + return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued; } int ceph_is_any_caps(struct inode *inode) @@ -1396,13 +1402,10 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ci->i_snap_realm->cached_context); dout(" inode %p now dirty snapc %p auth cap %p\n", &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); + WARN_ON(!ci->i_auth_cap); BUG_ON(!list_empty(&ci->i_dirty_item)); spin_lock(&mdsc->cap_dirty_lock); - if (ci->i_auth_cap) - list_add(&ci->i_dirty_item, &mdsc->cap_dirty); - else - list_add(&ci->i_dirty_item, - &mdsc->cap_dirty_migrating); + list_add(&ci->i_dirty_item, &mdsc->cap_dirty); spin_unlock(&mdsc->cap_dirty_lock); if (ci->i_flushing_caps == 0) { ihold(inode); @@ -2421,6 +2424,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, inode->i_size); + + /* + * auth mds of the inode changed. we received the cap export message, + * but still haven't received the cap import message. handle_cap_export + * updated the new auth MDS' cap. + * + * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message + * that was sent before the cap import message. So don't remove caps. + */ + if (ceph_seq_cmp(seq, cap->seq) <= 0) { + WARN_ON(cap != ci->i_auth_cap); + WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id)); + seq = cap->seq; + newcaps |= cap->issued; + } + /* * If CACHE is being revoked, and we have no dirty buffers, * try to invalidate (once). (If there are dirty buffers, we @@ -2447,6 +2466,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, issued |= implemented | __ceph_caps_dirty(ci); cap->cap_gen = session->s_cap_gen; + cap->seq = seq; __check_cap_issue(ci, cap, newcaps); @@ -2497,6 +2517,10 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, &atime); + + /* file layout may have changed */ + ci->i_layout = grant->layout; + /* max size increase? */ if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); @@ -2525,11 +2549,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, check_caps = 1; } - cap->seq = seq; - - /* file layout may have changed */ - ci->i_layout = grant->layout; - /* revocation, grant, or no-op? */ if (cap->issued & ~newcaps) { int revoking = cap->issued & ~newcaps; @@ -2755,65 +2774,114 @@ static void handle_cap_trunc(struct inode *inode, * caller holds s_mutex */ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, - struct ceph_mds_session *session, - int *open_target_sessions) + struct ceph_mds_cap_peer *ph, + struct ceph_mds_session *session) { struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct ceph_mds_session *tsession = NULL; + struct ceph_cap *cap, *tcap; struct ceph_inode_info *ci = ceph_inode(inode); - int mds = session->s_mds; + u64 t_cap_id; unsigned mseq = le32_to_cpu(ex->migrate_seq); - struct ceph_cap *cap = NULL, *t; - struct rb_node *p; - int remember = 1; + unsigned t_seq, t_mseq; + int target, issued; + int mds = session->s_mds; - dout("handle_cap_export inode %p ci %p mds%d mseq %d\n", - inode, ci, mds, mseq); + if (ph) { + t_cap_id = le64_to_cpu(ph->cap_id); + t_seq = le32_to_cpu(ph->seq); + t_mseq = le32_to_cpu(ph->mseq); + target = le32_to_cpu(ph->mds); + } else { + t_cap_id = t_seq = t_mseq = 0; + target = -1; + } + dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n", + inode, ci, mds, mseq, target); +retry: spin_lock(&ci->i_ceph_lock); + cap = __get_cap_for_mds(ci, mds); + if (!cap) + goto out_unlock; - /* make sure we haven't seen a higher mseq */ - for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { - t = rb_entry(p, struct ceph_cap, ci_node); - if (ceph_seq_cmp(t->mseq, mseq) > 0) { - dout(" higher mseq on cap from mds%d\n", - t->session->s_mds); - remember = 0; - } - if (t->session->s_mds == mds) - cap = t; + if (target < 0) { + __ceph_remove_cap(cap, false); + goto out_unlock; } - if (cap) { - if (remember) { - /* make note */ - ci->i_cap_exporting_mds = mds; - ci->i_cap_exporting_mseq = mseq; - ci->i_cap_exporting_issued = cap->issued; - - /* - * make sure we have open sessions with all possible - * export targets, so that we get the matching IMPORT - */ - *open_target_sessions = 1; + /* + * now we know we haven't received the cap import message yet + * because the exported cap still exist. + */ - /* - * we can't flush dirty caps that we've seen the - * EXPORT but no IMPORT for - */ - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p to cap_dirty_migrating\n", - inode); - list_move(&ci->i_dirty_item, - &mdsc->cap_dirty_migrating); + issued = cap->issued; + WARN_ON(issued != cap->implemented); + + tcap = __get_cap_for_mds(ci, target); + if (tcap) { + /* already have caps from the target */ + if (tcap->cap_id != t_cap_id || + ceph_seq_cmp(tcap->seq, t_seq) < 0) { + dout(" updating import cap %p mds%d\n", tcap, target); + tcap->cap_id = t_cap_id; + tcap->seq = t_seq - 1; + tcap->issue_seq = t_seq - 1; + tcap->mseq = t_mseq; + tcap->issued |= issued; + tcap->implemented |= issued; + if (cap == ci->i_auth_cap) + ci->i_auth_cap = tcap; + if (ci->i_flushing_caps && ci->i_auth_cap == tcap) { + spin_lock(&mdsc->cap_dirty_lock); + list_move_tail(&ci->i_flushing_item, + &tcap->session->s_cap_flushing); + spin_unlock(&mdsc->cap_dirty_lock); } - spin_unlock(&mdsc->cap_dirty_lock); } __ceph_remove_cap(cap, false); + goto out_unlock; + } + + if (tsession) { + int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0; + spin_unlock(&ci->i_ceph_lock); + /* add placeholder for the export tagert */ + ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, + t_seq - 1, t_mseq, (u64)-1, flag, NULL); + goto retry; } - /* else, we already released it */ spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&session->s_mutex); + + /* open target session */ + tsession = ceph_mdsc_open_export_target_session(mdsc, target); + if (!IS_ERR(tsession)) { + if (mds > target) { + mutex_lock(&session->s_mutex); + mutex_lock_nested(&tsession->s_mutex, + SINGLE_DEPTH_NESTING); + } else { + mutex_lock(&tsession->s_mutex); + mutex_lock_nested(&session->s_mutex, + SINGLE_DEPTH_NESTING); + } + ceph_add_cap_releases(mdsc, tsession); + } else { + WARN_ON(1); + tsession = NULL; + target = -1; + } + goto retry; + +out_unlock: + spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&session->s_mutex); + if (tsession) { + mutex_unlock(&tsession->s_mutex); + ceph_put_mds_session(tsession); + } } /* @@ -2915,7 +2983,6 @@ void ceph_handle_caps(struct ceph_mds_session *session, void *flock; void *end; u32 flock_len; - int open_target_sessions = 0; dout("handle_caps from mds%d\n", mds); @@ -2954,6 +3021,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, if (p + sizeof(*peer) > end) goto bad; peer = p; + } else if (op == CEPH_CAP_OP_EXPORT) { + /* recorded in unused fields */ + peer = (void *)&h->size; } } @@ -2989,8 +3059,8 @@ void ceph_handle_caps(struct ceph_mds_session *session, goto done; case CEPH_CAP_OP_EXPORT: - handle_cap_export(inode, h, session, &open_target_sessions); - goto done; + handle_cap_export(inode, h, peer, session); + goto done_unlocked; case CEPH_CAP_OP_IMPORT: handle_cap_import(mdsc, inode, h, peer, session, @@ -3045,8 +3115,6 @@ done: done_unlocked: if (inode) iput(inode); - if (open_target_sessions) - ceph_mdsc_open_export_target_sessions(mdsc, session); return; bad: diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 3db97ba..6fc10a7 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -336,12 +336,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_hold_caps_min = 0; ci->i_hold_caps_max = 0; INIT_LIST_HEAD(&ci->i_cap_delay_list); - ci->i_cap_exporting_mds = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_issued = 0; INIT_LIST_HEAD(&ci->i_cap_snaps); ci->i_head_snapc = NULL; ci->i_snap_caps = 0; + ci->i_cap_exporting_issued = 0; for (i = 0; i < CEPH_FILE_MODE_NUM; i++) ci->i_nr_by_mode[i] = 0; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index a6ba32f..c299f7d 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -287,14 +287,12 @@ struct ceph_inode_info { unsigned long i_hold_caps_min; /* jiffies */ unsigned long i_hold_caps_max; /* jiffies */ struct list_head i_cap_delay_list; /* for delayed cap release to mds */ - int i_cap_exporting_mds; /* to handle cap migration between */ - unsigned i_cap_exporting_mseq; /* mds's. */ - unsigned i_cap_exporting_issued; struct ceph_cap_reservation i_cap_migration_resv; struct list_head i_cap_snaps; /* snapped state pending flush to mds */ struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or dirty|flushing caps */ unsigned i_snap_caps; /* cap bits for snapped files */ + unsigned i_cap_exporting_issued; int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ -- cgit v1.1 From 80213a84a96c3040f5824bce646a184d5dd3dd2b Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 21 Jan 2014 11:07:16 +0800 Subject: libceph: support CEPH_FEATURE_EXPORT_PEER Signed-off-by: Yan, Zheng --- include/linux/ceph/ceph_features.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index 5f42e44..d21b34d 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -80,7 +80,8 @@ static inline u64 ceph_sanitize_features(u64 features) CEPH_FEATURE_CRUSH_TUNABLES2 | \ CEPH_FEATURE_REPLY_CREATE_INODE | \ CEPH_FEATURE_OSDHASHPSPOOL | \ - CEPH_FEATURE_CRUSH_V2) + CEPH_FEATURE_CRUSH_V2 | \ + CEPH_FEATURE_EXPORT_PEER) #define CEPH_FEATURES_REQUIRED_DEFAULT \ (CEPH_FEATURE_NOSRCADDR | \ -- cgit v1.1 From eeb0bed5572b1282009dfc2635604df5a35d1a02 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 9 Jan 2014 20:08:21 +0200 Subject: libceph: add ceph_kv{malloc,free}() and switch to them Encapsulate kmalloc vs vmalloc memory allocation and freeing logic into two helpers, ceph_kvmalloc() and ceph_kvfree(), and switch to them. ceph_kvmalloc() kmalloc()'s a maximum of 8 pages, anything bigger is vmalloc()'ed with __GFP_HIGHMEM set. This changes the existing behaviour: - for buffers (ceph_buffer_new()), from trying to kmalloc() everything and using vmalloc() just as a fallback - for messages (ceph_msg_new()), from going to vmalloc() for anything bigger than a page - for messages (ceph_msg_new()), from disallowing vmalloc() to use high memory Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/buffer.h | 1 - include/linux/ceph/libceph.h | 11 +++++++---- include/linux/ceph/messenger.h | 1 - net/ceph/buffer.c | 22 ++++++---------------- net/ceph/ceph_common.c | 20 ++++++++++++++++++++ net/ceph/messenger.c | 13 ++----------- 6 files changed, 35 insertions(+), 33 deletions(-) diff --git a/include/linux/ceph/buffer.h b/include/linux/ceph/buffer.h index 58d1901..07ad423 100644 --- a/include/linux/ceph/buffer.h +++ b/include/linux/ceph/buffer.h @@ -17,7 +17,6 @@ struct ceph_buffer { struct kref kref; struct kvec vec; size_t alloc_len; - bool is_vmalloc; }; extern struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp); diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 7d704db..2f49aa4 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -173,15 +173,18 @@ static inline int calc_pages_for(u64 off, u64 len) (off >> PAGE_CACHE_SHIFT); } +extern struct kmem_cache *ceph_inode_cachep; +extern struct kmem_cache *ceph_cap_cachep; +extern struct kmem_cache *ceph_dentry_cachep; +extern struct kmem_cache *ceph_file_cachep; + /* ceph_common.c */ extern bool libceph_compatible(void *data); extern const char *ceph_msg_type_name(int type); extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid); -extern struct kmem_cache *ceph_inode_cachep; -extern struct kmem_cache *ceph_cap_cachep; -extern struct kmem_cache *ceph_dentry_cachep; -extern struct kmem_cache *ceph_file_cachep; +extern void *ceph_kvmalloc(size_t size, gfp_t flags); +extern void ceph_kvfree(const void *ptr); extern struct ceph_options *ceph_parse_options(char *options, const char *dev_name, const char *dev_name_end, diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 861138f..20ee8b6 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -154,7 +154,6 @@ struct ceph_msg { struct list_head list_head; /* links for connection lists */ struct kref kref; - bool front_is_vmalloc; bool more_to_follow; bool needs_out_seq; int front_alloc_len; diff --git a/net/ceph/buffer.c b/net/ceph/buffer.c index bf3e6a1..621b5f6 100644 --- a/net/ceph/buffer.c +++ b/net/ceph/buffer.c @@ -6,6 +6,7 @@ #include #include +#include /* for ceph_kv{malloc,free} */ struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp) { @@ -15,16 +16,10 @@ struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp) if (!b) return NULL; - b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN); - if (b->vec.iov_base) { - b->is_vmalloc = false; - } else { - b->vec.iov_base = __vmalloc(len, gfp | __GFP_HIGHMEM, PAGE_KERNEL); - if (!b->vec.iov_base) { - kfree(b); - return NULL; - } - b->is_vmalloc = true; + b->vec.iov_base = ceph_kvmalloc(len, gfp); + if (!b->vec.iov_base) { + kfree(b); + return NULL; } kref_init(&b->kref); @@ -40,12 +35,7 @@ void ceph_buffer_release(struct kref *kref) struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref); dout("buffer_release %p\n", b); - if (b->vec.iov_base) { - if (b->is_vmalloc) - vfree(b->vec.iov_base); - else - kfree(b->vec.iov_base); - } + ceph_kvfree(b->vec.iov_base); kfree(b); } EXPORT_SYMBOL(ceph_buffer_release); diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 43d8177..67d7721 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -170,6 +171,25 @@ int ceph_compare_options(struct ceph_options *new_opt, } EXPORT_SYMBOL(ceph_compare_options); +void *ceph_kvmalloc(size_t size, gfp_t flags) +{ + if (size <= (PAGE_SIZE << PAGE_ALLOC_COSTLY_ORDER)) { + void *ptr = kmalloc(size, flags | __GFP_NOWARN); + if (ptr) + return ptr; + } + + return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL); +} + +void ceph_kvfree(const void *ptr) +{ + if (is_vmalloc_addr(ptr)) + vfree(ptr); + else + kfree(ptr); +} + static int parse_fsid(const char *str, struct ceph_fsid *fsid) { diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 252ad4e..2ed1304 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -3131,13 +3131,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, /* front */ if (front_len) { - if (front_len > PAGE_CACHE_SIZE) { - m->front.iov_base = __vmalloc(front_len, flags, - PAGE_KERNEL); - m->front_is_vmalloc = true; - } else { - m->front.iov_base = kmalloc(front_len, flags); - } + m->front.iov_base = ceph_kvmalloc(front_len, flags); if (m->front.iov_base == NULL) { dout("ceph_msg_new can't allocate %d bytes\n", front_len); @@ -3259,10 +3253,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) void ceph_msg_kfree(struct ceph_msg *m) { dout("msg_kfree %p\n", m); - if (m->front_is_vmalloc) - vfree(m->front.iov_base); - else - kfree(m->front.iov_base); + ceph_kvfree(m->front.iov_base); kmem_cache_free(ceph_msg_cache, m); } -- cgit v1.1 From 0b4af2e8c9f3fc9c31d2f9374b79af2c890ef897 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 16 Jan 2014 19:18:27 +0200 Subject: libceph: dout() is missing a newline Add a missing newline to a dout() in __reset_osd(). Signed-off-by: Ilya Dryomov --- net/ceph/osd_client.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 7331951..959d332 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1044,8 +1044,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) !ceph_con_opened(&osd->o_con)) { struct ceph_osd_request *req; - dout(" osd addr hasn't changed and connection never opened," - " letting msgr retry"); + dout("osd addr hasn't changed and connection never opened, " + "letting msgr retry\n"); /* touch each r_stamp for handle_timeout()'s benfit */ list_for_each_entry(req, &osd->o_requests, r_osd_item) req->r_stamp = jiffies; -- cgit v1.1 From 22116525baec1d63f4878eaa92f0b57946a78819 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:18 +0200 Subject: libceph: start using oloc abstraction Instead of relying on pool fields in ceph_file_layout (for mapping) and ceph_pg (for enconding), start using ceph_object_locator (oloc) abstraction. Note that userspace oloc currently consists of pool, key, nspace and hash fields, while this one contains only a pool. This is OK, because at this point we only send (i.e. encode) olocs and never have to receive (i.e. decode) them. This makes keeping a copy of ceph_file_layout in every osd request unnecessary, so ceph_osd_request::r_file_layout field is nuked. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- drivers/block/rbd.c | 8 ++++---- include/linux/ceph/osd_client.h | 3 ++- include/linux/ceph/osdmap.h | 3 +-- net/ceph/osd_client.c | 8 +++++--- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 72a7eec..6614e8d 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1808,12 +1808,12 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req->r_callback = rbd_osd_req_callback; osd_req->r_priv = obj_request; + osd_req->r_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + osd_req->r_oid_len = strlen(obj_request->object_name); rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); - osd_req->r_file_layout = rbd_dev->layout; /* struct */ - return osd_req; } @@ -1849,12 +1849,12 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) osd_req->r_callback = rbd_osd_req_callback; osd_req->r_priv = obj_request; + osd_req->r_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + osd_req->r_oid_len = strlen(obj_request->object_name); rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); - osd_req->r_file_layout = rbd_dev->layout; /* struct */ - return osd_req; } diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 4fb6a89..5b85551 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -159,12 +159,13 @@ struct ceph_osd_request { struct inode *r_inode; /* for use by callbacks */ void *r_priv; /* ditto */ + struct ceph_object_locator r_oloc; + char r_oid[MAX_OBJ_NAME_SIZE]; /* object name */ int r_oid_len; u64 r_snapid; unsigned long r_stamp; /* send OR check time */ - struct ceph_file_layout r_file_layout; struct ceph_snap_context *r_snapc; /* snap context for writes */ }; diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index d05cc44..256134a 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -40,8 +40,7 @@ struct ceph_pg_pool_info { }; struct ceph_object_locator { - uint64_t pool; - char *key; + s64 pool; }; struct ceph_pg_mapping { diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 959d332..7130c5c 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -368,6 +368,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); + req->r_oloc.pool = -1; + /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); @@ -761,7 +763,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (num_ops > 1) osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); - req->r_file_layout = *layout; /* keep a copy */ + req->r_oloc.pool = ceph_file_layout_pg_pool(*layout); snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, objnum); @@ -1268,7 +1270,7 @@ static int __map_request(struct ceph_osd_client *osdc, dout("map_request %p tid %lld\n", req, req->r_tid); err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, - ceph_file_layout_pg_pool(req->r_file_layout)); + req->r_oloc.pool); if (err) { list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; @@ -1354,7 +1356,7 @@ static void __send_request(struct ceph_osd_client *osdc, /* fill in message content that changes each time we send it */ put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); put_unaligned_le32(req->r_flags, req->r_request_flags); - put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); + put_unaligned_le64(req->r_oloc.pool, req->r_request_pool); p = req->r_request_pgid; ceph_encode_64(&p, req->r_pgid.pool); ceph_encode_32(&p, req->r_pgid.seed); -- cgit v1.1 From e8221464fc2bc8c9f7b0c2115abbd75ba23f210a Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:18 +0200 Subject: libceph: move ceph_file_layout helpers to ceph_fs.h Move ceph_file_layout helper macros and inline functions to ceph_fs.h. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/ceph_fs.h | 23 +++++++++++++++++++++++ include/linux/ceph/osdmap.h | 27 --------------------------- 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 0a37b98..2623cff 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -53,6 +53,29 @@ struct ceph_file_layout { __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ } __attribute__ ((packed)); +#define ceph_file_layout_su(l) ((__s32)le32_to_cpu((l).fl_stripe_unit)) +#define ceph_file_layout_stripe_count(l) \ + ((__s32)le32_to_cpu((l).fl_stripe_count)) +#define ceph_file_layout_object_size(l) ((__s32)le32_to_cpu((l).fl_object_size)) +#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash)) +#define ceph_file_layout_object_su(l) \ + ((__s32)le32_to_cpu((l).fl_object_stripe_unit)) +#define ceph_file_layout_pg_pool(l) \ + ((__s32)le32_to_cpu((l).fl_pg_pool)) + +static inline unsigned ceph_file_layout_stripe_width(struct ceph_file_layout *l) +{ + return le32_to_cpu(l->fl_stripe_unit) * + le32_to_cpu(l->fl_stripe_count); +} + +/* "period" == bytes before i start on a new set of objects */ +static inline unsigned ceph_file_layout_period(struct ceph_file_layout *l) +{ + return le32_to_cpu(l->fl_object_size) * + le32_to_cpu(l->fl_stripe_count); +} + #define CEPH_MIN_STRIPE_UNIT 65536 int ceph_file_layout_is_valid(const struct ceph_file_layout *layout); diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 256134a..f2679c3 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -72,33 +72,6 @@ struct ceph_osdmap { struct crush_map *crush; }; -/* - * file layout helpers - */ -#define ceph_file_layout_su(l) ((__s32)le32_to_cpu((l).fl_stripe_unit)) -#define ceph_file_layout_stripe_count(l) \ - ((__s32)le32_to_cpu((l).fl_stripe_count)) -#define ceph_file_layout_object_size(l) ((__s32)le32_to_cpu((l).fl_object_size)) -#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash)) -#define ceph_file_layout_object_su(l) \ - ((__s32)le32_to_cpu((l).fl_object_stripe_unit)) -#define ceph_file_layout_pg_pool(l) \ - ((__s32)le32_to_cpu((l).fl_pg_pool)) - -static inline unsigned ceph_file_layout_stripe_width(struct ceph_file_layout *l) -{ - return le32_to_cpu(l->fl_stripe_unit) * - le32_to_cpu(l->fl_stripe_count); -} - -/* "period" == bytes before i start on a new set of objects */ -static inline unsigned ceph_file_layout_period(struct ceph_file_layout *l) -{ - return le32_to_cpu(l->fl_object_size) * - le32_to_cpu(l->fl_stripe_count); -} - - static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd) { return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP); -- cgit v1.1 From 2d0ebc5d591f49131bf8f93b54c5424162c3fb7f Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:18 +0200 Subject: libceph: rename MAX_OBJ_NAME_SIZE to CEPH_MAX_OID_NAME_LEN In preparation for adding oid abstraction, rename MAX_OBJ_NAME_SIZE to CEPH_MAX_OID_NAME_LEN. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- drivers/block/rbd.c | 6 +++--- include/linux/ceph/osd_client.h | 4 ++-- net/ceph/osd_client.c | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6614e8d..98792b2 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1088,9 +1088,9 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) name_format = "%s.%012llx"; if (rbd_dev->image_format == 2) name_format = "%s.%016llx"; - ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format, + ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format, rbd_dev->header.object_prefix, segment); - if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) { + if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) { pr_err("error formatting segment name for #%llu (%d)\n", segment, ret); kfree(name); @@ -5350,7 +5350,7 @@ static int rbd_slab_init(void) rbd_assert(!rbd_segment_name_cache); rbd_segment_name_cache = kmem_cache_create("rbd_segment_name", - MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL); + CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL); if (rbd_segment_name_cache) return 0; out_err: diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 5b85551..b42f158 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -16,7 +16,7 @@ * Maximum object name size * (must be at least as big as RBD_MAX_MD_NAME_LEN -- currently 100) */ -#define MAX_OBJ_NAME_SIZE 100 +#define CEPH_MAX_OID_NAME_LEN 100 struct ceph_msg; struct ceph_snap_context; @@ -161,7 +161,7 @@ struct ceph_osd_request { struct ceph_object_locator r_oloc; - char r_oid[MAX_OBJ_NAME_SIZE]; /* object name */ + char r_oid[CEPH_MAX_OID_NAME_LEN]; /* object name */ int r_oid_len; u64 r_snapid; unsigned long r_stamp; /* send OR check time */ diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 7130c5c..a053e7e 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, msg_size = 4 + 4 + 8 + 8 + 4+8; msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ msg_size += 1 + 8 + 4 + 4; /* pg_t */ - msg_size += 4 + MAX_OBJ_NAME_SIZE; + msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */ msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); msg_size += 8; /* snapid */ msg_size += 8; /* snap_seq */ -- cgit v1.1 From 4295f2217a5aa8ef2738e3a368db3c1ceab41212 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:18 +0200 Subject: libceph: introduce and start using oid abstraction In preparation for tiering support, which would require having two (base and target) object names for each osd request and also copying those names around, introduce struct ceph_object_id (oid) and a couple helpers to facilitate those copies and encapsulate the fact that object name is not necessarily a NUL-terminated string. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- drivers/block/rbd.c | 10 ++-------- include/linux/ceph/osd_client.h | 9 +-------- include/linux/ceph/osdmap.h | 36 ++++++++++++++++++++++++++++++++++++ net/ceph/debugfs.c | 3 ++- net/ceph/osd_client.c | 17 +++++++++-------- 5 files changed, 50 insertions(+), 25 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 98792b2..6fdc5fc 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1809,10 +1809,7 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req->r_priv = obj_request; osd_req->r_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); - - osd_req->r_oid_len = strlen(obj_request->object_name); - rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); - memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); + ceph_oid_set_name(&osd_req->r_oid, obj_request->object_name); return osd_req; } @@ -1850,10 +1847,7 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) osd_req->r_priv = obj_request; osd_req->r_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); - - osd_req->r_oid_len = strlen(obj_request->object_name); - rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); - memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); + ceph_oid_set_name(&osd_req->r_oid, obj_request->object_name); return osd_req; } diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index b42f158..8d8bb53 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -12,12 +12,6 @@ #include #include -/* - * Maximum object name size - * (must be at least as big as RBD_MAX_MD_NAME_LEN -- currently 100) - */ -#define CEPH_MAX_OID_NAME_LEN 100 - struct ceph_msg; struct ceph_snap_context; struct ceph_osd_request; @@ -160,9 +154,8 @@ struct ceph_osd_request { void *r_priv; /* ditto */ struct ceph_object_locator r_oloc; + struct ceph_object_id r_oid; - char r_oid[CEPH_MAX_OID_NAME_LEN]; /* object name */ - int r_oid_len; u64 r_snapid; unsigned long r_stamp; /* send OR check time */ diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index f2679c3..c85f7d4 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -43,6 +43,18 @@ struct ceph_object_locator { s64 pool; }; +/* + * Maximum supported by kernel client object name length + * + * (probably outdated: must be >= RBD_MAX_MD_NAME_LEN -- currently 100) + */ +#define CEPH_MAX_OID_NAME_LEN 100 + +struct ceph_object_id { + char name[CEPH_MAX_OID_NAME_LEN]; + int name_len; +}; + struct ceph_pg_mapping { struct rb_node node; struct ceph_pg pgid; @@ -72,6 +84,30 @@ struct ceph_osdmap { struct crush_map *crush; }; +static inline void ceph_oid_set_name(struct ceph_object_id *oid, + const char *name) +{ + int len; + + len = strlen(name); + if (len > sizeof(oid->name)) { + WARN(1, "ceph_oid_set_name '%s' len %d vs %zu, truncating\n", + name, len, sizeof(oid->name)); + len = sizeof(oid->name); + } + + memcpy(oid->name, name, len); + oid->name_len = len; +} + +static inline void ceph_oid_copy(struct ceph_object_id *dest, + struct ceph_object_id *src) +{ + BUG_ON(src->name_len > sizeof(dest->name)); + memcpy(dest->name, src->name, src->name_len); + dest->name_len = src->name_len; +} + static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd) { return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP); diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 83661cd..1f85627 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -132,7 +132,8 @@ static int osdc_show(struct seq_file *s, void *pp) req->r_osd ? req->r_osd->o_osd : -1, req->r_pgid.pool, req->r_pgid.seed); - seq_printf(s, "%.*s", req->r_oid_len, req->r_oid); + seq_printf(s, "%.*s", req->r_oid.name_len, + req->r_oid.name); if (req->r_reassert_version.epoch) seq_printf(s, "\t%u'%llu", diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a053e7e..2988d68 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -765,9 +765,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req->r_oloc.pool = ceph_file_layout_pg_pool(*layout); - snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", - vino.ino, objnum); - req->r_oid_len = strlen(req->r_oid); + snprintf(req->r_oid.name, sizeof(req->r_oid.name), + "%llx.%08llx", vino.ino, objnum); + req->r_oid.name_len = strlen(req->r_oid.name); return req; } @@ -1269,7 +1269,7 @@ static int __map_request(struct ceph_osd_client *osdc, bool was_paused; dout("map_request %p tid %lld\n", req, req->r_tid); - err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, + err = ceph_calc_ceph_pg(&pgid, req->r_oid.name, osdc->osdmap, req->r_oloc.pool); if (err) { list_move(&req->r_req_lru_item, &osdc->req_notarget); @@ -2118,10 +2118,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, ceph_encode_32(&p, -1); /* preferred */ /* oid */ - ceph_encode_32(&p, req->r_oid_len); - memcpy(p, req->r_oid, req->r_oid_len); - dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); - p += req->r_oid_len; + ceph_encode_32(&p, req->r_oid.name_len); + memcpy(p, req->r_oid.name, req->r_oid.name_len); + dout("oid '%.*s' len %d\n", req->r_oid.name_len, + req->r_oid.name, req->r_oid.name_len); + p += req->r_oid.name_len; /* ops--can imply data */ ceph_encode_16(&p, (u16)req->r_num_ops); -- cgit v1.1 From 7c13cb64352230deac24d3cb058387a6c0676f83 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:19 +0200 Subject: libceph: replace ceph_calc_ceph_pg() with ceph_oloc_oid_to_pg() Switch ceph_calc_ceph_pg() to new oloc and oid abstractions and rename it to ceph_oloc_oid_to_pg() to make its purpose more clear. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- fs/ceph/ioctl.c | 8 ++++++-- include/linux/ceph/osdmap.h | 7 +++++-- net/ceph/osd_client.c | 4 ++-- net/ceph/osdmap.c | 29 +++++++++++++++++------------ 4 files changed, 30 insertions(+), 18 deletions(-) diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 669622f..dc66c9e 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -183,6 +183,8 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->client->osdc; + struct ceph_object_locator oloc; + struct ceph_object_id oid; u64 len = 1, olen; u64 tmp; struct ceph_pg pgid; @@ -211,8 +213,10 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx", ceph_ino(inode), dl.object_no); - r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, - ceph_file_layout_pg_pool(ci->i_layout)); + oloc.pool = ceph_file_layout_pg_pool(ci->i_layout); + ceph_oid_set_name(&oid, dl.object_name); + + r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid); if (r < 0) { up_read(&osdc->map_sem); return r; diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index c85f7d4..ebb8ec2 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -163,8 +163,11 @@ extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, u64 *bno, u64 *oxoff, u64 *oxlen); /* calculate mapping of object to a placement group */ -extern int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid, - struct ceph_osdmap *osdmap, uint64_t pool); +extern int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, + struct ceph_object_locator *oloc, + struct ceph_object_id *oid, + struct ceph_pg *pg_out); + extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, int *acting); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 2988d68..10360de 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1269,8 +1269,8 @@ static int __map_request(struct ceph_osd_client *osdc, bool was_paused; dout("map_request %p tid %lld\n", req, req->r_tid); - err = ceph_calc_ceph_pg(&pgid, req->r_oid.name, osdc->osdmap, - req->r_oloc.pool); + err = ceph_oloc_oid_to_pg(osdc->osdmap, &req->r_oloc, &req->r_oid, + &pgid); if (err) { list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 8b1a6b4..768dd04 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1090,25 +1090,30 @@ invalid: EXPORT_SYMBOL(ceph_calc_file_object_mapping); /* - * calculate an object layout (i.e. pgid) from an oid, - * file_layout, and osdmap + * Calculate mapping of a (oloc, oid) pair to a PG. Should only be + * called with target's (oloc, oid), since tiering isn't taken into + * account. */ -int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid, - struct ceph_osdmap *osdmap, uint64_t pool) +int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, + struct ceph_object_locator *oloc, + struct ceph_object_id *oid, + struct ceph_pg *pg_out) { - struct ceph_pg_pool_info *pool_info; + struct ceph_pg_pool_info *pi; - BUG_ON(!osdmap); - pool_info = __lookup_pg_pool(&osdmap->pg_pools, pool); - if (!pool_info) + pi = __lookup_pg_pool(&osdmap->pg_pools, oloc->pool); + if (!pi) return -EIO; - pg->pool = pool; - pg->seed = ceph_str_hash(pool_info->object_hash, oid, strlen(oid)); - dout("%s '%s' pgid %lld.%x\n", __func__, oid, pg->pool, pg->seed); + pg_out->pool = oloc->pool; + pg_out->seed = ceph_str_hash(pi->object_hash, oid->name, + oid->name_len); + + dout("%s '%.*s' pgid %llu.%x\n", __func__, oid->name_len, oid->name, + pg_out->pool, pg_out->seed); return 0; } -EXPORT_SYMBOL(ceph_calc_ceph_pg); +EXPORT_SYMBOL(ceph_oloc_oid_to_pg); static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x, int *result, int result_max, -- cgit v1.1 From 1b3f2ab51095a7aab684bf9f5c14235126188dbc Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:19 +0200 Subject: libceph: CEPH_OSD_FLAG_* enum update Update CEPH_OSD_FLAG_* enum. (We need CEPH_OSD_FLAG_IGNORE_OVERLAY to support tiering). Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/rados.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 68c96a5..96292df 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -344,6 +344,10 @@ enum { CEPH_OSD_FLAG_EXEC_PUBLIC = 0x1000, /* DEPRECATED op may exec (public) */ CEPH_OSD_FLAG_LOCALIZE_READS = 0x2000, /* read from nearby replica, if any */ CEPH_OSD_FLAG_RWORDERED = 0x4000, /* order wrt concurrent reads */ + CEPH_OSD_FLAG_IGNORE_CACHE = 0x8000, /* ignore cache logic */ + CEPH_OSD_FLAG_SKIPRWLOCKS = 0x10000, /* skip rw locks */ + CEPH_OSD_FLAG_IGNORE_OVERLAY = 0x20000, /* ignore pool overlay */ + CEPH_OSD_FLAG_FLUSH = 0x40000, /* this is part of flush */ }; enum { -- cgit v1.1 From ce7f6a2790464047199f54b66420243d433142bd Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:19 +0200 Subject: libceph: add ceph_pg_pool_by_id() "Lookup pool info by ID" function is hidden in osdmap.c. Expose it to the rest of libceph. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/osdmap.h | 3 +++ net/ceph/osdmap.c | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index ebb8ec2..7f894a6 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -174,6 +174,9 @@ extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid); +extern struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, + u64 id); + extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id); extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 768dd04..d69d2355 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -464,6 +464,11 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id) return NULL; } +struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id) +{ + return __lookup_pg_pool(&map->pg_pools, id); +} + const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id) { struct ceph_pg_pool_info *pi; -- cgit v1.1 From 17a13e4028e6ad7ded079cf32370c47bd0e0fc07 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:19 +0200 Subject: libceph: follow {read,write}_tier fields on osd request submission Overwrite ceph_osd_request::r_oloc.pool with read_tier for read ops and write_tier for write and read+write ops (aka basic tiering support). {read,write}_tier are part of pg_pool_t since v9. This commit bumps our pg_pool_t decode compat version from v7 to v9, all new fields except for {read,write}_tier are ignored. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/osdmap.h | 2 ++ net/ceph/osd_client.c | 30 ++++++++++++++++++++++++++++-- net/ceph/osdmap.c | 28 +++++++++++++++++++++++++--- 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 7f894a6..49ff69f 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -35,6 +35,8 @@ struct ceph_pg_pool_info { u8 object_hash; u32 pg_num, pgp_num; int pg_num_mask, pgp_num_mask; + s64 read_tier; + s64 write_tier; /* wins for read+write ops */ u64 flags; char *name; }; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 10360de..0eb009e 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1250,6 +1250,32 @@ static bool __req_should_be_paused(struct ceph_osd_client *osdc, } /* + * Calculate mapping of a request to a PG. Takes tiering into account. + */ +static int __calc_request_pg(struct ceph_osdmap *osdmap, + struct ceph_osd_request *req, + struct ceph_pg *pg_out) +{ + if ((req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { + struct ceph_pg_pool_info *pi; + + pi = ceph_pg_pool_by_id(osdmap, req->r_oloc.pool); + if (pi) { + if ((req->r_flags & CEPH_OSD_FLAG_READ) && + pi->read_tier >= 0) + req->r_oloc.pool = pi->read_tier; + if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && + pi->write_tier >= 0) + req->r_oloc.pool = pi->write_tier; + } + /* !pi is caught in ceph_oloc_oid_to_pg() */ + } + + return ceph_oloc_oid_to_pg(osdmap, &req->r_oloc, + &req->r_oid, pg_out); +} + +/* * Pick an osd (the first 'up' osd in the pg), allocate the osd struct * (as needed), and set the request r_osd appropriately. If there is * no up osd, set r_osd to NULL. Move the request to the appropriate list @@ -1269,8 +1295,8 @@ static int __map_request(struct ceph_osd_client *osdc, bool was_paused; dout("map_request %p tid %lld\n", req, req->r_tid); - err = ceph_oloc_oid_to_pg(osdc->osdmap, &req->r_oloc, &req->r_oid, - &pgid); + + err = __calc_request_pg(osdc->osdmap, req, &pgid); if (err) { list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index d69d2355..aade4a5 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -519,8 +519,8 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); return -EINVAL; } - if (cv > 7) { - pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv); + if (cv > 9) { + pr_warning("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv); return -EINVAL; } len = ceph_decode_32(p); @@ -548,12 +548,34 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) *p += len; } - /* skip removed snaps */ + /* skip removed_snaps */ num = ceph_decode_32(p); *p += num * (8 + 8); *p += 8; /* skip auid */ pi->flags = ceph_decode_64(p); + *p += 4; /* skip crash_replay_interval */ + + if (ev >= 7) + *p += 1; /* skip min_size */ + + if (ev >= 8) + *p += 8 + 8; /* skip quota_max_* */ + + if (ev >= 9) { + /* skip tiers */ + num = ceph_decode_32(p); + *p += num * 8; + + *p += 8; /* skip tier_of */ + *p += 1; /* skip cache_mode */ + + pi->read_tier = ceph_decode_64(p); + pi->write_tier = ceph_decode_64(p); + } else { + pi->read_tier = -1; + pi->write_tier = -1; + } /* ignore the rest */ -- cgit v1.1 From 3c972c95c68f455d80ff185aa440857be046bbe0 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:20 +0200 Subject: libceph: rename ceph_osd_request::r_{oloc,oid} to r_base_{oloc,oid} Rename ceph_osd_request::r_{oloc,oid} to r_base_{oloc,oid} before introducing r_target_{oloc,oid} needed for redirects. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- drivers/block/rbd.c | 8 ++++---- include/linux/ceph/osd_client.h | 4 ++-- net/ceph/debugfs.c | 4 ++-- net/ceph/osd_client.c | 30 +++++++++++++++--------------- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6fdc5fc..16cab66 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1808,8 +1808,8 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req->r_callback = rbd_osd_req_callback; osd_req->r_priv = obj_request; - osd_req->r_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); - ceph_oid_set_name(&osd_req->r_oid, obj_request->object_name); + osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); return osd_req; } @@ -1846,8 +1846,8 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) osd_req->r_callback = rbd_osd_req_callback; osd_req->r_priv = obj_request; - osd_req->r_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); - ceph_oid_set_name(&osd_req->r_oid, obj_request->object_name); + osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); return osd_req; } diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 8d8bb53..3170ca6 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -153,8 +153,8 @@ struct ceph_osd_request { struct inode *r_inode; /* for use by callbacks */ void *r_priv; /* ditto */ - struct ceph_object_locator r_oloc; - struct ceph_object_id r_oid; + struct ceph_object_locator r_base_oloc; + struct ceph_object_id r_base_oid; u64 r_snapid; unsigned long r_stamp; /* send OR check time */ diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 1f85627..258a382 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -132,8 +132,8 @@ static int osdc_show(struct seq_file *s, void *pp) req->r_osd ? req->r_osd->o_osd : -1, req->r_pgid.pool, req->r_pgid.seed); - seq_printf(s, "%.*s", req->r_oid.name_len, - req->r_oid.name); + seq_printf(s, "%.*s", req->r_base_oid.name_len, + req->r_base_oid.name); if (req->r_reassert_version.epoch) seq_printf(s, "\t%u'%llu", diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 0eb009e..3997a87 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -368,7 +368,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); - req->r_oloc.pool = -1; + req->r_base_oloc.pool = -1; /* create reply message */ if (use_mempool) @@ -763,11 +763,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (num_ops > 1) osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); - req->r_oloc.pool = ceph_file_layout_pg_pool(*layout); + req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); - snprintf(req->r_oid.name, sizeof(req->r_oid.name), + snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name), "%llx.%08llx", vino.ino, objnum); - req->r_oid.name_len = strlen(req->r_oid.name); + req->r_base_oid.name_len = strlen(req->r_base_oid.name); return req; } @@ -1259,20 +1259,20 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap, if ((req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { struct ceph_pg_pool_info *pi; - pi = ceph_pg_pool_by_id(osdmap, req->r_oloc.pool); + pi = ceph_pg_pool_by_id(osdmap, req->r_base_oloc.pool); if (pi) { if ((req->r_flags & CEPH_OSD_FLAG_READ) && pi->read_tier >= 0) - req->r_oloc.pool = pi->read_tier; + req->r_base_oloc.pool = pi->read_tier; if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && pi->write_tier >= 0) - req->r_oloc.pool = pi->write_tier; + req->r_base_oloc.pool = pi->write_tier; } /* !pi is caught in ceph_oloc_oid_to_pg() */ } - return ceph_oloc_oid_to_pg(osdmap, &req->r_oloc, - &req->r_oid, pg_out); + return ceph_oloc_oid_to_pg(osdmap, &req->r_base_oloc, + &req->r_base_oid, pg_out); } /* @@ -1382,7 +1382,7 @@ static void __send_request(struct ceph_osd_client *osdc, /* fill in message content that changes each time we send it */ put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); put_unaligned_le32(req->r_flags, req->r_request_flags); - put_unaligned_le64(req->r_oloc.pool, req->r_request_pool); + put_unaligned_le64(req->r_base_oloc.pool, req->r_request_pool); p = req->r_request_pgid; ceph_encode_64(&p, req->r_pgid.pool); ceph_encode_32(&p, req->r_pgid.seed); @@ -2144,11 +2144,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, ceph_encode_32(&p, -1); /* preferred */ /* oid */ - ceph_encode_32(&p, req->r_oid.name_len); - memcpy(p, req->r_oid.name, req->r_oid.name_len); - dout("oid '%.*s' len %d\n", req->r_oid.name_len, - req->r_oid.name, req->r_oid.name_len); - p += req->r_oid.name_len; + ceph_encode_32(&p, req->r_base_oid.name_len); + memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); + dout("oid '%.*s' len %d\n", req->r_base_oid.name_len, + req->r_base_oid.name, req->r_base_oid.name_len); + p += req->r_base_oid.name_len; /* ops--can imply data */ ceph_encode_16(&p, (u16)req->r_num_ops); -- cgit v1.1 From 205ee1187a671c3b067d7f1e974903b44036f270 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:20 +0200 Subject: libceph: follow redirect replies from osds Follow redirect replies from osds, for details see ceph.git commit fbbe3ad1220799b7bb00ea30fce581c5eadaf034. v1 (current) version of redirect reply consists of oloc and oid, which expands to pool, key, nspace, hash and oid. However, server-side code that would populate anything other than pool doesn't exist yet, and hence this commit adds support for pool redirects only. To make sure that future server-side updates don't break us, we decode all fields and, if any of key, nspace, hash or oid have a non-default value, error out with "corrupt osd_op_reply ..." message. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/osd_client.h | 6 ++ net/ceph/osd_client.c | 167 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 164 insertions(+), 9 deletions(-) diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 3170ca6..fd47e87 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -155,6 +155,8 @@ struct ceph_osd_request { struct ceph_object_locator r_base_oloc; struct ceph_object_id r_base_oid; + struct ceph_object_locator r_target_oloc; + struct ceph_object_id r_target_oid; u64 r_snapid; unsigned long r_stamp; /* send OR check time */ @@ -162,6 +164,10 @@ struct ceph_osd_request { struct ceph_snap_context *r_snapc; /* snap context for writes */ }; +struct ceph_request_redirect { + struct ceph_object_locator oloc; +}; + struct ceph_osd_event { u64 cookie; int one_shot; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 3997a87..010ff3b 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -369,6 +369,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_osd_item); req->r_base_oloc.pool = -1; + req->r_target_oloc.pool = -1; /* create reply message */ if (use_mempool) @@ -1256,23 +1257,36 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap, struct ceph_osd_request *req, struct ceph_pg *pg_out) { - if ((req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { + bool need_check_tiering; + + need_check_tiering = false; + if (req->r_target_oloc.pool == -1) { + req->r_target_oloc = req->r_base_oloc; /* struct */ + need_check_tiering = true; + } + if (req->r_target_oid.name_len == 0) { + ceph_oid_copy(&req->r_target_oid, &req->r_base_oid); + need_check_tiering = true; + } + + if (need_check_tiering && + (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { struct ceph_pg_pool_info *pi; - pi = ceph_pg_pool_by_id(osdmap, req->r_base_oloc.pool); + pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool); if (pi) { if ((req->r_flags & CEPH_OSD_FLAG_READ) && pi->read_tier >= 0) - req->r_base_oloc.pool = pi->read_tier; + req->r_target_oloc.pool = pi->read_tier; if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && pi->write_tier >= 0) - req->r_base_oloc.pool = pi->write_tier; + req->r_target_oloc.pool = pi->write_tier; } /* !pi is caught in ceph_oloc_oid_to_pg() */ } - return ceph_oloc_oid_to_pg(osdmap, &req->r_base_oloc, - &req->r_base_oid, pg_out); + return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc, + &req->r_target_oid, pg_out); } /* @@ -1382,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc, /* fill in message content that changes each time we send it */ put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); put_unaligned_le32(req->r_flags, req->r_request_flags); - put_unaligned_le64(req->r_base_oloc.pool, req->r_request_pool); + put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool); p = req->r_request_pgid; ceph_encode_64(&p, req->r_pgid.pool); ceph_encode_32(&p, req->r_pgid.seed); @@ -1483,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work) round_jiffies_relative(delay)); } +static int ceph_oloc_decode(void **p, void *end, + struct ceph_object_locator *oloc) +{ + u8 struct_v, struct_cv; + u32 len; + void *struct_end; + int ret = 0; + + ceph_decode_need(p, end, 1 + 1 + 4, e_inval); + struct_v = ceph_decode_8(p); + struct_cv = ceph_decode_8(p); + if (struct_v < 3) { + pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", + struct_v, struct_cv); + goto e_inval; + } + if (struct_cv > 6) { + pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", + struct_v, struct_cv); + goto e_inval; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, e_inval); + struct_end = *p + len; + + oloc->pool = ceph_decode_64(p); + *p += 4; /* skip preferred */ + + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_object_locator::key is set\n"); + goto e_inval; + } + + if (struct_v >= 5) { + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_object_locator::nspace is set\n"); + goto e_inval; + } + } + + if (struct_v >= 6) { + s64 hash = ceph_decode_64(p); + if (hash != -1) { + pr_warn("ceph_object_locator::hash is set\n"); + goto e_inval; + } + } + + /* skip the rest */ + *p = struct_end; +out: + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + +static int ceph_redirect_decode(void **p, void *end, + struct ceph_request_redirect *redir) +{ + u8 struct_v, struct_cv; + u32 len; + void *struct_end; + int ret; + + ceph_decode_need(p, end, 1 + 1 + 4, e_inval); + struct_v = ceph_decode_8(p); + struct_cv = ceph_decode_8(p); + if (struct_cv > 1) { + pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", + struct_v, struct_cv); + goto e_inval; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, e_inval); + struct_end = *p + len; + + ret = ceph_oloc_decode(p, end, &redir->oloc); + if (ret) + goto out; + + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_request_redirect::object_name is set\n"); + goto e_inval; + } + + len = ceph_decode_32(p); + *p += len; /* skip osd_instructions */ + + /* skip the rest */ + *p = struct_end; +out: + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + static void complete_request(struct ceph_osd_request *req) { complete_all(&req->r_safe_completion); /* fsync waiter */ @@ -1497,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, { void *p, *end; struct ceph_osd_request *req; + struct ceph_request_redirect redir; u64 tid; int object_len; unsigned int numops; @@ -1576,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, for (i = 0; i < numops; i++) req->r_reply_op_result[i] = ceph_decode_32(&p); - already_completed = req->r_got_reply; + if (le16_to_cpu(msg->hdr.version) >= 6) { + p += 8 + 4; /* skip replay_version */ + p += 8; /* skip user_version */ - if (!req->r_got_reply) { + err = ceph_redirect_decode(&p, end, &redir); + if (err) + goto bad_put; + } else { + redir.oloc.pool = -1; + } + if (redir.oloc.pool != -1) { + dout("redirect pool %lld\n", redir.oloc.pool); + + __unregister_request(osdc, req); + mutex_unlock(&osdc->request_mutex); + + req->r_target_oloc = redir.oloc; /* struct */ + + /* + * Start redirect requests with nofail=true. If + * mapping fails, request will end up on the notarget + * list, waiting for the new osdmap (which can take + * a while), even though the original request mapped + * successfully. In the future we might want to follow + * original request's nofail setting here. + */ + err = ceph_osdc_start_request(osdc, req, true); + BUG_ON(err); + + goto done; + } + + already_completed = req->r_got_reply; + if (!req->r_got_reply) { req->r_result = result; dout("handle_reply result %d bytes %d\n", req->r_result, bytes); -- cgit v1.1 From 80e163a58c0c69ef1a0ba3500d9932b14d67bf64 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 27 Jan 2014 17:40:20 +0200 Subject: libceph: support CEPH_FEATURE_OSD_CACHEPOOL feature Announce our (limited, see previous commit) support for CACHEPOOL feature. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/ceph_features.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index d21b34d..138448f 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -80,6 +80,7 @@ static inline u64 ceph_sanitize_features(u64 features) CEPH_FEATURE_CRUSH_TUNABLES2 | \ CEPH_FEATURE_REPLY_CREATE_INODE | \ CEPH_FEATURE_OSDHASHPSPOOL | \ + CEPH_FEATURE_OSD_CACHEPOOL | \ CEPH_FEATURE_CRUSH_V2 | \ CEPH_FEATURE_EXPORT_PEER) -- cgit v1.1 From 37b52fe60838b135913e877b0c849e59fae587c3 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 28 Jan 2014 18:29:29 +0200 Subject: ceph: fix dout() compile warnings in ceph_filemap_fault() PAGE_CACHE_SIZE is unsigned long on all architectures, however size_t is either unsigned int or unsigned long. Rather than change format strings, cast PAGE_CACHE_SIZE to size_t to be in line with dout()s in ceph_page_mkwrite(). Cc: Yan, Zheng Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- fs/ceph/addr.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 791a9a2..b53278c 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1220,7 +1220,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) int want, got, ret; dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n", - inode, ceph_vinop(inode), off, PAGE_CACHE_SIZE); + inode, ceph_vinop(inode), off, (size_t)PAGE_CACHE_SIZE); if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; else @@ -1236,12 +1236,12 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) } } dout("filemap_fault %p %llu~%zd got cap refs on %s\n", - inode, off, PAGE_CACHE_SIZE, ceph_cap_string(got)); + inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got)); ret = filemap_fault(vma, vmf); dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", - inode, off, PAGE_CACHE_SIZE, ceph_cap_string(got), ret); + inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); ceph_put_cap_refs(ci, got); return ret; -- cgit v1.1 From 125d725c923527a85876c031028c7f55c28b74b3 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 28 Jan 2014 19:16:18 +0200 Subject: ceph: cast PAGE_SIZE to size_t in ceph_sync_write() Use min_t(size_t, ...) instead of plain min(), which does strict type checking, to avoid compile warning on i386. Cc: Jianpeng Ma Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- fs/ceph/file.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index d10510a..dfd2ce3 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -734,7 +734,7 @@ static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov, left = len; for (n = 0; n < num_pages; n++) { - size_t plen = min(left, PAGE_SIZE); + size_t plen = min_t(size_t, left, PAGE_SIZE); ret = iov_iter_copy_from_user(pages[n], &i, 0, plen); if (ret != plen) { ret = -EFAULT; -- cgit v1.1