diff options
-rw-r--r-- | drivers/block/rbd.c | 3 | ||||
-rw-r--r-- | fs/ceph/addr.c | 14 | ||||
-rw-r--r-- | fs/ceph/cache.c | 8 | ||||
-rw-r--r-- | fs/ceph/file.c | 509 | ||||
-rw-r--r-- | fs/ceph/inode.c | 8 | ||||
-rw-r--r-- | include/linux/ceph/ceph_frag.h | 37 | ||||
-rw-r--r-- | include/linux/ceph/messenger.h | 2 | ||||
-rw-r--r-- | net/ceph/auth_x.c | 49 | ||||
-rw-r--r-- | net/ceph/auth_x.h | 2 | ||||
-rw-r--r-- | net/ceph/messenger.c | 105 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 4 |
11 files changed, 501 insertions, 240 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 81ea69f..4a87678 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -5185,8 +5185,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) out_err: rbd_dev_unparent(rbd_dev); - if (parent) - rbd_dev_destroy(parent); + rbd_dev_destroy(parent); return ret; } diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index b7d218a..c2221378 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1108,7 +1108,7 @@ retry_locked: return 0; /* past end of file? */ - i_size = inode->i_size; /* caller holds i_mutex */ + i_size = i_size_read(inode); if (page_off >= i_size || (pos_in_page == 0 && (pos+len) >= i_size && @@ -1149,7 +1149,6 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, page = grab_cache_page_write_begin(mapping, index, 0); if (!page) return -ENOMEM; - *pagep = page; dout("write_begin file %p inode %p page %p %d~%d\n", file, inode, page, (int)pos, (int)len); @@ -1184,8 +1183,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, zero_user_segment(page, from+copied, len); /* did file size increase? */ - /* (no need for i_size_read(); we caller holds i_mutex */ - if (pos+copied > inode->i_size) + if (pos+copied > i_size_read(inode)) check_cap = ceph_inode_set_size(inode, pos+copied); if (!PageUptodate(page)) @@ -1378,11 +1376,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = VM_FAULT_NOPAGE; if ((off > size) || - (page->mapping != inode->i_mapping)) + (page->mapping != inode->i_mapping)) { + unlock_page(page); goto out; + } ret = ceph_update_writeable_page(vma->vm_file, off, len, page); - if (ret == 0) { + if (ret >= 0) { /* success. we'll keep the page locked. */ set_page_dirty(page); ret = VM_FAULT_LOCKED; @@ -1393,8 +1393,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = VM_FAULT_SIGBUS; } out: - if (ret != VM_FAULT_LOCKED) - unlock_page(page); if (ret == VM_FAULT_LOCKED || ci->i_inline_version != CEPH_INLINE_NONE) { int dirty; diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c index 7680e26..a351480 100644 --- a/fs/ceph/cache.c +++ b/fs/ceph/cache.c @@ -106,7 +106,7 @@ static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data, memset(&aux, 0, sizeof(aux)); aux.mtime = inode->i_mtime; - aux.size = inode->i_size; + aux.size = i_size_read(inode); memcpy(buffer, &aux, sizeof(aux)); @@ -117,9 +117,7 @@ static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data, uint64_t *size) { const struct ceph_inode_info* ci = cookie_netfs_data; - const struct inode* inode = &ci->vfs_inode; - - *size = inode->i_size; + *size = i_size_read(&ci->vfs_inode); } static enum fscache_checkaux ceph_fscache_inode_check_aux( @@ -134,7 +132,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux( memset(&aux, 0, sizeof(aux)); aux.mtime = inode->i_mtime; - aux.size = inode->i_size; + aux.size = i_size_read(inode); if (memcmp(data, &aux, sizeof(aux)) != 0) return FSCACHE_CHECKAUX_OBSOLETE; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 10c5ae7..86a9c38 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file) } enum { - CHECK_EOF = 1, - READ_INLINE = 2, + HAVE_RETRIED = 1, + CHECK_EOF = 2, + READ_INLINE = 3, }; /* @@ -411,17 +412,15 @@ enum { static int striped_read(struct inode *inode, u64 off, u64 len, struct page **pages, int num_pages, - int *checkeof, bool o_direct, - unsigned long buf_align) + int *checkeof) { struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); u64 pos, this_len, left; - int io_align, page_align; - int pages_left; - int read; + loff_t i_size; + int page_align, pages_left; + int read, ret; struct page **page_pos; - int ret; bool hit_stripe, was_short; /* @@ -432,13 +431,9 @@ static int striped_read(struct inode *inode, page_pos = pages; pages_left = num_pages; read = 0; - io_align = off & ~PAGE_MASK; more: - if (o_direct) - page_align = (pos - io_align + buf_align) & ~PAGE_MASK; - else - page_align = pos & ~PAGE_MASK; + page_align = pos & ~PAGE_MASK; this_len = left; ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), &ci->i_layout, pos, &this_len, @@ -452,13 +447,12 @@ more: dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); + i_size = i_size_read(inode); if (ret >= 0) { int didpages; - if (was_short && (pos + ret < inode->i_size)) { - int zlen = min(this_len - ret, - inode->i_size - pos - ret); - int zoff = (o_direct ? buf_align : io_align) + - read + ret; + if (was_short && (pos + ret < i_size)) { + int zlen = min(this_len - ret, i_size - pos - ret); + int zoff = (off & ~PAGE_MASK) + read + ret; dout(" zero gap %llu to %llu\n", pos + ret, pos + ret + zlen); ceph_zero_page_vector_range(zoff, zlen, pages); @@ -473,14 +467,14 @@ more: pages_left -= didpages; /* hit stripe and need continue*/ - if (left && hit_stripe && pos < inode->i_size) + if (left && hit_stripe && pos < i_size) goto more; } if (read > 0) { ret = read; /* did we bounce off eof? */ - if (pos + left > inode->i_size) + if (pos + left > i_size) *checkeof = CHECK_EOF; } @@ -521,54 +515,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, if (ret < 0) return ret; - if (iocb->ki_flags & IOCB_DIRECT) { - while (iov_iter_count(i)) { - size_t start; - ssize_t n; - - n = dio_get_pagev_size(i); - pages = dio_get_pages_alloc(i, n, &start, &num_pages); - if (IS_ERR(pages)) - return PTR_ERR(pages); - - ret = striped_read(inode, off, n, - pages, num_pages, checkeof, - 1, start); - - ceph_put_page_vector(pages, num_pages, true); - - if (ret <= 0) - break; - off += ret; - iov_iter_advance(i, ret); - if (ret < n) + num_pages = calc_pages_for(off, len); + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + ret = striped_read(inode, off, len, pages, + num_pages, checkeof); + if (ret > 0) { + int l, k = 0; + size_t left = ret; + + while (left) { + size_t page_off = off & ~PAGE_MASK; + size_t copy = min_t(size_t, left, + PAGE_SIZE - page_off); + l = copy_page_to_iter(pages[k++], page_off, copy, i); + off += l; + left -= l; + if (l < copy) break; } - } else { - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - ret = striped_read(inode, off, len, pages, - num_pages, checkeof, 0, 0); - if (ret > 0) { - int l, k = 0; - size_t left = ret; - - while (left) { - size_t page_off = off & ~PAGE_MASK; - size_t copy = min_t(size_t, - PAGE_SIZE - page_off, left); - l = copy_page_to_iter(pages[k++], page_off, - copy, i); - off += l; - left -= l; - if (l < copy) - break; - } - } - ceph_release_page_vector(pages, num_pages); } + ceph_release_page_vector(pages, num_pages); if (off > iocb->ki_pos) { ret = off - iocb->ki_pos; @@ -579,6 +547,193 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, return ret; } +struct ceph_aio_request { + struct kiocb *iocb; + size_t total_len; + int write; + int error; + struct list_head osd_reqs; + unsigned num_reqs; + atomic_t pending_reqs; + struct timespec mtime; + struct ceph_cap_flush *prealloc_cf; +}; + +struct ceph_aio_work { + struct work_struct work; + struct ceph_osd_request *req; +}; + +static void ceph_aio_retry_work(struct work_struct *work); + +static void ceph_aio_complete(struct inode *inode, + struct ceph_aio_request *aio_req) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + if (!atomic_dec_and_test(&aio_req->pending_reqs)) + return; + + ret = aio_req->error; + if (!ret) + ret = aio_req->total_len; + + dout("ceph_aio_complete %p rc %d\n", inode, ret); + + if (ret >= 0 && aio_req->write) { + int dirty; + + loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len; + if (endoff > i_size_read(inode)) { + if (ceph_inode_set_size(inode, endoff)) + ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); + } + + spin_lock(&ci->i_ceph_lock); + ci->i_inline_version = CEPH_INLINE_NONE; + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &aio_req->prealloc_cf); + spin_unlock(&ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(inode, dirty); + + } + + ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR : + CEPH_CAP_FILE_RD)); + + aio_req->iocb->ki_complete(aio_req->iocb, ret, 0); + + ceph_free_cap_flush(aio_req->prealloc_cf); + kfree(aio_req); +} + +static void ceph_aio_complete_req(struct ceph_osd_request *req, + struct ceph_msg *msg) +{ + int rc = req->r_result; + struct inode *inode = req->r_inode; + struct ceph_aio_request *aio_req = req->r_priv; + struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); + int num_pages = calc_pages_for((u64)osd_data->alignment, + osd_data->length); + + dout("ceph_aio_complete_req %p rc %d bytes %llu\n", + inode, rc, osd_data->length); + + if (rc == -EOLDSNAPC) { + struct ceph_aio_work *aio_work; + BUG_ON(!aio_req->write); + + aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS); + if (aio_work) { + INIT_WORK(&aio_work->work, ceph_aio_retry_work); + aio_work->req = req; + queue_work(ceph_inode_to_client(inode)->wb_wq, + &aio_work->work); + return; + } + rc = -ENOMEM; + } else if (!aio_req->write) { + if (rc == -ENOENT) + rc = 0; + if (rc >= 0 && osd_data->length > rc) { + int zoff = osd_data->alignment + rc; + int zlen = osd_data->length - rc; + /* + * If read is satisfied by single OSD request, + * it can pass EOF. Otherwise read is within + * i_size. + */ + if (aio_req->num_reqs == 1) { + loff_t i_size = i_size_read(inode); + loff_t endoff = aio_req->iocb->ki_pos + rc; + if (endoff < i_size) + zlen = min_t(size_t, zlen, + i_size - endoff); + aio_req->total_len = rc + zlen; + } + + if (zlen > 0) + ceph_zero_page_vector_range(zoff, zlen, + osd_data->pages); + } + } + + ceph_put_page_vector(osd_data->pages, num_pages, false); + ceph_osdc_put_request(req); + + if (rc < 0) + cmpxchg(&aio_req->error, 0, rc); + + ceph_aio_complete(inode, aio_req); + return; +} + +static void ceph_aio_retry_work(struct work_struct *work) +{ + struct ceph_aio_work *aio_work = + container_of(work, struct ceph_aio_work, work); + struct ceph_osd_request *orig_req = aio_work->req; + struct ceph_aio_request *aio_req = orig_req->r_priv; + struct inode *inode = orig_req->r_inode; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_snap_context *snapc; + struct ceph_osd_request *req; + int ret; + + spin_lock(&ci->i_ceph_lock); + if (__ceph_have_pending_cap_snap(ci)) { + struct ceph_cap_snap *capsnap = + list_last_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, + ci_item); + snapc = ceph_get_snap_context(capsnap->context); + } else { + BUG_ON(!ci->i_head_snapc); + snapc = ceph_get_snap_context(ci->i_head_snapc); + } + spin_unlock(&ci->i_ceph_lock); + + req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2, + false, GFP_NOFS); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + req = orig_req; + goto out; + } + + req->r_flags = CEPH_OSD_FLAG_ORDERSNAP | + CEPH_OSD_FLAG_ONDISK | + CEPH_OSD_FLAG_WRITE; + req->r_base_oloc = orig_req->r_base_oloc; + req->r_base_oid = orig_req->r_base_oid; + + req->r_ops[0] = orig_req->r_ops[0]; + osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); + + ceph_osdc_build_request(req, req->r_ops[0].extent.offset, + snapc, CEPH_NOSNAP, &aio_req->mtime); + + ceph_put_snap_context(snapc); + ceph_osdc_put_request(orig_req); + + req->r_callback = ceph_aio_complete_req; + req->r_inode = inode; + req->r_priv = aio_req; + + ret = ceph_osdc_start_request(req->r_osdc, req, false); +out: + if (ret < 0) { + BUG_ON(ret == -EOLDSNAPC); + req->r_result = ret; + ceph_aio_complete_req(req, NULL); + } + + kfree(aio_work); +} + /* * Write commit request unsafe callback, called to tell us when a * request is unsafe (that is, in flight--has been handed to the @@ -612,16 +767,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) } -/* - * Synchronous write, straight from __user pointer or user pages. - * - * If write spans object boundary, just do multiple writes. (For a - * correct atomic write, we should e.g. take write locks on all - * objects, rollback on failure, etc.) - */ static ssize_t -ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, - struct ceph_snap_context *snapc) +ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, + struct ceph_snap_context *snapc, + struct ceph_cap_flush **pcf) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); @@ -630,44 +779,52 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, struct ceph_vino vino; struct ceph_osd_request *req; struct page **pages; - int num_pages; - int written = 0; + struct ceph_aio_request *aio_req = NULL; + int num_pages = 0; int flags; - int check_caps = 0; int ret; struct timespec mtime = CURRENT_TIME; - size_t count = iov_iter_count(from); + size_t count = iov_iter_count(iter); + loff_t pos = iocb->ki_pos; + bool write = iov_iter_rw(iter) == WRITE; - if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) + if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; - dout("sync_direct_write on file %p %lld~%u\n", file, pos, - (unsigned)count); + dout("sync_direct_read_write (%s) on file %p %lld~%u\n", + (write ? "write" : "read"), file, pos, (unsigned)count); ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); if (ret < 0) return ret; - ret = invalidate_inode_pages2_range(inode->i_mapping, - pos >> PAGE_CACHE_SHIFT, - (pos + count) >> PAGE_CACHE_SHIFT); - if (ret < 0) - dout("invalidate_inode_pages2_range returned %d\n", ret); + if (write) { + ret = invalidate_inode_pages2_range(inode->i_mapping, + pos >> PAGE_CACHE_SHIFT, + (pos + count) >> PAGE_CACHE_SHIFT); + if (ret < 0) + dout("invalidate_inode_pages2_range returned %d\n", ret); - flags = CEPH_OSD_FLAG_ORDERSNAP | - CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE; + flags = CEPH_OSD_FLAG_ORDERSNAP | + CEPH_OSD_FLAG_ONDISK | + CEPH_OSD_FLAG_WRITE; + } else { + flags = CEPH_OSD_FLAG_READ; + } - while (iov_iter_count(from) > 0) { - u64 len = dio_get_pagev_size(from); - size_t start; - ssize_t n; + while (iov_iter_count(iter) > 0) { + u64 size = dio_get_pagev_size(iter); + size_t start = 0; + ssize_t len; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, 0, - 2,/*include a 'startsync' command*/ - CEPH_OSD_OP_WRITE, flags, snapc, + vino, pos, &size, 0, + /*include a 'startsync' command*/ + write ? 2 : 1, + write ? CEPH_OSD_OP_WRITE : + CEPH_OSD_OP_READ, + flags, snapc, ci->i_truncate_seq, ci->i_truncate_size, false); @@ -676,10 +833,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, break; } - osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); - - n = len; - pages = dio_get_pages_alloc(from, len, &start, &num_pages); + len = size; + pages = dio_get_pages_alloc(iter, len, &start, &num_pages); if (IS_ERR(pages)) { ceph_osdc_put_request(req); ret = PTR_ERR(pages); @@ -687,47 +842,128 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, } /* - * throw out any page cache pages in this range. this - * may block. + * To simplify error handling, allow AIO when IO within i_size + * or IO can be satisfied by single OSD request. */ - truncate_inode_pages_range(inode->i_mapping, pos, - (pos+n) | (PAGE_CACHE_SIZE-1)); - osd_req_op_extent_osd_data_pages(req, 0, pages, n, start, - false, false); + if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) && + (len == count || pos + count <= i_size_read(inode))) { + aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL); + if (aio_req) { + aio_req->iocb = iocb; + aio_req->write = write; + INIT_LIST_HEAD(&aio_req->osd_reqs); + if (write) { + aio_req->mtime = mtime; + swap(aio_req->prealloc_cf, *pcf); + } + } + /* ignore error */ + } + + if (write) { + /* + * throw out any page cache pages in this range. this + * may block. + */ + truncate_inode_pages_range(inode->i_mapping, pos, + (pos+len) | (PAGE_CACHE_SIZE - 1)); + + osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); + } + + + osd_req_op_extent_osd_data_pages(req, 0, pages, len, start, + false, false); - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); - ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (aio_req) { + aio_req->total_len += len; + aio_req->num_reqs++; + atomic_inc(&aio_req->pending_reqs); + + req->r_callback = ceph_aio_complete_req; + req->r_inode = inode; + req->r_priv = aio_req; + list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); + + pos += len; + iov_iter_advance(iter, len); + continue; + } + + ret = ceph_osdc_start_request(req->r_osdc, req, false); if (!ret) ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + size = i_size_read(inode); + if (!write) { + if (ret == -ENOENT) + ret = 0; + if (ret >= 0 && ret < len && pos + ret < size) { + int zlen = min_t(size_t, len - ret, + size - pos - ret); + ceph_zero_page_vector_range(start + ret, zlen, + pages); + ret += zlen; + } + if (ret >= 0) + len = ret; + } + ceph_put_page_vector(pages, num_pages, false); ceph_osdc_put_request(req); - if (ret) + if (ret < 0) break; - pos += n; - written += n; - iov_iter_advance(from, n); - if (pos > i_size_read(inode)) { - check_caps = ceph_inode_set_size(inode, pos); - if (check_caps) + pos += len; + iov_iter_advance(iter, len); + + if (!write && pos >= size) + break; + + if (write && pos > size) { + if (ceph_inode_set_size(inode, pos)) ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL); } } - if (ret != -EOLDSNAPC && written > 0) { + if (aio_req) { + if (aio_req->num_reqs == 0) { + kfree(aio_req); + return ret; + } + + ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR : + CEPH_CAP_FILE_RD); + + while (!list_empty(&aio_req->osd_reqs)) { + req = list_first_entry(&aio_req->osd_reqs, + struct ceph_osd_request, + r_unsafe_item); + list_del_init(&req->r_unsafe_item); + if (ret >= 0) + ret = ceph_osdc_start_request(req->r_osdc, + req, false); + if (ret < 0) { + BUG_ON(ret == -EOLDSNAPC); + req->r_result = ret; + ceph_aio_complete_req(req, NULL); + } + } + return -EIOCBQUEUED; + } + + if (ret != -EOLDSNAPC && pos > iocb->ki_pos) { + ret = pos - iocb->ki_pos; iocb->ki_pos = pos; - ret = written; } return ret; } - /* * Synchronous write, straight from __user pointer or user pages. * @@ -897,8 +1133,14 @@ again: ceph_cap_string(got)); if (ci->i_inline_version == CEPH_INLINE_NONE) { - /* hmm, this isn't really async... */ - ret = ceph_sync_read(iocb, to, &retry_op); + if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) { + ret = ceph_direct_read_write(iocb, to, + NULL, NULL); + if (ret >= 0 && ret < len) + retry_op = CHECK_EOF; + } else { + ret = ceph_sync_read(iocb, to, &retry_op); + } } else { retry_op = READ_INLINE; } @@ -916,7 +1158,7 @@ again: pinned_page = NULL; } ceph_put_cap_refs(ci, got); - if (retry_op && ret >= 0) { + if (retry_op > HAVE_RETRIED && ret >= 0) { int statret; struct page *page = NULL; loff_t i_size; @@ -968,12 +1210,11 @@ again: if (retry_op == CHECK_EOF && iocb->ki_pos < i_size && ret < len) { dout("sync_read hit hole, ppos %lld < size %lld" - ", reading more\n", iocb->ki_pos, - inode->i_size); + ", reading more\n", iocb->ki_pos, i_size); read += ret; len -= ret; - retry_op = 0; + retry_op = HAVE_RETRIED; goto again; } } @@ -1052,7 +1293,7 @@ retry_snap: } dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n", - inode, ceph_vinop(inode), pos, count, inode->i_size); + inode, ceph_vinop(inode), pos, count, i_size_read(inode)); if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; else @@ -1088,8 +1329,8 @@ retry_snap: /* we might need to revert back to that point */ data = *from; if (iocb->ki_flags & IOCB_DIRECT) - written = ceph_sync_direct_write(iocb, &data, pos, - snapc); + written = ceph_direct_read_write(iocb, &data, snapc, + &prealloc_cf); else written = ceph_sync_write(iocb, &data, pos, snapc); if (written == -EOLDSNAPC) { @@ -1104,7 +1345,7 @@ retry_snap: iov_iter_advance(from, written); ceph_put_snap_context(snapc); } else { - loff_t old_size = inode->i_size; + loff_t old_size = i_size_read(inode); /* * No need to acquire the i_truncate_mutex. Because * the MDS revokes Fwb caps before sending truncate @@ -1115,7 +1356,7 @@ retry_snap: written = generic_perform_write(file, from, pos); if (likely(written >= 0)) iocb->ki_pos = pos + written; - if (inode->i_size > old_size) + if (i_size_read(inode) > old_size) ceph_fscache_update_objectsize(inode); inode_unlock(inode); } @@ -1160,6 +1401,7 @@ out_unlocked: static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) { struct inode *inode = file->f_mapping->host; + loff_t i_size; int ret; inode_lock(inode); @@ -1172,9 +1414,10 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) } } + i_size = i_size_read(inode); switch (whence) { case SEEK_END: - offset += inode->i_size; + offset += i_size; break; case SEEK_CUR: /* @@ -1190,17 +1433,17 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) offset += file->f_pos; break; case SEEK_DATA: - if (offset >= inode->i_size) { + if (offset >= i_size) { ret = -ENXIO; goto out; } break; case SEEK_HOLE: - if (offset >= inode->i_size) { + if (offset >= i_size) { ret = -ENXIO; goto out; } - offset = inode->i_size; + offset = i_size; break; } diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index da55eb8..fb4ba2e 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -548,7 +548,7 @@ int ceph_fill_file_size(struct inode *inode, int issued, if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 || (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) { dout("size %lld -> %llu\n", inode->i_size, size); - inode->i_size = size; + i_size_write(inode, size); inode->i_blocks = (size + (1<<9) - 1) >> 9; ci->i_reported_size = size; if (truncate_seq != ci->i_truncate_seq) { @@ -808,7 +808,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, spin_unlock(&ci->i_ceph_lock); err = -EINVAL; - if (WARN_ON(symlen != inode->i_size)) + if (WARN_ON(symlen != i_size_read(inode))) goto out; err = -ENOMEM; @@ -1549,7 +1549,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size) spin_lock(&ci->i_ceph_lock); dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size); - inode->i_size = size; + i_size_write(inode, size); inode->i_blocks = (size + (1 << 9) - 1) >> 9; /* tell the MDS if we are approaching max_size */ @@ -1911,7 +1911,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) inode->i_size, attr->ia_size); if ((issued & CEPH_CAP_FILE_EXCL) && attr->ia_size > inode->i_size) { - inode->i_size = attr->ia_size; + i_size_write(inode, attr->ia_size); inode->i_blocks = (attr->ia_size + (1 << 9) - 1) >> 9; inode->i_ctime = attr->ia_ctime; diff --git a/include/linux/ceph/ceph_frag.h b/include/linux/ceph/ceph_frag.h index 5babb8e..b827e06 100644 --- a/include/linux/ceph/ceph_frag.h +++ b/include/linux/ceph/ceph_frag.h @@ -40,46 +40,11 @@ static inline __u32 ceph_frag_mask_shift(__u32 f) return 24 - ceph_frag_bits(f); } -static inline int ceph_frag_contains_value(__u32 f, __u32 v) +static inline bool ceph_frag_contains_value(__u32 f, __u32 v) { return (v & ceph_frag_mask(f)) == ceph_frag_value(f); } -static inline int ceph_frag_contains_frag(__u32 f, __u32 sub) -{ - /* is sub as specific as us, and contained by us? */ - return ceph_frag_bits(sub) >= ceph_frag_bits(f) && - (ceph_frag_value(sub) & ceph_frag_mask(f)) == ceph_frag_value(f); -} -static inline __u32 ceph_frag_parent(__u32 f) -{ - return ceph_frag_make(ceph_frag_bits(f) - 1, - ceph_frag_value(f) & (ceph_frag_mask(f) << 1)); -} -static inline int ceph_frag_is_left_child(__u32 f) -{ - return ceph_frag_bits(f) > 0 && - (ceph_frag_value(f) & (0x1000000 >> ceph_frag_bits(f))) == 0; -} -static inline int ceph_frag_is_right_child(__u32 f) -{ - return ceph_frag_bits(f) > 0 && - (ceph_frag_value(f) & (0x1000000 >> ceph_frag_bits(f))) == 1; -} -static inline __u32 ceph_frag_sibling(__u32 f) -{ - return ceph_frag_make(ceph_frag_bits(f), - ceph_frag_value(f) ^ (0x1000000 >> ceph_frag_bits(f))); -} -static inline __u32 ceph_frag_left_child(__u32 f) -{ - return ceph_frag_make(ceph_frag_bits(f)+1, ceph_frag_value(f)); -} -static inline __u32 ceph_frag_right_child(__u32 f) -{ - return ceph_frag_make(ceph_frag_bits(f)+1, - ceph_frag_value(f) | (0x1000000 >> (1+ceph_frag_bits(f)))); -} static inline __u32 ceph_frag_make_child(__u32 f, int by, int i) { int newbits = ceph_frag_bits(f) + by; diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 71b1d6c..8dbd787 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -220,6 +220,7 @@ struct ceph_connection { struct ceph_entity_addr actual_peer_addr; /* message out temps */ + struct ceph_msg_header out_hdr; struct ceph_msg *out_msg; /* sending message (== tail of out_sent) */ bool out_msg_done; @@ -229,7 +230,6 @@ struct ceph_connection { int out_kvec_left; /* kvec's left in out_kvec */ int out_skip; /* skip this many bytes */ int out_kvec_bytes; /* total bytes left */ - bool out_kvec_is_msg; /* kvec refers to out_msg */ int out_more; /* there is more data after the kvecs */ __le64 out_temp_ack; /* for writing an ack */ struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2 diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c index 10d87753..9e43a31 100644 --- a/net/ceph/auth_x.c +++ b/net/ceph/auth_x.c @@ -152,7 +152,6 @@ static int process_one_ticket(struct ceph_auth_client *ac, void *ticket_buf = NULL; void *tp, *tpend; void **ptp; - struct ceph_timespec new_validity; struct ceph_crypto_key new_session_key; struct ceph_buffer *new_ticket_blob; unsigned long new_expires, new_renew_after; @@ -193,8 +192,8 @@ static int process_one_ticket(struct ceph_auth_client *ac, if (ret) goto out; - ceph_decode_copy(&dp, &new_validity, sizeof(new_validity)); - ceph_decode_timespec(&validity, &new_validity); + ceph_decode_timespec(&validity, dp); + dp += sizeof(struct ceph_timespec); new_expires = get_seconds() + validity.tv_sec; new_renew_after = new_expires - (validity.tv_sec / 4); dout(" expires=%lu renew_after=%lu\n", new_expires, @@ -233,10 +232,10 @@ static int process_one_ticket(struct ceph_auth_client *ac, ceph_buffer_put(th->ticket_blob); th->session_key = new_session_key; th->ticket_blob = new_ticket_blob; - th->validity = new_validity; th->secret_id = new_secret_id; th->expires = new_expires; th->renew_after = new_renew_after; + th->have_key = true; dout(" got ticket service %d (%s) secret_id %lld len %d\n", type, ceph_entity_type_name(type), th->secret_id, (int)th->ticket_blob->vec.iov_len); @@ -384,6 +383,24 @@ bad: return -ERANGE; } +static bool need_key(struct ceph_x_ticket_handler *th) +{ + if (!th->have_key) + return true; + + return get_seconds() >= th->renew_after; +} + +static bool have_key(struct ceph_x_ticket_handler *th) +{ + if (th->have_key) { + if (get_seconds() >= th->expires) + th->have_key = false; + } + + return th->have_key; +} + static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed) { int want = ac->want_keys; @@ -402,20 +419,18 @@ static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed) continue; th = get_ticket_handler(ac, service); - if (IS_ERR(th)) { *pneed |= service; continue; } - if (get_seconds() >= th->renew_after) + if (need_key(th)) *pneed |= service; - if (get_seconds() >= th->expires) + if (!have_key(th)) xi->have_keys &= ~service; } } - static int ceph_x_build_request(struct ceph_auth_client *ac, void *buf, void *end) { @@ -667,14 +682,26 @@ static void ceph_x_destroy(struct ceph_auth_client *ac) ac->private = NULL; } -static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac, - int peer_type) +static void invalidate_ticket(struct ceph_auth_client *ac, int peer_type) { struct ceph_x_ticket_handler *th; th = get_ticket_handler(ac, peer_type); if (!IS_ERR(th)) - memset(&th->validity, 0, sizeof(th->validity)); + th->have_key = false; +} + +static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac, + int peer_type) +{ + /* + * We are to invalidate a service ticket in the hopes of + * getting a new, hopefully more valid, one. But, we won't get + * it unless our AUTH ticket is good, so invalidate AUTH ticket + * as well, just in case. + */ + invalidate_ticket(ac, peer_type); + invalidate_ticket(ac, CEPH_ENTITY_TYPE_AUTH); } static int calcu_signature(struct ceph_x_authorizer *au, diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h index e8b7c69..40b1a3c 100644 --- a/net/ceph/auth_x.h +++ b/net/ceph/auth_x.h @@ -16,7 +16,7 @@ struct ceph_x_ticket_handler { unsigned int service; struct ceph_crypto_key session_key; - struct ceph_timespec validity; + bool have_key; u64 secret_id; struct ceph_buffer *ticket_blob; diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 9981039..9cfedf5 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -23,9 +23,6 @@ #include <linux/ceph/pagelist.h> #include <linux/export.h> -#define list_entry_next(pos, member) \ - list_entry(pos->member.next, typeof(*pos), member) - /* * Ceph uses the messenger to exchange ceph_msg messages with other * hosts in the system. The messenger provides ordered and reliable @@ -672,6 +669,8 @@ static void reset_connection(struct ceph_connection *con) } con->in_seq = 0; con->in_seq_acked = 0; + + con->out_skip = 0; } /* @@ -771,6 +770,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) static void con_out_kvec_reset(struct ceph_connection *con) { + BUG_ON(con->out_skip); + con->out_kvec_left = 0; con->out_kvec_bytes = 0; con->out_kvec_cur = &con->out_kvec[0]; @@ -779,9 +780,9 @@ static void con_out_kvec_reset(struct ceph_connection *con) static void con_out_kvec_add(struct ceph_connection *con, size_t size, void *data) { - int index; + int index = con->out_kvec_left; - index = con->out_kvec_left; + BUG_ON(con->out_skip); BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); con->out_kvec[index].iov_len = size; @@ -790,6 +791,27 @@ static void con_out_kvec_add(struct ceph_connection *con, con->out_kvec_bytes += size; } +/* + * Chop off a kvec from the end. Return residual number of bytes for + * that kvec, i.e. how many bytes would have been written if the kvec + * hadn't been nuked. + */ +static int con_out_kvec_skip(struct ceph_connection *con) +{ + int off = con->out_kvec_cur - con->out_kvec; + int skip = 0; + + if (con->out_kvec_bytes > 0) { + skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; + BUG_ON(con->out_kvec_bytes < skip); + BUG_ON(!con->out_kvec_left); + con->out_kvec_bytes -= skip; + con->out_kvec_left--; + } + + return skip; +} + #ifdef CONFIG_BLOCK /* @@ -1042,7 +1064,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor, /* Move on to the next page */ BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); - cursor->page = list_entry_next(cursor->page, lru); + cursor->page = list_next_entry(cursor->page, lru); cursor->last_piece = cursor->resid <= PAGE_SIZE; return true; @@ -1166,7 +1188,7 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, if (!cursor->resid && cursor->total_resid) { WARN_ON(!cursor->last_piece); BUG_ON(list_is_last(&cursor->data->links, cursor->data_head)); - cursor->data = list_entry_next(cursor->data, links); + cursor->data = list_next_entry(cursor->data, links); __ceph_msg_data_cursor_init(cursor); new_piece = true; } @@ -1197,7 +1219,6 @@ static void prepare_write_message_footer(struct ceph_connection *con) m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; dout("prepare_write_message_footer %p\n", con); - con->out_kvec_is_msg = true; con->out_kvec[v].iov_base = &m->footer; if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { if (con->ops->sign_message) @@ -1225,7 +1246,6 @@ static void prepare_write_message(struct ceph_connection *con) u32 crc; con_out_kvec_reset(con); - con->out_kvec_is_msg = true; con->out_msg_done = false; /* Sneak an ack in there first? If we can get it into the same @@ -1265,18 +1285,19 @@ static void prepare_write_message(struct ceph_connection *con) /* tag + hdr + front + middle */ con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); - con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); + con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); if (m->middle) con_out_kvec_add(con, m->middle->vec.iov_len, m->middle->vec.iov_base); - /* fill in crc (except data pages), footer */ + /* fill in hdr crc and finalize hdr */ crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); con->out_msg->hdr.crc = cpu_to_le32(crc); - con->out_msg->footer.flags = 0; + memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); + /* fill in front and middle crc, footer */ crc = crc32c(0, m->front.iov_base, m->front.iov_len); con->out_msg->footer.front_crc = cpu_to_le32(crc); if (m->middle) { @@ -1288,6 +1309,7 @@ static void prepare_write_message(struct ceph_connection *con) dout("%s front_crc %u middle_crc %u\n", __func__, le32_to_cpu(con->out_msg->footer.front_crc), le32_to_cpu(con->out_msg->footer.middle_crc)); + con->out_msg->footer.flags = 0; /* is there a data payload? */ con->out_msg->footer.data_crc = 0; @@ -1492,7 +1514,6 @@ static int write_partial_kvec(struct ceph_connection *con) } } con->out_kvec_left = 0; - con->out_kvec_is_msg = false; ret = 1; out: dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, @@ -1584,6 +1605,7 @@ static int write_partial_skip(struct ceph_connection *con) { int ret; + dout("%s %p %d left\n", __func__, con, con->out_skip); while (con->out_skip > 0) { size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); @@ -2506,13 +2528,13 @@ more: more_kvec: /* kvec data queued? */ - if (con->out_skip) { - ret = write_partial_skip(con); + if (con->out_kvec_left) { + ret = write_partial_kvec(con); if (ret <= 0) goto out; } - if (con->out_kvec_left) { - ret = write_partial_kvec(con); + if (con->out_skip) { + ret = write_partial_skip(con); if (ret <= 0) goto out; } @@ -2805,13 +2827,17 @@ static bool con_backoff(struct ceph_connection *con) static void con_fault_finish(struct ceph_connection *con) { + dout("%s %p\n", __func__, con); + /* * in case we faulted due to authentication, invalidate our * current tickets so that we can get new ones. */ - if (con->auth_retry && con->ops->invalidate_authorizer) { - dout("calling invalidate_authorizer()\n"); - con->ops->invalidate_authorizer(con); + if (con->auth_retry) { + dout("auth_retry %d, invalidating\n", con->auth_retry); + if (con->ops->invalidate_authorizer) + con->ops->invalidate_authorizer(con); + con->auth_retry = 0; } if (con->ops->fault) @@ -3050,16 +3076,31 @@ void ceph_msg_revoke(struct ceph_msg *msg) ceph_msg_put(msg); } if (con->out_msg == msg) { - dout("%s %p msg %p - was sending\n", __func__, con, msg); - con->out_msg = NULL; - if (con->out_kvec_is_msg) { - con->out_skip = con->out_kvec_bytes; - con->out_kvec_is_msg = false; + BUG_ON(con->out_skip); + /* footer */ + if (con->out_msg_done) { + con->out_skip += con_out_kvec_skip(con); + } else { + BUG_ON(!msg->data_length); + if (con->peer_features & CEPH_FEATURE_MSG_AUTH) + con->out_skip += sizeof(msg->footer); + else + con->out_skip += sizeof(msg->old_footer); } + /* data, middle, front */ + if (msg->data_length) + con->out_skip += msg->cursor.total_resid; + if (msg->middle) + con->out_skip += con_out_kvec_skip(con); + con->out_skip += con_out_kvec_skip(con); + + dout("%s %p msg %p - was sending, will write %d skip %d\n", + __func__, con, msg, con->out_kvec_bytes, con->out_skip); msg->hdr.seq = 0; - + con->out_msg = NULL; ceph_msg_put(msg); } + mutex_unlock(&con->mutex); } @@ -3361,9 +3402,7 @@ static void ceph_msg_free(struct ceph_msg *m) static void ceph_msg_release(struct kref *kref) { struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); - LIST_HEAD(data); - struct list_head *links; - struct list_head *next; + struct ceph_msg_data *data, *next; dout("%s %p\n", __func__, m); WARN_ON(!list_empty(&m->list_head)); @@ -3376,12 +3415,8 @@ static void ceph_msg_release(struct kref *kref) m->middle = NULL; } - list_splice_init(&m->data, &data); - list_for_each_safe(links, next, &data) { - struct ceph_msg_data *data; - - data = list_entry(links, struct ceph_msg_data, links); - list_del_init(links); + list_for_each_entry_safe(data, next, &m->data, links) { + list_del_init(&data->links); ceph_msg_data_destroy(data); } m->data_length = 0; diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index edda016..de85ddd 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -364,10 +364,6 @@ static bool have_debugfs_info(struct ceph_mon_client *monc) return monc->client->have_fsid && monc->auth->global_id > 0; } -/* - * The monitor responds with mount ack indicate mount success. The - * included client ticket allows the client to talk to MDSs and OSDs. - */ static void ceph_monc_handle_map(struct ceph_mon_client *monc, struct ceph_msg *msg) { |