diff options
38 files changed, 2261 insertions, 679 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 0a30647..501adc2 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -18,6 +18,28 @@ Removal of a device: $ echo <dev-id> > /sys/bus/rbd/remove +What: /sys/bus/rbd/add_single_major +Date: December 2013 +KernelVersion: 3.14 +Contact: Sage Weil <sage@inktank.com> +Description: Available only if rbd module is inserted with single_major + parameter set to true. + Usage is the same as for /sys/bus/rbd/add. If present, + should be used instead of the latter: any attempts to use + /sys/bus/rbd/add if /sys/bus/rbd/add_single_major is + available will fail for backwards compatibility reasons. + +What: /sys/bus/rbd/remove_single_major +Date: December 2013 +KernelVersion: 3.14 +Contact: Sage Weil <sage@inktank.com> +Description: Available only if rbd module is inserted with single_major + parameter set to true. + Usage is the same as for /sys/bus/rbd/remove. If present, + should be used instead of the latter: any attempts to use + /sys/bus/rbd/remove if /sys/bus/rbd/remove_single_major is + available will fail for backwards compatibility reasons. + Entries under /sys/bus/rbd/devices/<dev-id>/ -------------------------------------------- @@ -33,6 +55,10 @@ major The block device major number. +minor + + The block device minor number. (December 2013, since 3.14.) + name The name of the rbd image. diff --git a/MAINTAINERS b/MAINTAINERS index 2507f38..9bf651c 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -7075,7 +7075,7 @@ F: drivers/media/parport/*-qcam* RADOS BLOCK DEVICE (RBD) M: Yehuda Sadeh <yehuda@inktank.com> M: Sage Weil <sage@inktank.com> -M: Alex Elder <elder@inktank.com> +M: Alex Elder <elder@kernel.org> M: ceph-devel@vger.kernel.org W: http://ceph.com/ T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cb1db29..16cab66 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -41,6 +41,7 @@ #include <linux/fs.h> #include <linux/blkdev.h> #include <linux/slab.h> +#include <linux/idr.h> #include "rbd_types.h" @@ -89,9 +90,9 @@ static int atomic_dec_return_safe(atomic_t *v) } #define RBD_DRV_NAME "rbd" -#define RBD_DRV_NAME_LONG "rbd (rados block device)" -#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ +#define RBD_MINORS_PER_MAJOR 256 +#define RBD_SINGLE_MAJOR_PART_SHIFT 4 #define RBD_SNAP_DEV_NAME_PREFIX "snap_" #define RBD_MAX_SNAP_NAME_LEN \ @@ -323,6 +324,7 @@ struct rbd_device { int dev_id; /* blkdev unique id */ int major; /* blkdev assigned major */ + int minor; struct gendisk *disk; /* blkdev's gendisk and rq */ u32 image_format; /* Either 1 or 2 */ @@ -386,6 +388,17 @@ static struct kmem_cache *rbd_img_request_cache; static struct kmem_cache *rbd_obj_request_cache; static struct kmem_cache *rbd_segment_name_cache; +static int rbd_major; +static DEFINE_IDA(rbd_dev_id_ida); + +/* + * Default to false for now, as single-major requires >= 0.75 version of + * userspace rbd utility. + */ +static bool single_major = false; +module_param(single_major, bool, S_IRUGO); +MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)"); + static int rbd_img_request_submit(struct rbd_img_request *img_request); static void rbd_dev_device_release(struct device *dev); @@ -394,18 +407,52 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count); static ssize_t rbd_remove(struct bus_type *bus, const char *buf, size_t count); +static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf, + size_t count); +static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf, + size_t count); static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping); static void rbd_spec_put(struct rbd_spec *spec); +static int rbd_dev_id_to_minor(int dev_id) +{ + return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT; +} + +static int minor_to_rbd_dev_id(int minor) +{ + return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; +} + static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); +static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major); +static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major); static struct attribute *rbd_bus_attrs[] = { &bus_attr_add.attr, &bus_attr_remove.attr, + &bus_attr_add_single_major.attr, + &bus_attr_remove_single_major.attr, NULL, }; -ATTRIBUTE_GROUPS(rbd_bus); + +static umode_t rbd_bus_is_visible(struct kobject *kobj, + struct attribute *attr, int index) +{ + if (!single_major && + (attr == &bus_attr_add_single_major.attr || + attr == &bus_attr_remove_single_major.attr)) + return 0; + + return attr->mode; +} + +static const struct attribute_group rbd_bus_group = { + .attrs = rbd_bus_attrs, + .is_visible = rbd_bus_is_visible, +}; +__ATTRIBUTE_GROUPS(rbd_bus); static struct bus_type rbd_bus_type = { .name = "rbd", @@ -1041,9 +1088,9 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) name_format = "%s.%012llx"; if (rbd_dev->image_format == 2) name_format = "%s.%016llx"; - ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format, + ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format, rbd_dev->header.object_prefix, segment); - if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) { + if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) { pr_err("error formatting segment name for #%llu (%d)\n", segment, ret); kfree(name); @@ -1761,11 +1808,8 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req->r_callback = rbd_osd_req_callback; osd_req->r_priv = obj_request; - osd_req->r_oid_len = strlen(obj_request->object_name); - rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); - memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); - - osd_req->r_file_layout = rbd_dev->layout; /* struct */ + osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); return osd_req; } @@ -1802,11 +1846,8 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) osd_req->r_callback = rbd_osd_req_callback; osd_req->r_priv = obj_request; - osd_req->r_oid_len = strlen(obj_request->object_name); - rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); - memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); - - osd_req->r_file_layout = rbd_dev->layout; /* struct */ + osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); return osd_req; } @@ -2866,7 +2907,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) * Request sync osd watch/unwatch. The value of "start" determines * whether a watch request is being initiated or torn down. */ -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) +static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; @@ -2941,6 +2982,22 @@ out_cancel: return ret; } +static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) +{ + return __rbd_dev_header_watch_sync(rbd_dev, true); +} + +static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +{ + int ret; + + ret = __rbd_dev_header_watch_sync(rbd_dev, false); + if (ret) { + rbd_warn(rbd_dev, "unable to tear down watch request: %d\n", + ret); + } +} + /* * Synchronous osd object method call. Returns the number of bytes * returned in the outbound buffer, or a negative error code. @@ -3388,14 +3445,18 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) u64 segment_size; /* create gendisk info */ - disk = alloc_disk(RBD_MINORS_PER_MAJOR); + disk = alloc_disk(single_major ? + (1 << RBD_SINGLE_MAJOR_PART_SHIFT) : + RBD_MINORS_PER_MAJOR); if (!disk) return -ENOMEM; snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", rbd_dev->dev_id); disk->major = rbd_dev->major; - disk->first_minor = 0; + disk->first_minor = rbd_dev->minor; + if (single_major) + disk->flags |= GENHD_FL_EXT_DEVT; disk->fops = &rbd_bd_ops; disk->private_data = rbd_dev; @@ -3467,7 +3528,14 @@ static ssize_t rbd_major_show(struct device *dev, return sprintf(buf, "%d\n", rbd_dev->major); return sprintf(buf, "(none)\n"); +} +static ssize_t rbd_minor_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%d\n", rbd_dev->minor); } static ssize_t rbd_client_id_show(struct device *dev, @@ -3589,6 +3657,7 @@ static ssize_t rbd_image_refresh(struct device *dev, static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); +static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); @@ -3602,6 +3671,7 @@ static struct attribute *rbd_attrs[] = { &dev_attr_size.attr, &dev_attr_features.attr, &dev_attr_major.attr, + &dev_attr_minor.attr, &dev_attr_client_id.attr, &dev_attr_pool.attr, &dev_attr_pool_id.attr, @@ -4372,21 +4442,29 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev) device_unregister(&rbd_dev->dev); } -static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0); - /* * Get a unique rbd identifier for the given new rbd_dev, and add - * the rbd_dev to the global list. The minimum rbd id is 1. + * the rbd_dev to the global list. */ -static void rbd_dev_id_get(struct rbd_device *rbd_dev) +static int rbd_dev_id_get(struct rbd_device *rbd_dev) { - rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max); + int new_dev_id; + + new_dev_id = ida_simple_get(&rbd_dev_id_ida, + 0, minor_to_rbd_dev_id(1 << MINORBITS), + GFP_KERNEL); + if (new_dev_id < 0) + return new_dev_id; + + rbd_dev->dev_id = new_dev_id; spin_lock(&rbd_dev_list_lock); list_add_tail(&rbd_dev->node, &rbd_dev_list); spin_unlock(&rbd_dev_list_lock); - dout("rbd_dev %p given dev id %llu\n", rbd_dev, - (unsigned long long) rbd_dev->dev_id); + + dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id); + + return 0; } /* @@ -4395,49 +4473,13 @@ static void rbd_dev_id_get(struct rbd_device *rbd_dev) */ static void rbd_dev_id_put(struct rbd_device *rbd_dev) { - struct list_head *tmp; - int rbd_id = rbd_dev->dev_id; - int max_id; - - rbd_assert(rbd_id > 0); - - dout("rbd_dev %p released dev id %llu\n", rbd_dev, - (unsigned long long) rbd_dev->dev_id); spin_lock(&rbd_dev_list_lock); list_del_init(&rbd_dev->node); - - /* - * If the id being "put" is not the current maximum, there - * is nothing special we need to do. - */ - if (rbd_id != atomic64_read(&rbd_dev_id_max)) { - spin_unlock(&rbd_dev_list_lock); - return; - } - - /* - * We need to update the current maximum id. Search the - * list to find out what it is. We're more likely to find - * the maximum at the end, so search the list backward. - */ - max_id = 0; - list_for_each_prev(tmp, &rbd_dev_list) { - struct rbd_device *rbd_dev; - - rbd_dev = list_entry(tmp, struct rbd_device, node); - if (rbd_dev->dev_id > max_id) - max_id = rbd_dev->dev_id; - } spin_unlock(&rbd_dev_list_lock); - /* - * The max id could have been updated by rbd_dev_id_get(), in - * which case it now accurately reflects the new maximum. - * Be careful not to overwrite the maximum value in that - * case. - */ - atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id); - dout(" max dev id has been reset\n"); + ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); + + dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id); } /* @@ -4860,20 +4902,29 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) { int ret; - /* generate unique id: find highest unique id, add one */ - rbd_dev_id_get(rbd_dev); + /* Get an id and fill in device name. */ + + ret = rbd_dev_id_get(rbd_dev); + if (ret) + return ret; - /* Fill in the device name, now that we have its id. */ BUILD_BUG_ON(DEV_NAME_LEN < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); - /* Get our block major device number. */ + /* Record our major and minor device numbers. */ - ret = register_blkdev(0, rbd_dev->name); - if (ret < 0) - goto err_out_id; - rbd_dev->major = ret; + if (!single_major) { + ret = register_blkdev(0, rbd_dev->name); + if (ret < 0) + goto err_out_id; + + rbd_dev->major = ret; + rbd_dev->minor = 0; + } else { + rbd_dev->major = rbd_major; + rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id); + } /* Set up the blkdev mapping. */ @@ -4905,7 +4956,8 @@ err_out_mapping: err_out_disk: rbd_free_disk(rbd_dev); err_out_blkdev: - unregister_blkdev(rbd_dev->major, rbd_dev->name); + if (!single_major) + unregister_blkdev(rbd_dev->major, rbd_dev->name); err_out_id: rbd_dev_id_put(rbd_dev); rbd_dev_mapping_clear(rbd_dev); @@ -4961,7 +5013,6 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev) static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping) { int ret; - int tmp; /* * Get the id from the image id object. Unless there's an @@ -4980,7 +5031,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping) goto err_out_format; if (mapping) { - ret = rbd_dev_header_watch_sync(rbd_dev, true); + ret = rbd_dev_header_watch_sync(rbd_dev); if (ret) goto out_header_name; } @@ -5007,12 +5058,8 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping) err_out_probe: rbd_dev_unprobe(rbd_dev); err_out_watch: - if (mapping) { - tmp = rbd_dev_header_watch_sync(rbd_dev, false); - if (tmp) - rbd_warn(rbd_dev, "unable to tear down " - "watch request (%d)\n", tmp); - } + if (mapping) + rbd_dev_header_unwatch_sync(rbd_dev); out_header_name: kfree(rbd_dev->header_name); rbd_dev->header_name = NULL; @@ -5026,9 +5073,9 @@ err_out_format: return ret; } -static ssize_t rbd_add(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t do_rbd_add(struct bus_type *bus, + const char *buf, + size_t count) { struct rbd_device *rbd_dev = NULL; struct ceph_options *ceph_opts = NULL; @@ -5090,6 +5137,12 @@ static ssize_t rbd_add(struct bus_type *bus, rc = rbd_dev_device_setup(rbd_dev); if (rc) { + /* + * rbd_dev_header_unwatch_sync() can't be moved into + * rbd_dev_image_release() without refactoring, see + * commit 1f3ef78861ac. + */ + rbd_dev_header_unwatch_sync(rbd_dev); rbd_dev_image_release(rbd_dev); goto err_out_module; } @@ -5110,6 +5163,23 @@ err_out_module: return (ssize_t)rc; } +static ssize_t rbd_add(struct bus_type *bus, + const char *buf, + size_t count) +{ + if (single_major) + return -EINVAL; + + return do_rbd_add(bus, buf, count); +} + +static ssize_t rbd_add_single_major(struct bus_type *bus, + const char *buf, + size_t count) +{ + return do_rbd_add(bus, buf, count); +} + static void rbd_dev_device_release(struct device *dev) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); @@ -5117,8 +5187,8 @@ static void rbd_dev_device_release(struct device *dev) rbd_free_disk(rbd_dev); clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); rbd_dev_mapping_clear(rbd_dev); - unregister_blkdev(rbd_dev->major, rbd_dev->name); - rbd_dev->major = 0; + if (!single_major) + unregister_blkdev(rbd_dev->major, rbd_dev->name); rbd_dev_id_put(rbd_dev); rbd_dev_mapping_clear(rbd_dev); } @@ -5149,9 +5219,9 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) } } -static ssize_t rbd_remove(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t do_rbd_remove(struct bus_type *bus, + const char *buf, + size_t count) { struct rbd_device *rbd_dev = NULL; struct list_head *tmp; @@ -5191,16 +5261,14 @@ static ssize_t rbd_remove(struct bus_type *bus, if (ret < 0 || already) return ret; - ret = rbd_dev_header_watch_sync(rbd_dev, false); - if (ret) - rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret); - + rbd_dev_header_unwatch_sync(rbd_dev); /* * flush remaining watch callbacks - these must be complete * before the osd_client is shutdown */ dout("%s: flushing notifies", __func__); ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); + /* * Don't free anything from rbd_dev->disk until after all * notifies are completely processed. Otherwise @@ -5214,6 +5282,23 @@ static ssize_t rbd_remove(struct bus_type *bus, return count; } +static ssize_t rbd_remove(struct bus_type *bus, + const char *buf, + size_t count) +{ + if (single_major) + return -EINVAL; + + return do_rbd_remove(bus, buf, count); +} + +static ssize_t rbd_remove_single_major(struct bus_type *bus, + const char *buf, + size_t count) +{ + return do_rbd_remove(bus, buf, count); +} + /* * create control files in sysfs * /sys/bus/rbd/... @@ -5259,7 +5344,7 @@ static int rbd_slab_init(void) rbd_assert(!rbd_segment_name_cache); rbd_segment_name_cache = kmem_cache_create("rbd_segment_name", - MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL); + CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL); if (rbd_segment_name_cache) return 0; out_err: @@ -5295,24 +5380,45 @@ static int __init rbd_init(void) if (!libceph_compatible(NULL)) { rbd_warn(NULL, "libceph incompatibility (quitting)"); - return -EINVAL; } + rc = rbd_slab_init(); if (rc) return rc; + + if (single_major) { + rbd_major = register_blkdev(0, RBD_DRV_NAME); + if (rbd_major < 0) { + rc = rbd_major; + goto err_out_slab; + } + } + rc = rbd_sysfs_init(); if (rc) - rbd_slab_exit(); + goto err_out_blkdev; + + if (single_major) + pr_info("loaded (major %d)\n", rbd_major); else - pr_info("loaded " RBD_DRV_NAME_LONG "\n"); + pr_info("loaded\n"); + + return 0; +err_out_blkdev: + if (single_major) + unregister_blkdev(rbd_major, RBD_DRV_NAME); +err_out_slab: + rbd_slab_exit(); return rc; } static void __exit rbd_exit(void) { rbd_sysfs_cleanup(); + if (single_major) + unregister_blkdev(rbd_major, RBD_DRV_NAME); rbd_slab_exit(); } @@ -5322,9 +5428,8 @@ module_exit(rbd_exit); MODULE_AUTHOR("Alex Elder <elder@inktank.com>"); MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); -MODULE_DESCRIPTION("rados block device"); - /* following authorship retained from original osdblk.c */ MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); +MODULE_DESCRIPTION("RADOS Block Device (RBD) driver"); MODULE_LICENSE("GPL"); diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig index ac9a2ef..264e9bf 100644 --- a/fs/ceph/Kconfig +++ b/fs/ceph/Kconfig @@ -25,3 +25,16 @@ config CEPH_FSCACHE caching support for Ceph clients using FS-Cache endif + +config CEPH_FS_POSIX_ACL + bool "Ceph POSIX Access Control Lists" + depends on CEPH_FS + select FS_POSIX_ACL + help + POSIX Access Control Lists (ACLs) support permissions for users and + groups beyond the owner/group/world scheme. + + To learn more about Access Control Lists, visit the POSIX ACLs for + Linux website <http://acl.bestbits.at/>. + + If you don't know what Access Control Lists are, say N diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile index 32e3010..85a4230 100644 --- a/fs/ceph/Makefile +++ b/fs/ceph/Makefile @@ -10,3 +10,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \ debugfs.o ceph-$(CONFIG_CEPH_FSCACHE) += cache.o +ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c new file mode 100644 index 0000000..64fddbc --- /dev/null +++ b/fs/ceph/acl.c @@ -0,0 +1,332 @@ +/* + * linux/fs/ceph/acl.c + * + * Copyright (C) 2013 Guangliang Zhao, <lucienchao@gmail.com> + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 021110-1307, USA. + */ + +#include <linux/ceph/ceph_debug.h> +#include <linux/fs.h> +#include <linux/string.h> +#include <linux/xattr.h> +#include <linux/posix_acl_xattr.h> +#include <linux/posix_acl.h> +#include <linux/sched.h> +#include <linux/slab.h> + +#include "super.h" + +static inline void ceph_set_cached_acl(struct inode *inode, + int type, struct posix_acl *acl) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + + spin_lock(&ci->i_ceph_lock); + if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0)) + set_cached_acl(inode, type, acl); + spin_unlock(&ci->i_ceph_lock); +} + +static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode, + int type) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct posix_acl *acl = ACL_NOT_CACHED; + + spin_lock(&ci->i_ceph_lock); + if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0)) + acl = get_cached_acl(inode, type); + spin_unlock(&ci->i_ceph_lock); + + return acl; +} + +void ceph_forget_all_cached_acls(struct inode *inode) +{ + forget_all_cached_acls(inode); +} + +struct posix_acl *ceph_get_acl(struct inode *inode, int type) +{ + int size; + const char *name; + char *value = NULL; + struct posix_acl *acl; + + if (!IS_POSIXACL(inode)) + return NULL; + + acl = ceph_get_cached_acl(inode, type); + if (acl != ACL_NOT_CACHED) + return acl; + + switch (type) { + case ACL_TYPE_ACCESS: + name = POSIX_ACL_XATTR_ACCESS; + break; + case ACL_TYPE_DEFAULT: + name = POSIX_ACL_XATTR_DEFAULT; + break; + default: + BUG(); + } + + size = __ceph_getxattr(inode, name, "", 0); + if (size > 0) { + value = kzalloc(size, GFP_NOFS); + if (!value) + return ERR_PTR(-ENOMEM); + size = __ceph_getxattr(inode, name, value, size); + } + + if (size > 0) + acl = posix_acl_from_xattr(&init_user_ns, value, size); + else if (size == -ERANGE || size == -ENODATA || size == 0) + acl = NULL; + else + acl = ERR_PTR(-EIO); + + kfree(value); + + if (!IS_ERR(acl)) + ceph_set_cached_acl(inode, type, acl); + + return acl; +} + +static int ceph_set_acl(struct dentry *dentry, struct inode *inode, + struct posix_acl *acl, int type) +{ + int ret = 0, size = 0; + const char *name = NULL; + char *value = NULL; + struct iattr newattrs; + umode_t new_mode = inode->i_mode, old_mode = inode->i_mode; + + if (acl) { + ret = posix_acl_valid(acl); + if (ret < 0) + goto out; + } + + switch (type) { + case ACL_TYPE_ACCESS: + name = POSIX_ACL_XATTR_ACCESS; + if (acl) { + ret = posix_acl_equiv_mode(acl, &new_mode); + if (ret < 0) + goto out; + if (ret == 0) + acl = NULL; + } + break; + case ACL_TYPE_DEFAULT: + if (!S_ISDIR(inode->i_mode)) { + ret = acl ? -EINVAL : 0; + goto out; + } + name = POSIX_ACL_XATTR_DEFAULT; + break; + default: + ret = -EINVAL; + goto out; + } + + if (acl) { + size = posix_acl_xattr_size(acl->a_count); + value = kmalloc(size, GFP_NOFS); + if (!value) { + ret = -ENOMEM; + goto out; + } + + ret = posix_acl_to_xattr(&init_user_ns, acl, value, size); + if (ret < 0) + goto out_free; + } + + if (new_mode != old_mode) { + newattrs.ia_mode = new_mode; + newattrs.ia_valid = ATTR_MODE; + ret = ceph_setattr(dentry, &newattrs); + if (ret) + goto out_free; + } + + if (value) + ret = __ceph_setxattr(dentry, name, value, size, 0); + else + ret = __ceph_removexattr(dentry, name); + + if (ret) { + if (new_mode != old_mode) { + newattrs.ia_mode = old_mode; + newattrs.ia_valid = ATTR_MODE; + ceph_setattr(dentry, &newattrs); + } + goto out_free; + } + + ceph_set_cached_acl(inode, type, acl); + +out_free: + kfree(value); +out: + return ret; +} + +int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir) +{ + struct posix_acl *acl = NULL; + int ret = 0; + + if (!S_ISLNK(inode->i_mode)) { + if (IS_POSIXACL(dir)) { + acl = ceph_get_acl(dir, ACL_TYPE_DEFAULT); + if (IS_ERR(acl)) { + ret = PTR_ERR(acl); + goto out; + } + } + + if (!acl) + inode->i_mode &= ~current_umask(); + } + + if (IS_POSIXACL(dir) && acl) { + if (S_ISDIR(inode->i_mode)) { + ret = ceph_set_acl(dentry, inode, acl, + ACL_TYPE_DEFAULT); + if (ret) + goto out_release; + } + ret = posix_acl_create(&acl, GFP_NOFS, &inode->i_mode); + if (ret < 0) + goto out; + else if (ret > 0) + ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS); + else + cache_no_acl(inode); + } else { + cache_no_acl(inode); + } + +out_release: + posix_acl_release(acl); +out: + return ret; +} + +int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) +{ + struct posix_acl *acl; + int ret = 0; + + if (S_ISLNK(inode->i_mode)) { + ret = -EOPNOTSUPP; + goto out; + } + + if (!IS_POSIXACL(inode)) + goto out; + + acl = ceph_get_acl(inode, ACL_TYPE_ACCESS); + if (IS_ERR_OR_NULL(acl)) { + ret = PTR_ERR(acl); + goto out; + } + + ret = posix_acl_chmod(&acl, GFP_KERNEL, inode->i_mode); + if (ret) + goto out; + ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS); + posix_acl_release(acl); +out: + return ret; +} + +static int ceph_xattr_acl_get(struct dentry *dentry, const char *name, + void *value, size_t size, int type) +{ + struct posix_acl *acl; + int ret = 0; + + if (!IS_POSIXACL(dentry->d_inode)) + return -EOPNOTSUPP; + + acl = ceph_get_acl(dentry->d_inode, type); + if (IS_ERR(acl)) + return PTR_ERR(acl); + if (acl == NULL) + return -ENODATA; + + ret = posix_acl_to_xattr(&init_user_ns, acl, value, size); + posix_acl_release(acl); + + return ret; +} + +static int ceph_xattr_acl_set(struct dentry *dentry, const char *name, + const void *value, size_t size, int flags, int type) +{ + int ret = 0; + struct posix_acl *acl = NULL; + + if (!inode_owner_or_capable(dentry->d_inode)) { + ret = -EPERM; + goto out; + } + + if (!IS_POSIXACL(dentry->d_inode)) { + ret = -EOPNOTSUPP; + goto out; + } + + if (value) { + acl = posix_acl_from_xattr(&init_user_ns, value, size); + if (IS_ERR(acl)) { + ret = PTR_ERR(acl); + goto out; + } + + if (acl) { + ret = posix_acl_valid(acl); + if (ret) + goto out_release; + } + } + + ret = ceph_set_acl(dentry, dentry->d_inode, acl, type); + +out_release: + posix_acl_release(acl); +out: + return ret; +} + +const struct xattr_handler ceph_xattr_acl_default_handler = { + .prefix = POSIX_ACL_XATTR_DEFAULT, + .flags = ACL_TYPE_DEFAULT, + .get = ceph_xattr_acl_get, + .set = ceph_xattr_acl_set, +}; + +const struct xattr_handler ceph_xattr_acl_access_handler = { + .prefix = POSIX_ACL_XATTR_ACCESS, + .flags = ACL_TYPE_ACCESS, + .get = ceph_xattr_acl_get, + .set = ceph_xattr_acl_set, +}; diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index ec3ba43..b53278c 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -209,6 +209,7 @@ static int readpage_nounlock(struct file *filp, struct page *page) err = 0; if (err < 0) { SetPageError(page); + ceph_fscache_readpage_cancel(inode, page); goto out; } else { if (err < PAGE_CACHE_SIZE) { @@ -256,6 +257,8 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) for (i = 0; i < num_pages; i++) { struct page *page = osd_data->pages[i]; + if (rc < 0) + goto unlock; if (bytes < (int)PAGE_CACHE_SIZE) { /* zero (remainder of) page */ int s = bytes < 0 ? 0 : bytes; @@ -266,6 +269,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) flush_dcache_page(page); SetPageUptodate(page); ceph_readpage_to_fscache(inode, page); +unlock: unlock_page(page); page_cache_release(page); bytes -= PAGE_CACHE_SIZE; @@ -1207,6 +1211,41 @@ const struct address_space_operations ceph_aops = { /* * vm ops */ +static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) +{ + struct inode *inode = file_inode(vma->vm_file); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_file_info *fi = vma->vm_file->private_data; + loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT; + int want, got, ret; + + dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n", + inode, ceph_vinop(inode), off, (size_t)PAGE_CACHE_SIZE); + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_CACHE; + while (1) { + got = 0; + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); + if (ret == 0) + break; + if (ret != -ERESTARTSYS) { + WARN_ON(1); + return VM_FAULT_SIGBUS; + } + } + dout("filemap_fault %p %llu~%zd got cap refs on %s\n", + inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got)); + + ret = filemap_fault(vma, vmf); + + dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", + inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); + ceph_put_cap_refs(ci, got); + + return ret; +} /* * Reuse write_begin here for simplicity. @@ -1214,23 +1253,41 @@ const struct address_space_operations ceph_aops = { static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) { struct inode *inode = file_inode(vma->vm_file); - struct page *page = vmf->page; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_file_info *fi = vma->vm_file->private_data; struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct page *page = vmf->page; loff_t off = page_offset(page); - loff_t size, len; - int ret; - - /* Update time before taking page lock */ - file_update_time(vma->vm_file); + loff_t size = i_size_read(inode); + size_t len; + int want, got, ret; - size = i_size_read(inode); if (off + PAGE_CACHE_SIZE <= size) len = PAGE_CACHE_SIZE; else len = size & ~PAGE_CACHE_MASK; - dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode, - off, len, page, page->index); + dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n", + inode, ceph_vinop(inode), off, len, size); + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_BUFFER; + while (1) { + got = 0; + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len); + if (ret == 0) + break; + if (ret != -ERESTARTSYS) { + WARN_ON(1); + return VM_FAULT_SIGBUS; + } + } + dout("page_mkwrite %p %llu~%zd got cap refs on %s\n", + inode, off, len, ceph_cap_string(got)); + + /* Update time before taking page lock */ + file_update_time(vma->vm_file); lock_page(page); @@ -1252,14 +1309,26 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = VM_FAULT_SIGBUS; } out: - dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret); - if (ret != VM_FAULT_LOCKED) + if (ret != VM_FAULT_LOCKED) { unlock_page(page); + } else { + int dirty; + spin_lock(&ci->i_ceph_lock); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + spin_unlock(&ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(inode, dirty); + } + + dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n", + inode, off, len, ceph_cap_string(got), ret); + ceph_put_cap_refs(ci, got); + return ret; } static struct vm_operations_struct ceph_vmops = { - .fault = filemap_fault, + .fault = ceph_filemap_fault, .page_mkwrite = ceph_page_mkwrite, .remap_pages = generic_file_remap_pages, }; diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h index ba94940..da95f61 100644 --- a/fs/ceph/cache.h +++ b/fs/ceph/cache.h @@ -67,6 +67,14 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp) return fscache_maybe_release_page(ci->fscache, page, gfp); } +static inline void ceph_fscache_readpage_cancel(struct inode *inode, + struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + if (fscache_cookie_valid(ci->fscache) && PageFsCache(page)) + __fscache_uncache_page(ci->fscache, page); +} + static inline void ceph_fscache_readpages_cancel(struct inode *inode, struct list_head *pages) { @@ -145,6 +153,11 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp) return 1; } +static inline void ceph_fscache_readpage_cancel(struct inode *inode, + struct page *page) +{ +} + static inline void ceph_fscache_readpages_cancel(struct inode *inode, struct list_head *pages) { diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 3c0a4bd..1754338 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -555,21 +555,34 @@ retry: cap->ci = ci; __insert_cap_node(ci, cap); - /* clear out old exporting info? (i.e. on cap import) */ - if (ci->i_cap_exporting_mds == mds) { - ci->i_cap_exporting_issued = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_mds = -1; - } - /* add to session cap list */ cap->session = session; spin_lock(&session->s_cap_lock); list_add_tail(&cap->session_caps, &session->s_caps); session->s_nr_caps++; spin_unlock(&session->s_cap_lock); - } else if (new_cap) - ceph_put_cap(mdsc, new_cap); + } else { + if (new_cap) + ceph_put_cap(mdsc, new_cap); + + /* + * auth mds of the inode changed. we received the cap export + * message, but still haven't received the cap import message. + * handle_cap_export() updated the new auth MDS' cap. + * + * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing + * a message that was send before the cap import message. So + * don't remove caps. + */ + if (ceph_seq_cmp(seq, cap->seq) <= 0) { + WARN_ON(cap != ci->i_auth_cap); + WARN_ON(cap->cap_id != cap_id); + seq = cap->seq; + mseq = cap->mseq; + issued |= cap->issued; + flags |= CEPH_CAP_FLAG_AUTH; + } + } if (!ci->i_snap_realm) { /* @@ -611,15 +624,9 @@ retry: if (ci->i_auth_cap == NULL || ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) ci->i_auth_cap = cap; - } else if (ci->i_auth_cap == cap) { - ci->i_auth_cap = NULL; - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p to cap_dirty_migrating\n", inode); - list_move(&ci->i_dirty_item, - &mdsc->cap_dirty_migrating); - } - spin_unlock(&mdsc->cap_dirty_lock); + ci->i_cap_exporting_issued = 0; + } else { + WARN_ON(ci->i_auth_cap == cap); } dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", @@ -628,7 +635,7 @@ retry: cap->cap_id = cap_id; cap->issued = issued; cap->implemented |= issued; - if (mseq > cap->mseq) + if (ceph_seq_cmp(mseq, cap->mseq) > 0) cap->mds_wanted = wanted; else cap->mds_wanted |= wanted; @@ -816,7 +823,7 @@ int __ceph_caps_revoking_other(struct ceph_inode_info *ci, for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); - if (cap != ocap && __cap_is_valid(cap) && + if (cap != ocap && (cap->implemented & ~cap->issued & mask)) return 1; } @@ -888,7 +895,19 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci) */ static int __ceph_is_any_caps(struct ceph_inode_info *ci) { - return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0; + return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued; +} + +int ceph_is_any_caps(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + spin_lock(&ci->i_ceph_lock); + ret = __ceph_is_any_caps(ci); + spin_unlock(&ci->i_ceph_lock); + + return ret; } /* @@ -1383,13 +1402,10 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ci->i_snap_realm->cached_context); dout(" inode %p now dirty snapc %p auth cap %p\n", &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); + WARN_ON(!ci->i_auth_cap); BUG_ON(!list_empty(&ci->i_dirty_item)); spin_lock(&mdsc->cap_dirty_lock); - if (ci->i_auth_cap) - list_add(&ci->i_dirty_item, &mdsc->cap_dirty); - else - list_add(&ci->i_dirty_item, - &mdsc->cap_dirty_migrating); + list_add(&ci->i_dirty_item, &mdsc->cap_dirty); spin_unlock(&mdsc->cap_dirty_lock); if (ci->i_flushing_caps == 0) { ihold(inode); @@ -1735,13 +1751,12 @@ ack: /* * Try to flush dirty caps back to the auth mds. */ -static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, - unsigned *flush_tid) +static int try_flush_caps(struct inode *inode, unsigned *flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - int unlock_session = session ? 0 : 1; int flushing = 0; + struct ceph_mds_session *session = NULL; retry: spin_lock(&ci->i_ceph_lock); @@ -1755,13 +1770,14 @@ retry: int want = __ceph_caps_wanted(ci); int delayed; - if (!session) { + if (!session || session != cap->session) { spin_unlock(&ci->i_ceph_lock); + if (session) + mutex_unlock(&session->s_mutex); session = cap->session; mutex_lock(&session->s_mutex); goto retry; } - BUG_ON(session != cap->session); if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; @@ -1780,7 +1796,7 @@ retry: out: spin_unlock(&ci->i_ceph_lock); out_unlocked: - if (session && unlock_session) + if (session) mutex_unlock(&session->s_mutex); return flushing; } @@ -1865,7 +1881,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) return ret; mutex_lock(&inode->i_mutex); - dirty = try_flush_caps(inode, NULL, &flush_tid); + dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); /* @@ -1900,7 +1916,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) dout("write_inode %p wait=%d\n", inode, wait); if (wait) { - dirty = try_flush_caps(inode, NULL, &flush_tid); + dirty = try_flush_caps(inode, &flush_tid); if (dirty) err = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); @@ -2350,11 +2366,11 @@ static void invalidate_aliases(struct inode *inode) d_prune_aliases(inode); /* * For non-directory inode, d_find_alias() only returns - * connected dentry. After calling d_invalidate(), the - * dentry become disconnected. + * hashed dentry. After calling d_invalidate(), the + * dentry becomes unhashed. * * For directory inode, d_find_alias() can return - * disconnected dentry. But directory inode should have + * unhashed dentry. But directory inode should have * one alias at most. */ while ((dn = d_find_alias(inode))) { @@ -2408,6 +2424,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, inode->i_size); + + /* + * auth mds of the inode changed. we received the cap export message, + * but still haven't received the cap import message. handle_cap_export + * updated the new auth MDS' cap. + * + * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message + * that was sent before the cap import message. So don't remove caps. + */ + if (ceph_seq_cmp(seq, cap->seq) <= 0) { + WARN_ON(cap != ci->i_auth_cap); + WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id)); + seq = cap->seq; + newcaps |= cap->issued; + } + /* * If CACHE is being revoked, and we have no dirty buffers, * try to invalidate (once). (If there are dirty buffers, we @@ -2434,6 +2466,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, issued |= implemented | __ceph_caps_dirty(ci); cap->cap_gen = session->s_cap_gen; + cap->seq = seq; __check_cap_issue(ci, cap, newcaps); @@ -2464,6 +2497,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ceph_buffer_put(ci->i_xattrs.blob); ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); ci->i_xattrs.version = version; + ceph_forget_all_cached_acls(inode); } } @@ -2483,6 +2517,10 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, &atime); + + /* file layout may have changed */ + ci->i_layout = grant->layout; + /* max size increase? */ if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); @@ -2511,11 +2549,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, check_caps = 1; } - cap->seq = seq; - - /* file layout may have changed */ - ci->i_layout = grant->layout; - /* revocation, grant, or no-op? */ if (cap->issued & ~newcaps) { int revoking = cap->issued & ~newcaps; @@ -2741,65 +2774,114 @@ static void handle_cap_trunc(struct inode *inode, * caller holds s_mutex */ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, - struct ceph_mds_session *session, - int *open_target_sessions) + struct ceph_mds_cap_peer *ph, + struct ceph_mds_session *session) { struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct ceph_mds_session *tsession = NULL; + struct ceph_cap *cap, *tcap; struct ceph_inode_info *ci = ceph_inode(inode); - int mds = session->s_mds; + u64 t_cap_id; unsigned mseq = le32_to_cpu(ex->migrate_seq); - struct ceph_cap *cap = NULL, *t; - struct rb_node *p; - int remember = 1; + unsigned t_seq, t_mseq; + int target, issued; + int mds = session->s_mds; - dout("handle_cap_export inode %p ci %p mds%d mseq %d\n", - inode, ci, mds, mseq); + if (ph) { + t_cap_id = le64_to_cpu(ph->cap_id); + t_seq = le32_to_cpu(ph->seq); + t_mseq = le32_to_cpu(ph->mseq); + target = le32_to_cpu(ph->mds); + } else { + t_cap_id = t_seq = t_mseq = 0; + target = -1; + } + dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n", + inode, ci, mds, mseq, target); +retry: spin_lock(&ci->i_ceph_lock); + cap = __get_cap_for_mds(ci, mds); + if (!cap) + goto out_unlock; - /* make sure we haven't seen a higher mseq */ - for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { - t = rb_entry(p, struct ceph_cap, ci_node); - if (ceph_seq_cmp(t->mseq, mseq) > 0) { - dout(" higher mseq on cap from mds%d\n", - t->session->s_mds); - remember = 0; - } - if (t->session->s_mds == mds) - cap = t; + if (target < 0) { + __ceph_remove_cap(cap, false); + goto out_unlock; } - if (cap) { - if (remember) { - /* make note */ - ci->i_cap_exporting_mds = mds; - ci->i_cap_exporting_mseq = mseq; - ci->i_cap_exporting_issued = cap->issued; - - /* - * make sure we have open sessions with all possible - * export targets, so that we get the matching IMPORT - */ - *open_target_sessions = 1; + /* + * now we know we haven't received the cap import message yet + * because the exported cap still exist. + */ - /* - * we can't flush dirty caps that we've seen the - * EXPORT but no IMPORT for - */ - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p to cap_dirty_migrating\n", - inode); - list_move(&ci->i_dirty_item, - &mdsc->cap_dirty_migrating); + issued = cap->issued; + WARN_ON(issued != cap->implemented); + + tcap = __get_cap_for_mds(ci, target); + if (tcap) { + /* already have caps from the target */ + if (tcap->cap_id != t_cap_id || + ceph_seq_cmp(tcap->seq, t_seq) < 0) { + dout(" updating import cap %p mds%d\n", tcap, target); + tcap->cap_id = t_cap_id; + tcap->seq = t_seq - 1; + tcap->issue_seq = t_seq - 1; + tcap->mseq = t_mseq; + tcap->issued |= issued; + tcap->implemented |= issued; + if (cap == ci->i_auth_cap) + ci->i_auth_cap = tcap; + if (ci->i_flushing_caps && ci->i_auth_cap == tcap) { + spin_lock(&mdsc->cap_dirty_lock); + list_move_tail(&ci->i_flushing_item, + &tcap->session->s_cap_flushing); + spin_unlock(&mdsc->cap_dirty_lock); } - spin_unlock(&mdsc->cap_dirty_lock); } __ceph_remove_cap(cap, false); + goto out_unlock; } - /* else, we already released it */ + if (tsession) { + int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0; + spin_unlock(&ci->i_ceph_lock); + /* add placeholder for the export tagert */ + ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, + t_seq - 1, t_mseq, (u64)-1, flag, NULL); + goto retry; + } + + spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&session->s_mutex); + + /* open target session */ + tsession = ceph_mdsc_open_export_target_session(mdsc, target); + if (!IS_ERR(tsession)) { + if (mds > target) { + mutex_lock(&session->s_mutex); + mutex_lock_nested(&tsession->s_mutex, + SINGLE_DEPTH_NESTING); + } else { + mutex_lock(&tsession->s_mutex); + mutex_lock_nested(&session->s_mutex, + SINGLE_DEPTH_NESTING); + } + ceph_add_cap_releases(mdsc, tsession); + } else { + WARN_ON(1); + tsession = NULL; + target = -1; + } + goto retry; + +out_unlock: spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&session->s_mutex); + if (tsession) { + mutex_unlock(&tsession->s_mutex); + ceph_put_mds_session(tsession); + } } /* @@ -2810,10 +2892,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, */ static void handle_cap_import(struct ceph_mds_client *mdsc, struct inode *inode, struct ceph_mds_caps *im, + struct ceph_mds_cap_peer *ph, struct ceph_mds_session *session, void *snaptrace, int snaptrace_len) { struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_cap *cap; int mds = session->s_mds; unsigned issued = le32_to_cpu(im->caps); unsigned wanted = le32_to_cpu(im->wanted); @@ -2821,28 +2905,44 @@ static void handle_cap_import(struct ceph_mds_client *mdsc, unsigned mseq = le32_to_cpu(im->migrate_seq); u64 realmino = le64_to_cpu(im->realm); u64 cap_id = le64_to_cpu(im->cap_id); + u64 p_cap_id; + int peer; - if (ci->i_cap_exporting_mds >= 0 && - ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) { - dout("handle_cap_import inode %p ci %p mds%d mseq %d" - " - cleared exporting from mds%d\n", - inode, ci, mds, mseq, - ci->i_cap_exporting_mds); - ci->i_cap_exporting_issued = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_mds = -1; + if (ph) { + p_cap_id = le64_to_cpu(ph->cap_id); + peer = le32_to_cpu(ph->mds); + } else { + p_cap_id = 0; + peer = -1; + } - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p back to cap_dirty\n", inode); - list_move(&ci->i_dirty_item, &mdsc->cap_dirty); + dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", + inode, ci, mds, mseq, peer); + + spin_lock(&ci->i_ceph_lock); + cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL; + if (cap && cap->cap_id == p_cap_id) { + dout(" remove export cap %p mds%d flags %d\n", + cap, peer, ph->flags); + if ((ph->flags & CEPH_CAP_FLAG_AUTH) && + (cap->seq != le32_to_cpu(ph->seq) || + cap->mseq != le32_to_cpu(ph->mseq))) { + pr_err("handle_cap_import: mismatched seq/mseq: " + "ino (%llx.%llx) mds%d seq %d mseq %d " + "importer mds%d has peer seq %d mseq %d\n", + ceph_vinop(inode), peer, cap->seq, + cap->mseq, mds, le32_to_cpu(ph->seq), + le32_to_cpu(ph->mseq)); } - spin_unlock(&mdsc->cap_dirty_lock); - } else { - dout("handle_cap_import inode %p ci %p mds%d mseq %d\n", - inode, ci, mds, mseq); + ci->i_cap_exporting_issued = cap->issued; + __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); } + /* make sure we re-request max_size, if necessary */ + ci->i_wanted_max_size = 0; + ci->i_requested_max_size = 0; + spin_unlock(&ci->i_ceph_lock); + down_write(&mdsc->snap_rwsem); ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, false); @@ -2853,11 +2953,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc, kick_flushing_inode_caps(mdsc, session, inode); up_read(&mdsc->snap_rwsem); - /* make sure we re-request max_size, if necessary */ - spin_lock(&ci->i_ceph_lock); - ci->i_wanted_max_size = 0; /* reset */ - ci->i_requested_max_size = 0; - spin_unlock(&ci->i_ceph_lock); } /* @@ -2875,6 +2970,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_inode_info *ci; struct ceph_cap *cap; struct ceph_mds_caps *h; + struct ceph_mds_cap_peer *peer = NULL; int mds = session->s_mds; int op; u32 seq, mseq; @@ -2885,12 +2981,13 @@ void ceph_handle_caps(struct ceph_mds_session *session, void *snaptrace; size_t snaptrace_len; void *flock; + void *end; u32 flock_len; - int open_target_sessions = 0; dout("handle_caps from mds%d\n", mds); /* decode */ + end = msg->front.iov_base + msg->front.iov_len; tid = le64_to_cpu(msg->hdr.tid); if (msg->front.iov_len < sizeof(*h)) goto bad; @@ -2908,17 +3005,28 @@ void ceph_handle_caps(struct ceph_mds_session *session, snaptrace_len = le32_to_cpu(h->snap_trace_len); if (le16_to_cpu(msg->hdr.version) >= 2) { - void *p, *end; - - p = snaptrace + snaptrace_len; - end = msg->front.iov_base + msg->front.iov_len; + void *p = snaptrace + snaptrace_len; ceph_decode_32_safe(&p, end, flock_len, bad); + if (p + flock_len > end) + goto bad; flock = p; } else { flock = NULL; flock_len = 0; } + if (le16_to_cpu(msg->hdr.version) >= 3) { + if (op == CEPH_CAP_OP_IMPORT) { + void *p = flock + flock_len; + if (p + sizeof(*peer) > end) + goto bad; + peer = p; + } else if (op == CEPH_CAP_OP_EXPORT) { + /* recorded in unused fields */ + peer = (void *)&h->size; + } + } + mutex_lock(&session->s_mutex); session->s_seq++; dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, @@ -2951,11 +3059,11 @@ void ceph_handle_caps(struct ceph_mds_session *session, goto done; case CEPH_CAP_OP_EXPORT: - handle_cap_export(inode, h, session, &open_target_sessions); - goto done; + handle_cap_export(inode, h, peer, session); + goto done_unlocked; case CEPH_CAP_OP_IMPORT: - handle_cap_import(mdsc, inode, h, session, + handle_cap_import(mdsc, inode, h, peer, session, snaptrace, snaptrace_len); } @@ -3007,8 +3115,6 @@ done: done_unlocked: if (inode) iput(inode); - if (open_target_sessions) - ceph_mdsc_open_export_target_sessions(mdsc, session); return; bad: diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 2a0bcae..619616d 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -693,6 +693,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); ceph_mdsc_put_request(req); + + if (!err) + err = ceph_init_acl(dentry, dentry->d_inode, dir); + if (err) d_drop(dentry); return err; @@ -1037,14 +1041,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) valid = 1; } else if (dentry_lease_is_valid(dentry) || dir_lease_is_valid(dir, dentry)) { - valid = 1; + if (dentry->d_inode) + valid = ceph_is_any_caps(dentry->d_inode); + else + valid = 1; } dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); - if (valid) + if (valid) { ceph_dentry_lru_touch(dentry); - else + } else { + ceph_dir_clear_complete(dir); d_drop(dentry); + } iput(dir); return valid; } @@ -1293,6 +1302,7 @@ const struct inode_operations ceph_dir_iops = { .getxattr = ceph_getxattr, .listxattr = ceph_listxattr, .removexattr = ceph_removexattr, + .get_acl = ceph_get_acl, .mknod = ceph_mknod, .symlink = ceph_symlink, .mkdir = ceph_mkdir, diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 3de8982..dfd2ce3 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -408,51 +408,92 @@ more: * * If the read spans object boundary, just do multiple reads. */ -static ssize_t ceph_sync_read(struct file *file, char __user *data, - unsigned len, loff_t *poff, int *checkeof) +static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, + int *checkeof) { + struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct page **pages; - u64 off = *poff; + u64 off = iocb->ki_pos; int num_pages, ret; + size_t len = i->count; - dout("sync_read on file %p %llu~%u %s\n", file, off, len, + dout("sync_read on file %p %llu~%u %s\n", file, off, + (unsigned)len, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); - - if (file->f_flags & O_DIRECT) { - num_pages = calc_pages_for((unsigned long)data, len); - pages = ceph_get_direct_page_vector(data, num_pages, true); - } else { - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); - } - if (IS_ERR(pages)) - return PTR_ERR(pages); - /* * flush any page cache pages in this range. this * will make concurrent normal and sync io slow, * but it will at least behave sensibly when they are * in sequence. */ - ret = filemap_write_and_wait(inode->i_mapping); + ret = filemap_write_and_wait_range(inode->i_mapping, off, + off + len); if (ret < 0) - goto done; + return ret; - ret = striped_read(inode, off, len, pages, num_pages, checkeof, - file->f_flags & O_DIRECT, - (unsigned long)data & ~PAGE_MASK); + if (file->f_flags & O_DIRECT) { + while (iov_iter_count(i)) { + void __user *data = i->iov[0].iov_base + i->iov_offset; + size_t len = i->iov[0].iov_len - i->iov_offset; + + num_pages = calc_pages_for((unsigned long)data, len); + pages = ceph_get_direct_page_vector(data, + num_pages, true); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + ret = striped_read(inode, off, len, + pages, num_pages, checkeof, + 1, (unsigned long)data & ~PAGE_MASK); + ceph_put_page_vector(pages, num_pages, true); + + if (ret <= 0) + break; + off += ret; + iov_iter_advance(i, ret); + if (ret < len) + break; + } + } else { + num_pages = calc_pages_for(off, len); + pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); + if (IS_ERR(pages)) + return PTR_ERR(pages); + ret = striped_read(inode, off, len, pages, + num_pages, checkeof, 0, 0); + if (ret > 0) { + int l, k = 0; + size_t left = len = ret; + + while (left) { + void __user *data = i->iov[0].iov_base + + i->iov_offset; + l = min(i->iov[0].iov_len - i->iov_offset, + left); + + ret = ceph_copy_page_vector_to_user(&pages[k], + data, off, + l); + if (ret > 0) { + iov_iter_advance(i, ret); + left -= ret; + off += ret; + k = calc_pages_for(iocb->ki_pos, + len - left + 1) - 1; + BUG_ON(k >= num_pages && left); + } else + break; + } + } + ceph_release_page_vector(pages, num_pages); + } - if (ret >= 0 && (file->f_flags & O_DIRECT) == 0) - ret = ceph_copy_page_vector_to_user(pages, data, off, ret); - if (ret >= 0) - *poff = off + ret; + if (off > iocb->ki_pos) { + ret = off - iocb->ki_pos; + iocb->ki_pos = off; + } -done: - if (file->f_flags & O_DIRECT) - ceph_put_page_vector(pages, num_pages, true); - else - ceph_release_page_vector(pages, num_pages); dout("sync_read result %d\n", ret); return ret; } @@ -489,83 +530,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) } } + /* - * Synchronous write, straight from __user pointer or user pages (if - * O_DIRECT). + * Synchronous write, straight from __user pointer or user pages. * * If write spans object boundary, just do multiple writes. (For a * correct atomic write, we should e.g. take write locks on all * objects, rollback on failure, etc.) */ -static ssize_t ceph_sync_write(struct file *file, const char __user *data, - size_t left, loff_t pos, loff_t *ppos) +static ssize_t +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, size_t count) { + struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_snap_context *snapc; struct ceph_vino vino; struct ceph_osd_request *req; - int num_ops = 1; struct page **pages; int num_pages; - u64 len; int written = 0; int flags; int check_caps = 0; - int page_align, io_align; - unsigned long buf_align; + int page_align; int ret; struct timespec mtime = CURRENT_TIME; - bool own_pages = false; + loff_t pos = iocb->ki_pos; + struct iov_iter i; if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; - dout("sync_write on file %p %lld~%u %s\n", file, pos, - (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); + dout("sync_direct_write on file %p %lld~%u\n", file, pos, + (unsigned)count); - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); if (ret < 0) return ret; ret = invalidate_inode_pages2_range(inode->i_mapping, pos >> PAGE_CACHE_SHIFT, - (pos + left) >> PAGE_CACHE_SHIFT); + (pos + count) >> PAGE_CACHE_SHIFT); if (ret < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE; - if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) - flags |= CEPH_OSD_FLAG_ACK; - else - num_ops++; /* Also include a 'startsync' command. */ - /* - * we may need to do multiple writes here if we span an object - * boundary. this isn't atomic, unfortunately. :( - */ -more: - io_align = pos & ~PAGE_MASK; - buf_align = (unsigned long)data & ~PAGE_MASK; - len = left; - - snapc = ci->i_snap_realm->cached_context; - vino = ceph_vino(inode); - req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, num_ops, - CEPH_OSD_OP_WRITE, flags, snapc, - ci->i_truncate_seq, ci->i_truncate_size, - false); - if (IS_ERR(req)) - return PTR_ERR(req); + iov_iter_init(&i, iov, nr_segs, count, 0); + + while (iov_iter_count(&i) > 0) { + void __user *data = i.iov->iov_base + i.iov_offset; + u64 len = i.iov->iov_len - i.iov_offset; + + page_align = (unsigned long)data & ~PAGE_MASK; + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, pos, &len, + 2,/*include a 'startsync' command*/ + CEPH_OSD_OP_WRITE, flags, snapc, + ci->i_truncate_seq, + ci->i_truncate_size, + false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } - /* write from beginning of first page, regardless of io alignment */ - page_align = file->f_flags & O_DIRECT ? buf_align : io_align; - num_pages = calc_pages_for(page_align, len); - if (file->f_flags & O_DIRECT) { + num_pages = calc_pages_for(page_align, len); pages = ceph_get_direct_page_vector(data, num_pages, false); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -577,60 +614,175 @@ more: * may block. */ truncate_inode_pages_range(inode->i_mapping, pos, - (pos+len) | (PAGE_CACHE_SIZE-1)); - } else { + (pos+len) | (PAGE_CACHE_SIZE-1)); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, + false, false); + + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); + + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + + ceph_put_page_vector(pages, num_pages, false); + +out: + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + iov_iter_advance(&i, (size_t)len); + + if (pos > i_size_read(inode)) { + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } + } else + break; + } + + if (ret != -EOLDSNAPC && written > 0) { + iocb->ki_pos = pos; + ret = written; + } + return ret; +} + + +/* + * Synchronous write, straight from __user pointer or user pages. + * + * If write spans object boundary, just do multiple writes. (For a + * correct atomic write, we should e.g. take write locks on all + * objects, rollback on failure, etc.) + */ +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, size_t count) +{ + struct file *file = iocb->ki_filp; + struct inode *inode = file_inode(file); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_snap_context *snapc; + struct ceph_vino vino; + struct ceph_osd_request *req; + struct page **pages; + u64 len; + int num_pages; + int written = 0; + int flags; + int check_caps = 0; + int ret; + struct timespec mtime = CURRENT_TIME; + loff_t pos = iocb->ki_pos; + struct iov_iter i; + + if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) + return -EROFS; + + dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count); + + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); + if (ret < 0) + return ret; + + ret = invalidate_inode_pages2_range(inode->i_mapping, + pos >> PAGE_CACHE_SHIFT, + (pos + count) >> PAGE_CACHE_SHIFT); + if (ret < 0) + dout("invalidate_inode_pages2_range returned %d\n", ret); + + flags = CEPH_OSD_FLAG_ORDERSNAP | + CEPH_OSD_FLAG_ONDISK | + CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ACK; + + iov_iter_init(&i, iov, nr_segs, count, 0); + + while ((len = iov_iter_count(&i)) > 0) { + size_t left; + int n; + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, pos, &len, 1, + CEPH_OSD_OP_WRITE, flags, snapc, + ci->i_truncate_seq, + ci->i_truncate_size, + false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } + + /* + * write from beginning of first page, + * regardless of io alignment + */ + num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; + pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out; } - ret = ceph_copy_user_to_page_vector(pages, data, pos, len); + + left = len; + for (n = 0; n < num_pages; n++) { + size_t plen = min_t(size_t, left, PAGE_SIZE); + ret = iov_iter_copy_from_user(pages[n], &i, 0, plen); + if (ret != plen) { + ret = -EFAULT; + break; + } + left -= ret; + iov_iter_advance(&i, ret); + } + if (ret < 0) { ceph_release_page_vector(pages, num_pages); goto out; } - if ((file->f_flags & O_SYNC) == 0) { - /* get a second commit callback */ - req->r_unsafe_callback = ceph_sync_write_unsafe; - req->r_inode = inode; - own_pages = true; - } - } - osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, - false, own_pages); + /* get a second commit callback */ + req->r_unsafe_callback = ceph_sync_write_unsafe; + req->r_inode = inode; - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ - ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, + false, true); - ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); - if (!ret) - ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); - if (file->f_flags & O_DIRECT) - ceph_put_page_vector(pages, num_pages, false); - else if (file->f_flags & O_SYNC) - ceph_release_page_vector(pages, num_pages); + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); out: - ceph_osdc_put_request(req); - if (ret == 0) { - pos += len; - written += len; - left -= len; - data += len; - if (left) - goto more; + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + + if (pos > i_size_read(inode)) { + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } + } else + break; + } + if (ret != -EOLDSNAPC && written > 0) { ret = written; - *ppos = pos; - if (pos > i_size_read(inode)) - check_caps = ceph_inode_set_size(inode, pos); - if (check_caps) - ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, - NULL); - } else if (ret != -EOLDSNAPC && written > 0) { - ret = written; + iocb->ki_pos = pos; } return ret; } @@ -647,55 +799,84 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov, { struct file *filp = iocb->ki_filp; struct ceph_file_info *fi = filp->private_data; - loff_t *ppos = &iocb->ki_pos; - size_t len = iov->iov_len; + size_t len = iocb->ki_nbytes; struct inode *inode = file_inode(filp); struct ceph_inode_info *ci = ceph_inode(inode); - void __user *base = iov->iov_base; ssize_t ret; int want, got = 0; int checkeof = 0, read = 0; - dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", - inode, ceph_vinop(inode), pos, (unsigned)len, inode); again: + dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", + inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode); + if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; else want = CEPH_CAP_FILE_CACHE; ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); if (ret < 0) - goto out; - dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", - inode, ceph_vinop(inode), pos, (unsigned)len, - ceph_cap_string(got)); + return ret; if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (fi->flags & CEPH_F_SYNC)) + (fi->flags & CEPH_F_SYNC)) { + struct iov_iter i; + + dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n", + inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, + ceph_cap_string(got)); + + if (!read) { + ret = generic_segment_checks(iov, &nr_segs, + &len, VERIFY_WRITE); + if (ret) + goto out; + } + + iov_iter_init(&i, iov, nr_segs, len, read); + /* hmm, this isn't really async... */ - ret = ceph_sync_read(filp, base, len, ppos, &checkeof); - else - ret = generic_file_aio_read(iocb, iov, nr_segs, pos); + ret = ceph_sync_read(iocb, &i, &checkeof); + } else { + /* + * We can't modify the content of iov, + * so we only read from beginning. + */ + if (read) { + iocb->ki_pos = pos; + len = iocb->ki_nbytes; + read = 0; + } + dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", + inode, ceph_vinop(inode), pos, (unsigned)len, + ceph_cap_string(got)); + ret = generic_file_aio_read(iocb, iov, nr_segs, pos); + } out: dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); ceph_put_cap_refs(ci, got); if (checkeof && ret >= 0) { - int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); + int statret = ceph_do_getattr(inode, + CEPH_STAT_CAP_SIZE); /* hit EOF or hole? */ - if (statret == 0 && *ppos < inode->i_size) { - dout("aio_read sync_read hit hole, ppos %lld < size %lld, reading more\n", *ppos, inode->i_size); + if (statret == 0 && iocb->ki_pos < inode->i_size && + ret < len) { + dout("sync_read hit hole, ppos %lld < size %lld" + ", reading more\n", iocb->ki_pos, + inode->i_size); + read += ret; - base += ret; len -= ret; checkeof = 0; goto again; } } + if (ret >= 0) ret += read; @@ -772,11 +953,13 @@ retry_snap: inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || - (iocb->ki_filp->f_flags & O_DIRECT) || - (fi->flags & CEPH_F_SYNC)) { + (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) { mutex_unlock(&inode->i_mutex); - written = ceph_sync_write(file, iov->iov_base, count, - pos, &iocb->ki_pos); + if (file->f_flags & O_DIRECT) + written = ceph_sync_direct_write(iocb, iov, + nr_segs, count); + else + written = ceph_sync_write(iocb, iov, nr_segs, count); if (written == -EOLDSNAPC) { dout("aio_write %p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n", @@ -1018,7 +1201,7 @@ static long ceph_fallocate(struct file *file, int mode, loff_t offset, loff_t length) { struct ceph_file_info *fi = file->private_data; - struct inode *inode = file->f_dentry->d_inode; + struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 278fd28..6fc10a7 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -95,6 +95,7 @@ const struct inode_operations ceph_file_iops = { .getxattr = ceph_getxattr, .listxattr = ceph_listxattr, .removexattr = ceph_removexattr, + .get_acl = ceph_get_acl, }; @@ -335,12 +336,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_hold_caps_min = 0; ci->i_hold_caps_max = 0; INIT_LIST_HEAD(&ci->i_cap_delay_list); - ci->i_cap_exporting_mds = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_issued = 0; INIT_LIST_HEAD(&ci->i_cap_snaps); ci->i_head_snapc = NULL; ci->i_snap_caps = 0; + ci->i_cap_exporting_issued = 0; for (i = 0; i < CEPH_FILE_MODE_NUM; i++) ci->i_nr_by_mode[i] = 0; @@ -436,6 +435,16 @@ void ceph_destroy_inode(struct inode *inode) call_rcu(&inode->i_rcu, ceph_i_callback); } +int ceph_drop_inode(struct inode *inode) +{ + /* + * Positve dentry and corresponding inode are always accompanied + * in MDS reply. So no need to keep inode in the cache after + * dropping all its aliases. + */ + return 1; +} + /* * Helpers to fill in size, ctime, mtime, and atime. We have to be * careful because either the client or MDS may have more up to date @@ -670,6 +679,7 @@ static int fill_inode(struct inode *inode, memcpy(ci->i_xattrs.blob->vec.iov_base, iinfo->xattr_data, iinfo->xattr_len); ci->i_xattrs.version = le64_to_cpu(info->xattr_version); + ceph_forget_all_cached_acls(inode); xattr_blob = NULL; } @@ -1454,7 +1464,8 @@ static void ceph_invalidate_work(struct work_struct *work) dout("invalidate_pages %p gen %d revoking %d\n", inode, ci->i_rdcache_gen, ci->i_rdcache_revoking); if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { - /* nevermind! */ + if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE)) + check = 1; spin_unlock(&ci->i_ceph_lock); mutex_unlock(&ci->i_truncate_mutex); goto out; @@ -1475,13 +1486,14 @@ static void ceph_invalidate_work(struct work_struct *work) dout("invalidate_pages %p gen %d raced, now %d revoking %d\n", inode, orig_gen, ci->i_rdcache_gen, ci->i_rdcache_revoking); + if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE)) + check = 1; } spin_unlock(&ci->i_ceph_lock); mutex_unlock(&ci->i_truncate_mutex); - +out: if (check) ceph_check_caps(ci, 0, NULL); -out: iput(inode); } @@ -1602,6 +1614,7 @@ static const struct inode_operations ceph_symlink_iops = { .getxattr = ceph_getxattr, .listxattr = ceph_listxattr, .removexattr = ceph_removexattr, + .get_acl = ceph_get_acl, }; /* @@ -1675,6 +1688,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) dirtied |= CEPH_CAP_AUTH_EXCL; } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 || attr->ia_mode != inode->i_mode) { + inode->i_mode = attr->ia_mode; req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode); mask |= CEPH_SETATTR_MODE; release |= CEPH_CAP_AUTH_SHARED; @@ -1790,6 +1804,12 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (inode_dirty_flags) __mark_inode_dirty(inode, inode_dirty_flags); + if (ia_valid & ATTR_MODE) { + err = ceph_acl_chmod(dentry, inode); + if (err) + goto out_put; + } + if (mask) { req->r_inode = inode; ihold(inode); @@ -1809,6 +1829,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) return err; out: spin_unlock(&ci->i_ceph_lock); +out_put: ceph_mdsc_put_request(req); return err; } diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 669622f..dc66c9e 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -183,6 +183,8 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->client->osdc; + struct ceph_object_locator oloc; + struct ceph_object_id oid; u64 len = 1, olen; u64 tmp; struct ceph_pg pgid; @@ -211,8 +213,10 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx", ceph_ino(inode), dl.object_no); - r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, - ceph_file_layout_pg_pool(ci->i_layout)); + oloc.pool = ceph_file_layout_pg_pool(ci->i_layout); + ceph_oid_set_name(&oid, dl.object_name); + + r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid); if (r < 0) { up_read(&osdc->map_sem); return r; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index d90861f..f4f050a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -63,7 +63,7 @@ static const struct ceph_connection_operations mds_con_ops; */ static int parse_reply_info_in(void **p, void *end, struct ceph_mds_reply_info_in *info, - int features) + u64 features) { int err = -EIO; @@ -98,7 +98,7 @@ bad: */ static int parse_reply_info_trace(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { int err; @@ -145,7 +145,7 @@ out_bad: */ static int parse_reply_info_dir(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { u32 num, i = 0; int err; @@ -217,7 +217,7 @@ out_bad: */ static int parse_reply_info_filelock(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (*p + sizeof(*info->filelock_reply) > end) goto bad; @@ -238,7 +238,7 @@ bad: */ static int parse_reply_info_create(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { if (*p == end) { @@ -262,7 +262,7 @@ bad: */ static int parse_reply_info_extra(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (info->head->op == CEPH_MDS_OP_GETFILELOCK) return parse_reply_info_filelock(p, end, info, features); @@ -280,7 +280,7 @@ static int parse_reply_info_extra(void **p, void *end, */ static int parse_reply_info(struct ceph_msg *msg, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { void *p, *end; u32 len; @@ -713,14 +713,15 @@ static int __choose_mds(struct ceph_mds_client *mdsc, struct dentry *dn = get_nonsnap_parent(parent); inode = dn->d_inode; dout("__choose_mds using nonsnap parent %p\n", inode); - } else if (req->r_dentry->d_inode) { + } else { /* dentry target */ inode = req->r_dentry->d_inode; - } else { - /* dir + name */ - inode = dir; - hash = ceph_dentry_hash(dir, req->r_dentry); - is_hash = true; + if (!inode || mode == USE_AUTH_MDS) { + /* dir + name */ + inode = dir; + hash = ceph_dentry_hash(dir, req->r_dentry); + is_hash = true; + } } } @@ -846,35 +847,56 @@ static int __open_session(struct ceph_mds_client *mdsc, * * called under mdsc->mutex */ +static struct ceph_mds_session * +__open_export_target_session(struct ceph_mds_client *mdsc, int target) +{ + struct ceph_mds_session *session; + + session = __ceph_lookup_mds_session(mdsc, target); + if (!session) { + session = register_session(mdsc, target); + if (IS_ERR(session)) + return session; + } + if (session->s_state == CEPH_MDS_SESSION_NEW || + session->s_state == CEPH_MDS_SESSION_CLOSING) + __open_session(mdsc, session); + + return session; +} + +struct ceph_mds_session * +ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) +{ + struct ceph_mds_session *session; + + dout("open_export_target_session to mds%d\n", target); + + mutex_lock(&mdsc->mutex); + session = __open_export_target_session(mdsc, target); + mutex_unlock(&mdsc->mutex); + + return session; +} + static void __open_export_target_sessions(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_mds_info *mi; struct ceph_mds_session *ts; int i, mds = session->s_mds; - int target; if (mds >= mdsc->mdsmap->m_max_mds) return; + mi = &mdsc->mdsmap->m_info[mds]; dout("open_export_target_sessions for mds%d (%d targets)\n", session->s_mds, mi->num_export_targets); for (i = 0; i < mi->num_export_targets; i++) { - target = mi->export_targets[i]; - ts = __ceph_lookup_mds_session(mdsc, target); - if (!ts) { - ts = register_session(mdsc, target); - if (IS_ERR(ts)) - return; - } - if (session->s_state == CEPH_MDS_SESSION_NEW || - session->s_state == CEPH_MDS_SESSION_CLOSING) - __open_session(mdsc, session); - else - dout(" mds%d target mds%d %p is %s\n", session->s_mds, - i, ts, session_state_name(ts->s_state)); - ceph_put_mds_session(ts); + ts = __open_export_target_session(mdsc, mi->export_targets[i]); + if (!IS_ERR(ts)) + ceph_put_mds_session(ts); } } @@ -1136,6 +1158,21 @@ static int send_renew_caps(struct ceph_mds_client *mdsc, return 0; } +static int send_flushmsg_ack(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, u64 seq) +{ + struct ceph_msg *msg; + + dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", + session->s_mds, session_state_name(session->s_state), seq); + msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); + if (!msg) + return -ENOMEM; + ceph_con_send(&session->s_con, msg); + return 0; +} + + /* * Note new cap ttl, and any transition from stale -> not stale (fresh?). * @@ -1214,7 +1251,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) { struct ceph_mds_session *session = arg; struct ceph_inode_info *ci = ceph_inode(inode); - int used, oissued, mine; + int used, wanted, oissued, mine; if (session->s_trim_caps <= 0) return -1; @@ -1222,14 +1259,19 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) spin_lock(&ci->i_ceph_lock); mine = cap->issued | cap->implemented; used = __ceph_caps_used(ci); + wanted = __ceph_caps_file_wanted(ci); oissued = __ceph_caps_issued_other(ci, cap); - dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", + dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), - ceph_cap_string(used)); - if (ci->i_dirty_caps) - goto out; /* dirty caps */ - if ((used & ~oissued) & mine) + ceph_cap_string(used), ceph_cap_string(wanted)); + if (cap == ci->i_auth_cap) { + if (ci->i_dirty_caps | ci->i_flushing_caps) + goto out; + if ((used | wanted) & CEPH_CAP_ANY_WR) + goto out; + } + if ((used | wanted) & ~oissued & mine) goto out; /* we need these caps */ session->s_trim_caps--; @@ -2156,26 +2198,16 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) */ if (result == -ESTALE) { dout("got ESTALE on request %llu", req->r_tid); - if (!req->r_inode) { - /* do nothing; not an authority problem */ - } else if (req->r_direct_mode != USE_AUTH_MDS) { + if (req->r_direct_mode != USE_AUTH_MDS) { dout("not using auth, setting for that now"); req->r_direct_mode = USE_AUTH_MDS; __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); goto out; } else { - struct ceph_inode_info *ci = ceph_inode(req->r_inode); - struct ceph_cap *cap = NULL; - - if (req->r_session) - cap = ceph_get_cap_for_mds(ci, - req->r_session->s_mds); - - dout("already using auth"); - if ((!cap || cap != ci->i_auth_cap) || - (cap->mseq != req->r_sent_on_mseq)) { - dout("but cap changed, so resending"); + int mds = __choose_mds(mdsc, req); + if (mds >= 0 && mds != req->r_session->s_mds) { + dout("but auth changed, so resending"); __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); goto out; @@ -2400,6 +2432,10 @@ static void handle_session(struct ceph_mds_session *session, trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); break; + case CEPH_SESSION_FLUSHMSG: + send_flushmsg_ack(mdsc, session, seq); + break; + default: pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); WARN_ON(1); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 4c053d0..6828891 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -383,6 +383,8 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg); +extern struct ceph_mds_session * +ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target); extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c index 89fa4a9..4440f44 100644 --- a/fs/ceph/strings.c +++ b/fs/ceph/strings.c @@ -41,6 +41,8 @@ const char *ceph_session_op_name(int op) case CEPH_SESSION_RENEWCAPS: return "renewcaps"; case CEPH_SESSION_STALE: return "stale"; case CEPH_SESSION_RECALL_STATE: return "recall_state"; + case CEPH_SESSION_FLUSHMSG: return "flushmsg"; + case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack"; } return "???"; } diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 6a0951e..2df963f 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -490,10 +490,10 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, struct ceph_options *opt) { struct ceph_fs_client *fsc; - const unsigned supported_features = + const u64 supported_features = CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH; - const unsigned required_features = 0; + const u64 required_features = 0; int page_count; size_t size; int err = -ENOMEM; @@ -686,6 +686,7 @@ static const struct super_operations ceph_super_ops = { .alloc_inode = ceph_alloc_inode, .destroy_inode = ceph_destroy_inode, .write_inode = ceph_write_inode, + .drop_inode = ceph_drop_inode, .sync_fs = ceph_sync_fs, .put_super = ceph_put_super, .show_options = ceph_show_options, @@ -818,7 +819,11 @@ static int ceph_set_super(struct super_block *s, void *data) s->s_flags = fsc->mount_options->sb_flags; s->s_maxbytes = 1ULL << 40; /* temp value until we get mdsmap */ +#ifdef CONFIG_CEPH_FS_POSIX_ACL + s->s_flags |= MS_POSIXACL; +#endif + s->s_xattr = ceph_xattr_handlers; s->s_fs_info = fsc; fsc->sb = s; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index ef4ac38..c299f7d 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -287,14 +287,12 @@ struct ceph_inode_info { unsigned long i_hold_caps_min; /* jiffies */ unsigned long i_hold_caps_max; /* jiffies */ struct list_head i_cap_delay_list; /* for delayed cap release to mds */ - int i_cap_exporting_mds; /* to handle cap migration between */ - unsigned i_cap_exporting_mseq; /* mds's. */ - unsigned i_cap_exporting_issued; struct ceph_cap_reservation i_cap_migration_resv; struct list_head i_cap_snaps; /* snapped state pending flush to mds */ struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or dirty|flushing caps */ unsigned i_snap_caps; /* cap bits for snapped files */ + unsigned i_cap_exporting_issued; int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ @@ -335,7 +333,6 @@ struct ceph_inode_info { u32 i_fscache_gen; /* sequence, for delayed fscache validate */ struct work_struct i_revalidate_work; #endif - struct inode vfs_inode; /* at end */ }; @@ -529,6 +526,8 @@ static inline int __ceph_caps_dirty(struct ceph_inode_info *ci) } extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask); +extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci, + struct ceph_cap *ocap, int mask); extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask); extern int __ceph_caps_used(struct ceph_inode_info *ci); @@ -691,6 +690,7 @@ extern const struct inode_operations ceph_file_iops; extern struct inode *ceph_alloc_inode(struct super_block *sb); extern void ceph_destroy_inode(struct inode *inode); +extern int ceph_drop_inode(struct inode *inode); extern struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino); @@ -724,6 +724,9 @@ extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, /* xattr.c */ extern int ceph_setxattr(struct dentry *, const char *, const void *, size_t, int); +int __ceph_setxattr(struct dentry *, const char *, const void *, size_t, int); +ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t); +int __ceph_removexattr(struct dentry *, const char *); extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t); extern ssize_t ceph_listxattr(struct dentry *, char *, size_t); extern int ceph_removexattr(struct dentry *, const char *); @@ -732,6 +735,39 @@ extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci); extern void __init ceph_xattr_init(void); extern void ceph_xattr_exit(void); +/* acl.c */ +extern const struct xattr_handler ceph_xattr_acl_access_handler; +extern const struct xattr_handler ceph_xattr_acl_default_handler; +extern const struct xattr_handler *ceph_xattr_handlers[]; + +#ifdef CONFIG_CEPH_FS_POSIX_ACL + +struct posix_acl *ceph_get_acl(struct inode *, int); +int ceph_init_acl(struct dentry *, struct inode *, struct inode *); +int ceph_acl_chmod(struct dentry *, struct inode *); +void ceph_forget_all_cached_acls(struct inode *inode); + +#else + +#define ceph_get_acl NULL + +static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode, + struct inode *dir) +{ + return 0; +} + +static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) +{ + return 0; +} + +static inline void ceph_forget_all_cached_acls(struct inode *inode) +{ +} + +#endif + /* caps.c */ extern const char *ceph_cap_string(int c); extern void ceph_handle_caps(struct ceph_mds_session *session, @@ -744,6 +780,7 @@ extern int ceph_add_cap(struct inode *inode, extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); extern void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap); +extern int ceph_is_any_caps(struct inode *inode); extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino, u64 cap_id, u32 migrate_seq, u32 issue_seq); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index be661d8..c7581f37 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -11,11 +11,24 @@ #define XATTR_CEPH_PREFIX "ceph." #define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1) +/* + * List of handlers for synthetic system.* attributes. Other + * attributes are handled directly. + */ +const struct xattr_handler *ceph_xattr_handlers[] = { +#ifdef CONFIG_CEPH_FS_POSIX_ACL + &ceph_xattr_acl_access_handler, + &ceph_xattr_acl_default_handler, +#endif + NULL, +}; + static bool ceph_is_valid_xattr(const char *name) { return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) || !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN) || + !strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN) || !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) || !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN); } @@ -663,10 +676,9 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci) } } -ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, +ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, size_t size) { - struct inode *inode = dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); int err; struct ceph_inode_xattr *xattr; @@ -675,7 +687,6 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, if (!ceph_is_valid_xattr(name)) return -ENODATA; - /* let's see if a virtual xattr was requested */ vxattr = ceph_match_vxattr(inode, name); if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) { @@ -725,6 +736,15 @@ out: return err; } +ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, + size_t size) +{ + if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) + return generic_getxattr(dentry, name, value, size); + + return __ceph_getxattr(dentry->d_inode, name, value, size); +} + ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) { struct inode *inode = dentry->d_inode; @@ -863,8 +883,8 @@ out: return err; } -int ceph_setxattr(struct dentry *dentry, const char *name, - const void *value, size_t size, int flags) +int __ceph_setxattr(struct dentry *dentry, const char *name, + const void *value, size_t size, int flags) { struct inode *inode = dentry->d_inode; struct ceph_vxattr *vxattr; @@ -879,9 +899,6 @@ int ceph_setxattr(struct dentry *dentry, const char *name, struct ceph_inode_xattr *xattr = NULL; int required_blob_size; - if (ceph_snap(inode) != CEPH_NOSNAP) - return -EROFS; - if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -958,6 +975,18 @@ out: return err; } +int ceph_setxattr(struct dentry *dentry, const char *name, + const void *value, size_t size, int flags) +{ + if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP) + return -EROFS; + + if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) + return generic_setxattr(dentry, name, value, size, flags); + + return __ceph_setxattr(dentry, name, value, size, flags); +} + static int ceph_send_removexattr(struct dentry *dentry, const char *name) { struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); @@ -984,7 +1013,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name) return err; } -int ceph_removexattr(struct dentry *dentry, const char *name) +int __ceph_removexattr(struct dentry *dentry, const char *name) { struct inode *inode = dentry->d_inode; struct ceph_vxattr *vxattr; @@ -994,9 +1023,6 @@ int ceph_removexattr(struct dentry *dentry, const char *name) int required_blob_size; int dirty; - if (ceph_snap(inode) != CEPH_NOSNAP) - return -EROFS; - if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -1053,3 +1079,13 @@ out: return err; } +int ceph_removexattr(struct dentry *dentry, const char *name) +{ + if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP) + return -EROFS; + + if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) + return generic_removexattr(dentry, name); + + return __ceph_removexattr(dentry, name); +} diff --git a/include/linux/ceph/buffer.h b/include/linux/ceph/buffer.h index 58d1901..07ad423 100644 --- a/include/linux/ceph/buffer.h +++ b/include/linux/ceph/buffer.h @@ -17,7 +17,6 @@ struct ceph_buffer { struct kref kref; struct kvec vec; size_t alloc_len; - bool is_vmalloc; }; extern struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp); diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index 4c420803..138448f 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -4,42 +4,73 @@ /* * feature bits */ -#define CEPH_FEATURE_UID (1<<0) -#define CEPH_FEATURE_NOSRCADDR (1<<1) -#define CEPH_FEATURE_MONCLOCKCHECK (1<<2) -#define CEPH_FEATURE_FLOCK (1<<3) -#define CEPH_FEATURE_SUBSCRIBE2 (1<<4) -#define CEPH_FEATURE_MONNAMES (1<<5) -#define CEPH_FEATURE_RECONNECT_SEQ (1<<6) -#define CEPH_FEATURE_DIRLAYOUTHASH (1<<7) -#define CEPH_FEATURE_OBJECTLOCATOR (1<<8) -#define CEPH_FEATURE_PGID64 (1<<9) -#define CEPH_FEATURE_INCSUBOSDMAP (1<<10) -#define CEPH_FEATURE_PGPOOL3 (1<<11) -#define CEPH_FEATURE_OSDREPLYMUX (1<<12) -#define CEPH_FEATURE_OSDENC (1<<13) -#define CEPH_FEATURE_OMAP (1<<14) -#define CEPH_FEATURE_MONENC (1<<15) -#define CEPH_FEATURE_QUERY_T (1<<16) -#define CEPH_FEATURE_INDEP_PG_MAP (1<<17) -#define CEPH_FEATURE_CRUSH_TUNABLES (1<<18) -#define CEPH_FEATURE_CHUNKY_SCRUB (1<<19) -#define CEPH_FEATURE_MON_NULLROUTE (1<<20) -#define CEPH_FEATURE_MON_GV (1<<21) -#define CEPH_FEATURE_BACKFILL_RESERVATION (1<<22) -#define CEPH_FEATURE_MSG_AUTH (1<<23) -#define CEPH_FEATURE_RECOVERY_RESERVATION (1<<24) -#define CEPH_FEATURE_CRUSH_TUNABLES2 (1<<25) -#define CEPH_FEATURE_CREATEPOOLID (1<<26) -#define CEPH_FEATURE_REPLY_CREATE_INODE (1<<27) -#define CEPH_FEATURE_OSD_HBMSGS (1<<28) -#define CEPH_FEATURE_MDSENC (1<<29) -#define CEPH_FEATURE_OSDHASHPSPOOL (1<<30) +#define CEPH_FEATURE_UID (1ULL<<0) +#define CEPH_FEATURE_NOSRCADDR (1ULL<<1) +#define CEPH_FEATURE_MONCLOCKCHECK (1ULL<<2) +#define CEPH_FEATURE_FLOCK (1ULL<<3) +#define CEPH_FEATURE_SUBSCRIBE2 (1ULL<<4) +#define CEPH_FEATURE_MONNAMES (1ULL<<5) +#define CEPH_FEATURE_RECONNECT_SEQ (1ULL<<6) +#define CEPH_FEATURE_DIRLAYOUTHASH (1ULL<<7) +#define CEPH_FEATURE_OBJECTLOCATOR (1ULL<<8) +#define CEPH_FEATURE_PGID64 (1ULL<<9) +#define CEPH_FEATURE_INCSUBOSDMAP (1ULL<<10) +#define CEPH_FEATURE_PGPOOL3 (1ULL<<11) +#define CEPH_FEATURE_OSDREPLYMUX (1ULL<<12) +#define CEPH_FEATURE_OSDENC (1ULL<<13) +#define CEPH_FEATURE_OMAP (1ULL<<14) +#define CEPH_FEATURE_MONENC (1ULL<<15) +#define CEPH_FEATURE_QUERY_T (1ULL<<16) +#define CEPH_FEATURE_INDEP_PG_MAP (1ULL<<17) +#define CEPH_FEATURE_CRUSH_TUNABLES (1ULL<<18) +#define CEPH_FEATURE_CHUNKY_SCRUB (1ULL<<19) +#define CEPH_FEATURE_MON_NULLROUTE (1ULL<<20) +#define CEPH_FEATURE_MON_GV (1ULL<<21) +#define CEPH_FEATURE_BACKFILL_RESERVATION (1ULL<<22) +#define CEPH_FEATURE_MSG_AUTH (1ULL<<23) +#define CEPH_FEATURE_RECOVERY_RESERVATION (1ULL<<24) +#define CEPH_FEATURE_CRUSH_TUNABLES2 (1ULL<<25) +#define CEPH_FEATURE_CREATEPOOLID (1ULL<<26) +#define CEPH_FEATURE_REPLY_CREATE_INODE (1ULL<<27) +#define CEPH_FEATURE_OSD_HBMSGS (1ULL<<28) +#define CEPH_FEATURE_MDSENC (1ULL<<29) +#define CEPH_FEATURE_OSDHASHPSPOOL (1ULL<<30) +#define CEPH_FEATURE_MON_SINGLE_PAXOS (1ULL<<31) +#define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32) +#define CEPH_FEATURE_MON_SCRUB (1ULL<<33) +#define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<34) +#define CEPH_FEATURE_OSD_CACHEPOOL (1ULL<<35) +#define CEPH_FEATURE_CRUSH_V2 (1ULL<<36) /* new indep; SET_* steps */ +#define CEPH_FEATURE_EXPORT_PEER (1ULL<<37) +#define CEPH_FEATURE_OSD_ERASURE_CODES (1ULL<<38) + +/* + * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature + * vector to evaluate to 64 bit ~0. To cope, we designate 1ULL << 63 + * to mean 33 bit ~0, and introduce a helper below to do the + * translation. + * + * This was introduced by ceph.git commit + * 9ea02b84104045c2ffd7e7f4e7af512953855ecd v0.58-657-g9ea02b8 + * and fixed by ceph.git commit + * 4255b5c2fb54ae40c53284b3ab700fdfc7e61748 v0.65-263-g4255b5c + */ +#define CEPH_FEATURE_RESERVED (1ULL<<63) + +static inline u64 ceph_sanitize_features(u64 features) +{ + if (features & CEPH_FEATURE_RESERVED) { + /* everything through OSD_SNAPMAPPER */ + return 0x1ffffffffull; + } else { + return features; + } +} /* * Features supported. */ -#define CEPH_FEATURES_SUPPORTED_DEFAULT \ +#define CEPH_FEATURES_SUPPORTED_DEFAULT \ (CEPH_FEATURE_NOSRCADDR | \ CEPH_FEATURE_RECONNECT_SEQ | \ CEPH_FEATURE_PGID64 | \ @@ -48,7 +79,10 @@ CEPH_FEATURE_CRUSH_TUNABLES | \ CEPH_FEATURE_CRUSH_TUNABLES2 | \ CEPH_FEATURE_REPLY_CREATE_INODE | \ - CEPH_FEATURE_OSDHASHPSPOOL) + CEPH_FEATURE_OSDHASHPSPOOL | \ + CEPH_FEATURE_OSD_CACHEPOOL | \ + CEPH_FEATURE_CRUSH_V2 | \ + CEPH_FEATURE_EXPORT_PEER) #define CEPH_FEATURES_REQUIRED_DEFAULT \ (CEPH_FEATURE_NOSRCADDR | \ @@ -56,4 +90,5 @@ CEPH_FEATURE_PGID64 | \ CEPH_FEATURE_PGPOOL3 | \ CEPH_FEATURE_OSDENC) + #endif diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 2ad7b86..2623cff 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -53,6 +53,29 @@ struct ceph_file_layout { __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ } __attribute__ ((packed)); +#define ceph_file_layout_su(l) ((__s32)le32_to_cpu((l).fl_stripe_unit)) +#define ceph_file_layout_stripe_count(l) \ + ((__s32)le32_to_cpu((l).fl_stripe_count)) +#define ceph_file_layout_object_size(l) ((__s32)le32_to_cpu((l).fl_object_size)) +#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash)) +#define ceph_file_layout_object_su(l) \ + ((__s32)le32_to_cpu((l).fl_object_stripe_unit)) +#define ceph_file_layout_pg_pool(l) \ + ((__s32)le32_to_cpu((l).fl_pg_pool)) + +static inline unsigned ceph_file_layout_stripe_width(struct ceph_file_layout *l) +{ + return le32_to_cpu(l->fl_stripe_unit) * + le32_to_cpu(l->fl_stripe_count); +} + +/* "period" == bytes before i start on a new set of objects */ +static inline unsigned ceph_file_layout_period(struct ceph_file_layout *l) +{ + return le32_to_cpu(l->fl_object_size) * + le32_to_cpu(l->fl_stripe_count); +} + #define CEPH_MIN_STRIPE_UNIT 65536 int ceph_file_layout_is_valid(const struct ceph_file_layout *layout); @@ -282,6 +305,8 @@ enum { CEPH_SESSION_RENEWCAPS, CEPH_SESSION_STALE, CEPH_SESSION_RECALL_STATE, + CEPH_SESSION_FLUSHMSG, + CEPH_SESSION_FLUSHMSG_ACK, }; extern const char *ceph_session_op_name(int op); @@ -457,7 +482,8 @@ struct ceph_mds_reply_cap { __u8 flags; /* CEPH_CAP_FLAG_* */ } __attribute__ ((packed)); -#define CEPH_CAP_FLAG_AUTH 1 /* cap is issued by auth mds */ +#define CEPH_CAP_FLAG_AUTH (1 << 0) /* cap is issued by auth mds */ +#define CEPH_CAP_FLAG_RELEASE (1 << 1) /* release the cap */ /* inode record, for bundling with mds reply */ struct ceph_mds_reply_inode { @@ -658,6 +684,14 @@ struct ceph_mds_caps { __le32 time_warp_seq; } __attribute__ ((packed)); +struct ceph_mds_cap_peer { + __le64 cap_id; + __le32 seq; + __le32 mseq; + __le32 mds; + __u8 flags; +} __attribute__ ((packed)); + /* cap release msg head */ struct ceph_mds_cap_release { __le32 num; /* number of cap_items that follow */ diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 2e30248..2f49aa4 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -122,8 +122,8 @@ struct ceph_client { int (*extra_mon_dispatch)(struct ceph_client *, struct ceph_msg *); - u32 supported_features; - u32 required_features; + u64 supported_features; + u64 required_features; struct ceph_messenger msgr; /* messenger instance */ struct ceph_mon_client monc; @@ -173,15 +173,18 @@ static inline int calc_pages_for(u64 off, u64 len) (off >> PAGE_CACHE_SHIFT); } +extern struct kmem_cache *ceph_inode_cachep; +extern struct kmem_cache *ceph_cap_cachep; +extern struct kmem_cache *ceph_dentry_cachep; +extern struct kmem_cache *ceph_file_cachep; + /* ceph_common.c */ extern bool libceph_compatible(void *data); extern const char *ceph_msg_type_name(int type); extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid); -extern struct kmem_cache *ceph_inode_cachep; -extern struct kmem_cache *ceph_cap_cachep; -extern struct kmem_cache *ceph_dentry_cachep; -extern struct kmem_cache *ceph_file_cachep; +extern void *ceph_kvmalloc(size_t size, gfp_t flags); +extern void ceph_kvfree(const void *ptr); extern struct ceph_options *ceph_parse_options(char *options, const char *dev_name, const char *dev_name_end, @@ -192,8 +195,8 @@ extern int ceph_compare_options(struct ceph_options *new_opt, struct ceph_client *client); extern struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, - unsigned supported_features, - unsigned required_features); + u64 supported_features, + u64 required_features); extern u64 ceph_client_id(struct ceph_client *client); extern void ceph_destroy_client(struct ceph_client *client); extern int __ceph_open_session(struct ceph_client *client, diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 7c1420b..20ee8b6 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -60,8 +60,8 @@ struct ceph_messenger { u32 global_seq; spinlock_t global_seq_lock; - u32 supported_features; - u32 required_features; + u64 supported_features; + u64 required_features; }; enum ceph_msg_data_type { @@ -154,10 +154,9 @@ struct ceph_msg { struct list_head list_head; /* links for connection lists */ struct kref kref; - bool front_is_vmalloc; bool more_to_follow; bool needs_out_seq; - int front_max; + int front_alloc_len; unsigned long ack_stamp; /* tx: when we were acked */ struct ceph_msgpool *pool; @@ -192,7 +191,7 @@ struct ceph_connection { struct ceph_entity_name peer_name; /* peer name */ - unsigned peer_features; + u64 peer_features; u32 connect_seq; /* identify the most recent connection attempt for this connection, client */ u32 peer_global_seq; /* peer's global seq for this connection */ @@ -256,8 +255,8 @@ extern void ceph_msgr_flush(void); extern void ceph_messenger_init(struct ceph_messenger *msgr, struct ceph_entity_addr *myaddr, - u32 supported_features, - u32 required_features, + u64 supported_features, + u64 required_features, bool nocrc); extern void ceph_con_init(struct ceph_connection *con, void *private, diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 8f47625..fd47e87 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -12,12 +12,6 @@ #include <linux/ceph/auth.h> #include <linux/ceph/pagelist.h> -/* - * Maximum object name size - * (must be at least as big as RBD_MAX_MD_NAME_LEN -- currently 100) - */ -#define MAX_OBJ_NAME_SIZE 100 - struct ceph_msg; struct ceph_snap_context; struct ceph_osd_request; @@ -138,6 +132,7 @@ struct ceph_osd_request { __le64 *r_request_pool; void *r_request_pgid; __le32 *r_request_attempts; + bool r_paused; struct ceph_eversion *r_request_reassert_version; int r_result; @@ -158,15 +153,21 @@ struct ceph_osd_request { struct inode *r_inode; /* for use by callbacks */ void *r_priv; /* ditto */ - char r_oid[MAX_OBJ_NAME_SIZE]; /* object name */ - int r_oid_len; + struct ceph_object_locator r_base_oloc; + struct ceph_object_id r_base_oid; + struct ceph_object_locator r_target_oloc; + struct ceph_object_id r_target_oid; + u64 r_snapid; unsigned long r_stamp; /* send OR check time */ - struct ceph_file_layout r_file_layout; struct ceph_snap_context *r_snapc; /* snap context for writes */ }; +struct ceph_request_redirect { + struct ceph_object_locator oloc; +}; + struct ceph_osd_event { u64 cookie; int one_shot; diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index d05cc44..49ff69f 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -35,13 +35,26 @@ struct ceph_pg_pool_info { u8 object_hash; u32 pg_num, pgp_num; int pg_num_mask, pgp_num_mask; + s64 read_tier; + s64 write_tier; /* wins for read+write ops */ u64 flags; char *name; }; struct ceph_object_locator { - uint64_t pool; - char *key; + s64 pool; +}; + +/* + * Maximum supported by kernel client object name length + * + * (probably outdated: must be >= RBD_MAX_MD_NAME_LEN -- currently 100) + */ +#define CEPH_MAX_OID_NAME_LEN 100 + +struct ceph_object_id { + char name[CEPH_MAX_OID_NAME_LEN]; + int name_len; }; struct ceph_pg_mapping { @@ -73,33 +86,30 @@ struct ceph_osdmap { struct crush_map *crush; }; -/* - * file layout helpers - */ -#define ceph_file_layout_su(l) ((__s32)le32_to_cpu((l).fl_stripe_unit)) -#define ceph_file_layout_stripe_count(l) \ - ((__s32)le32_to_cpu((l).fl_stripe_count)) -#define ceph_file_layout_object_size(l) ((__s32)le32_to_cpu((l).fl_object_size)) -#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash)) -#define ceph_file_layout_object_su(l) \ - ((__s32)le32_to_cpu((l).fl_object_stripe_unit)) -#define ceph_file_layout_pg_pool(l) \ - ((__s32)le32_to_cpu((l).fl_pg_pool)) - -static inline unsigned ceph_file_layout_stripe_width(struct ceph_file_layout *l) +static inline void ceph_oid_set_name(struct ceph_object_id *oid, + const char *name) { - return le32_to_cpu(l->fl_stripe_unit) * - le32_to_cpu(l->fl_stripe_count); + int len; + + len = strlen(name); + if (len > sizeof(oid->name)) { + WARN(1, "ceph_oid_set_name '%s' len %d vs %zu, truncating\n", + name, len, sizeof(oid->name)); + len = sizeof(oid->name); + } + + memcpy(oid->name, name, len); + oid->name_len = len; } -/* "period" == bytes before i start on a new set of objects */ -static inline unsigned ceph_file_layout_period(struct ceph_file_layout *l) +static inline void ceph_oid_copy(struct ceph_object_id *dest, + struct ceph_object_id *src) { - return le32_to_cpu(l->fl_object_size) * - le32_to_cpu(l->fl_stripe_count); + BUG_ON(src->name_len > sizeof(dest->name)); + memcpy(dest->name, src->name, src->name_len); + dest->name_len = src->name_len; } - static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd) { return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP); @@ -155,14 +165,20 @@ extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, u64 *bno, u64 *oxoff, u64 *oxlen); /* calculate mapping of object to a placement group */ -extern int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid, - struct ceph_osdmap *osdmap, uint64_t pool); +extern int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, + struct ceph_object_locator *oloc, + struct ceph_object_id *oid, + struct ceph_pg *pg_out); + extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, int *acting); extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid); +extern struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, + u64 id); + extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id); extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name); diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 68c96a5..96292df 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -344,6 +344,10 @@ enum { CEPH_OSD_FLAG_EXEC_PUBLIC = 0x1000, /* DEPRECATED op may exec (public) */ CEPH_OSD_FLAG_LOCALIZE_READS = 0x2000, /* read from nearby replica, if any */ CEPH_OSD_FLAG_RWORDERED = 0x4000, /* order wrt concurrent reads */ + CEPH_OSD_FLAG_IGNORE_CACHE = 0x8000, /* ignore cache logic */ + CEPH_OSD_FLAG_SKIPRWLOCKS = 0x10000, /* skip rw locks */ + CEPH_OSD_FLAG_IGNORE_OVERLAY = 0x20000, /* ignore pool overlay */ + CEPH_OSD_FLAG_FLUSH = 0x40000, /* this is part of flush */ }; enum { diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 6a1101f..acaa561 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -19,11 +19,12 @@ #define CRUSH_MAGIC 0x00010000ul /* for detecting algorithm revisions */ - #define CRUSH_MAX_DEPTH 10 /* max crush hierarchy depth */ -#define CRUSH_MAX_SET 10 /* max size of a mapping result */ +#define CRUSH_ITEM_UNDEF 0x7ffffffe /* undefined result (internal use only) */ +#define CRUSH_ITEM_NONE 0x7fffffff /* no result */ + /* * CRUSH uses user-defined "rules" to describe how inputs should be * mapped to devices. A rule consists of sequence of steps to perform @@ -43,8 +44,13 @@ enum { /* arg2 = type */ CRUSH_RULE_CHOOSE_INDEP = 3, /* same */ CRUSH_RULE_EMIT = 4, /* no args */ - CRUSH_RULE_CHOOSE_LEAF_FIRSTN = 6, - CRUSH_RULE_CHOOSE_LEAF_INDEP = 7, + CRUSH_RULE_CHOOSELEAF_FIRSTN = 6, + CRUSH_RULE_CHOOSELEAF_INDEP = 7, + + CRUSH_RULE_SET_CHOOSE_TRIES = 8, /* override choose_total_tries */ + CRUSH_RULE_SET_CHOOSELEAF_TRIES = 9, /* override chooseleaf_descend_once */ + CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES = 10, + CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES = 11, }; /* @@ -162,7 +168,10 @@ struct crush_map { __u32 choose_local_fallback_tries; /* choose attempts before giving up */ __u32 choose_total_tries; - /* attempt chooseleaf inner descent once; on failure retry outer descent */ + /* attempt chooseleaf inner descent once for firstn mode; on + * reject retry outer descent. Note that this does *not* + * apply to a collision: in that case we will retry as we used + * to. */ __u32 chooseleaf_descend_once; }; @@ -174,6 +183,7 @@ extern void crush_destroy_bucket_list(struct crush_bucket_list *b); extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b); extern void crush_destroy_bucket_straw(struct crush_bucket_straw *b); extern void crush_destroy_bucket(struct crush_bucket *b); +extern void crush_destroy_rule(struct crush_rule *r); extern void crush_destroy(struct crush_map *map); static inline int crush_calc_tree_node(int i) diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h index 5772dee..eab3674 100644 --- a/include/linux/crush/mapper.h +++ b/include/linux/crush/mapper.h @@ -14,6 +14,7 @@ extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, i extern int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - const __u32 *weights); + const __u32 *weights, int weight_max, + int *scratch); #endif diff --git a/net/ceph/buffer.c b/net/ceph/buffer.c index bf3e6a1..621b5f6 100644 --- a/net/ceph/buffer.c +++ b/net/ceph/buffer.c @@ -6,6 +6,7 @@ #include <linux/ceph/buffer.h> #include <linux/ceph/decode.h> +#include <linux/ceph/libceph.h> /* for ceph_kv{malloc,free} */ struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp) { @@ -15,16 +16,10 @@ struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp) if (!b) return NULL; - b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN); - if (b->vec.iov_base) { - b->is_vmalloc = false; - } else { - b->vec.iov_base = __vmalloc(len, gfp | __GFP_HIGHMEM, PAGE_KERNEL); - if (!b->vec.iov_base) { - kfree(b); - return NULL; - } - b->is_vmalloc = true; + b->vec.iov_base = ceph_kvmalloc(len, gfp); + if (!b->vec.iov_base) { + kfree(b); + return NULL; } kref_init(&b->kref); @@ -40,12 +35,7 @@ void ceph_buffer_release(struct kref *kref) struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref); dout("buffer_release %p\n", b); - if (b->vec.iov_base) { - if (b->is_vmalloc) - vfree(b->vec.iov_base); - else - kfree(b->vec.iov_base); - } + ceph_kvfree(b->vec.iov_base); kfree(b); } EXPORT_SYMBOL(ceph_buffer_release); diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 34b11ee..67d7721 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -15,6 +15,7 @@ #include <linux/slab.h> #include <linux/statfs.h> #include <linux/string.h> +#include <linux/vmalloc.h> #include <linux/nsproxy.h> #include <net/net_namespace.h> @@ -170,6 +171,25 @@ int ceph_compare_options(struct ceph_options *new_opt, } EXPORT_SYMBOL(ceph_compare_options); +void *ceph_kvmalloc(size_t size, gfp_t flags) +{ + if (size <= (PAGE_SIZE << PAGE_ALLOC_COSTLY_ORDER)) { + void *ptr = kmalloc(size, flags | __GFP_NOWARN); + if (ptr) + return ptr; + } + + return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL); +} + +void ceph_kvfree(const void *ptr) +{ + if (is_vmalloc_addr(ptr)) + vfree(ptr); + else + kfree(ptr); +} + static int parse_fsid(const char *str, struct ceph_fsid *fsid) { @@ -461,8 +481,8 @@ EXPORT_SYMBOL(ceph_client_id); * create a fresh client instance */ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, - unsigned int supported_features, - unsigned int required_features) + u64 supported_features, + u64 required_features) { struct ceph_client *client; struct ceph_entity_addr *myaddr = NULL; diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index 0896132..16bc199d 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -116,11 +116,14 @@ void crush_destroy(struct crush_map *map) if (map->rules) { __u32 b; for (b = 0; b < map->max_rules; b++) - kfree(map->rules[b]); + crush_destroy_rule(map->rules[b]); kfree(map->rules); } kfree(map); } - +void crush_destroy_rule(struct crush_rule *rule) +{ + kfree(rule); +} diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index cbd06a9..b703790 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -189,7 +189,7 @@ static int terminal(int x) static int bucket_tree_choose(struct crush_bucket_tree *bucket, int x, int r) { - int n, l; + int n; __u32 w; __u64 t; @@ -197,6 +197,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket, n = bucket->num_nodes >> 1; while (!terminal(n)) { + int l; /* pick point in [0, w) */ w = bucket->node_weights[n]; t = (__u64)crush_hash32_4(bucket->h.hash, x, n, r, @@ -264,8 +265,12 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r) * true if device is marked "out" (failed, fully offloaded) * of the cluster */ -static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x) +static int is_out(const struct crush_map *map, + const __u32 *weight, int weight_max, + int item, int x) { + if (item >= weight_max) + return 1; if (weight[item] >= 0x10000) return 0; if (weight[item] == 0) @@ -277,7 +282,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in } /** - * crush_choose - choose numrep distinct items of given type + * crush_choose_firstn - choose numrep distinct items of given type * @map: the crush_map * @bucket: the bucket we are choose an item from * @x: crush input value @@ -285,18 +290,24 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in * @type: the type of item to choose * @out: pointer to output vector * @outpos: our position in that vector - * @firstn: true if choosing "first n" items, false if choosing "indep" - * @recurse_to_leaf: true if we want one device under each item of given type - * @descend_once: true if we should only try one descent before giving up + * @tries: number of attempts to make + * @recurse_tries: number of attempts to have recursive chooseleaf make + * @local_tries: localized retries + * @local_fallback_tries: localized fallback retries + * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose) * @out2: second output vector for leaf items (if @recurse_to_leaf) */ -static int crush_choose(const struct crush_map *map, - struct crush_bucket *bucket, - const __u32 *weight, - int x, int numrep, int type, - int *out, int outpos, - int firstn, int recurse_to_leaf, - int descend_once, int *out2) +static int crush_choose_firstn(const struct crush_map *map, + struct crush_bucket *bucket, + const __u32 *weight, int weight_max, + int x, int numrep, int type, + int *out, int outpos, + unsigned int tries, + unsigned int recurse_tries, + unsigned int local_tries, + unsigned int local_fallback_tries, + int recurse_to_leaf, + int *out2) { int rep; unsigned int ftotal, flocal; @@ -325,35 +336,17 @@ static int crush_choose(const struct crush_map *map, collide = 0; retry_bucket = 0; r = rep; - if (in->alg == CRUSH_BUCKET_UNIFORM) { - /* be careful */ - if (firstn || (__u32)numrep >= in->size) - /* r' = r + f_total */ - r += ftotal; - else if (in->size % numrep == 0) - /* r'=r+(n+1)*f_local */ - r += (numrep+1) * - (flocal+ftotal); - else - /* r' = r + n*f_local */ - r += numrep * (flocal+ftotal); - } else { - if (firstn) - /* r' = r + f_total */ - r += ftotal; - else - /* r' = r + n*f_local */ - r += numrep * (flocal+ftotal); - } + /* r' = r + f_total */ + r += ftotal; /* bucket choose */ if (in->size == 0) { reject = 1; goto reject; } - if (map->choose_local_fallback_tries > 0 && + if (local_fallback_tries > 0 && flocal >= (in->size>>1) && - flocal > map->choose_local_fallback_tries) + flocal > local_fallback_tries) item = bucket_perm_choose(in, x, r); else item = crush_bucket_choose(in, x, r); @@ -394,13 +387,15 @@ static int crush_choose(const struct crush_map *map, reject = 0; if (!collide && recurse_to_leaf) { if (item < 0) { - if (crush_choose(map, + if (crush_choose_firstn(map, map->buckets[-1-item], - weight, + weight, weight_max, x, outpos+1, 0, out2, outpos, - firstn, 0, - map->chooseleaf_descend_once, + recurse_tries, 0, + local_tries, + local_fallback_tries, + 0, NULL) <= outpos) /* didn't get leaf */ reject = 1; @@ -414,6 +409,7 @@ static int crush_choose(const struct crush_map *map, /* out? */ if (itemtype == 0) reject = is_out(map, weight, + weight_max, item, x); else reject = 0; @@ -424,17 +420,14 @@ reject: ftotal++; flocal++; - if (reject && descend_once) - /* let outer call try again */ - skip_rep = 1; - else if (collide && flocal <= map->choose_local_tries) + if (collide && flocal <= local_tries) /* retry locally a few times */ retry_bucket = 1; - else if (map->choose_local_fallback_tries > 0 && - flocal <= in->size + map->choose_local_fallback_tries) + else if (local_fallback_tries > 0 && + flocal <= in->size + local_fallback_tries) /* exhaustive bucket search */ retry_bucket = 1; - else if (ftotal <= map->choose_total_tries) + else if (ftotal <= tries) /* then retry descent */ retry_descent = 1; else @@ -464,21 +457,179 @@ reject: /** + * crush_choose_indep: alternative breadth-first positionally stable mapping + * + */ +static void crush_choose_indep(const struct crush_map *map, + struct crush_bucket *bucket, + const __u32 *weight, int weight_max, + int x, int left, int numrep, int type, + int *out, int outpos, + unsigned int tries, + unsigned int recurse_tries, + int recurse_to_leaf, + int *out2, + int parent_r) +{ + struct crush_bucket *in = bucket; + int endpos = outpos + left; + int rep; + unsigned int ftotal; + int r; + int i; + int item = 0; + int itemtype; + int collide; + + dprintk("CHOOSE%s INDEP bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", + bucket->id, x, outpos, numrep); + + /* initially my result is undefined */ + for (rep = outpos; rep < endpos; rep++) { + out[rep] = CRUSH_ITEM_UNDEF; + if (out2) + out2[rep] = CRUSH_ITEM_UNDEF; + } + + for (ftotal = 0; left > 0 && ftotal < tries; ftotal++) { + for (rep = outpos; rep < endpos; rep++) { + if (out[rep] != CRUSH_ITEM_UNDEF) + continue; + + in = bucket; /* initial bucket */ + + /* choose through intervening buckets */ + for (;;) { + /* note: we base the choice on the position + * even in the nested call. that means that + * if the first layer chooses the same bucket + * in a different position, we will tend to + * choose a different item in that bucket. + * this will involve more devices in data + * movement and tend to distribute the load. + */ + r = rep + parent_r; + + /* be careful */ + if (in->alg == CRUSH_BUCKET_UNIFORM && + in->size % numrep == 0) + /* r'=r+(n+1)*f_total */ + r += (numrep+1) * ftotal; + else + /* r' = r + n*f_total */ + r += numrep * ftotal; + + /* bucket choose */ + if (in->size == 0) { + dprintk(" empty bucket\n"); + break; + } + + item = crush_bucket_choose(in, x, r); + if (item >= map->max_devices) { + dprintk(" bad item %d\n", item); + out[rep] = CRUSH_ITEM_NONE; + if (out2) + out2[rep] = CRUSH_ITEM_NONE; + left--; + break; + } + + /* desired type? */ + if (item < 0) + itemtype = map->buckets[-1-item]->type; + else + itemtype = 0; + dprintk(" item %d type %d\n", item, itemtype); + + /* keep going? */ + if (itemtype != type) { + if (item >= 0 || + (-1-item) >= map->max_buckets) { + dprintk(" bad item type %d\n", type); + out[rep] = CRUSH_ITEM_NONE; + if (out2) + out2[rep] = + CRUSH_ITEM_NONE; + left--; + break; + } + in = map->buckets[-1-item]; + continue; + } + + /* collision? */ + collide = 0; + for (i = outpos; i < endpos; i++) { + if (out[i] == item) { + collide = 1; + break; + } + } + if (collide) + break; + + if (recurse_to_leaf) { + if (item < 0) { + crush_choose_indep(map, + map->buckets[-1-item], + weight, weight_max, + x, 1, numrep, 0, + out2, rep, + recurse_tries, 0, + 0, NULL, r); + if (out2[rep] == CRUSH_ITEM_NONE) { + /* placed nothing; no leaf */ + break; + } + } else { + /* we already have a leaf! */ + out2[rep] = item; + } + } + + /* out? */ + if (itemtype == 0 && + is_out(map, weight, weight_max, item, x)) + break; + + /* yay! */ + out[rep] = item; + left--; + break; + } + } + } + for (rep = outpos; rep < endpos; rep++) { + if (out[rep] == CRUSH_ITEM_UNDEF) { + out[rep] = CRUSH_ITEM_NONE; + } + if (out2 && out2[rep] == CRUSH_ITEM_UNDEF) { + out2[rep] = CRUSH_ITEM_NONE; + } + } +} + +/** * crush_do_rule - calculate a mapping with the given input and rule * @map: the crush_map * @ruleno: the rule id * @x: hash input * @result: pointer to result vector * @result_max: maximum result size + * @weight: weight vector (for map leaves) + * @weight_max: size of weight vector + * @scratch: scratch vector for private use; must be >= 3 * result_max */ int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - const __u32 *weight) + const __u32 *weight, int weight_max, + int *scratch) { int result_len; - int a[CRUSH_MAX_SET]; - int b[CRUSH_MAX_SET]; - int c[CRUSH_MAX_SET]; + int *a = scratch; + int *b = scratch + result_max; + int *c = scratch + result_max*2; int recurse_to_leaf; int *w; int wsize = 0; @@ -489,8 +640,10 @@ int crush_do_rule(const struct crush_map *map, __u32 step; int i, j; int numrep; - int firstn; - const int descend_once = 0; + int choose_tries = map->choose_total_tries; + int choose_local_tries = map->choose_local_tries; + int choose_local_fallback_tries = map->choose_local_fallback_tries; + int choose_leaf_tries = 0; if ((__u32)ruleno >= map->max_rules) { dprintk(" bad ruleno %d\n", ruleno); @@ -503,29 +656,49 @@ int crush_do_rule(const struct crush_map *map, o = b; for (step = 0; step < rule->len; step++) { + int firstn = 0; struct crush_rule_step *curstep = &rule->steps[step]; - firstn = 0; switch (curstep->op) { case CRUSH_RULE_TAKE: w[0] = curstep->arg1; wsize = 1; break; - case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: + case CRUSH_RULE_SET_CHOOSE_TRIES: + if (curstep->arg1 > 0) + choose_tries = curstep->arg1; + break; + + case CRUSH_RULE_SET_CHOOSELEAF_TRIES: + if (curstep->arg1 > 0) + choose_leaf_tries = curstep->arg1; + break; + + case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES: + if (curstep->arg1 > 0) + choose_local_tries = curstep->arg1; + break; + + case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES: + if (curstep->arg1 > 0) + choose_local_fallback_tries = curstep->arg1; + break; + + case CRUSH_RULE_CHOOSELEAF_FIRSTN: case CRUSH_RULE_CHOOSE_FIRSTN: firstn = 1; /* fall through */ - case CRUSH_RULE_CHOOSE_LEAF_INDEP: + case CRUSH_RULE_CHOOSELEAF_INDEP: case CRUSH_RULE_CHOOSE_INDEP: if (wsize == 0) break; recurse_to_leaf = curstep->op == - CRUSH_RULE_CHOOSE_LEAF_FIRSTN || + CRUSH_RULE_CHOOSELEAF_FIRSTN || curstep->op == - CRUSH_RULE_CHOOSE_LEAF_INDEP; + CRUSH_RULE_CHOOSELEAF_INDEP; /* reset output */ osize = 0; @@ -543,22 +716,51 @@ int crush_do_rule(const struct crush_map *map, continue; } j = 0; - osize += crush_choose(map, - map->buckets[-1-w[i]], - weight, - x, numrep, - curstep->arg2, - o+osize, j, - firstn, - recurse_to_leaf, - descend_once, c+osize); + if (firstn) { + int recurse_tries; + if (choose_leaf_tries) + recurse_tries = + choose_leaf_tries; + else if (map->chooseleaf_descend_once) + recurse_tries = 1; + else + recurse_tries = choose_tries; + osize += crush_choose_firstn( + map, + map->buckets[-1-w[i]], + weight, weight_max, + x, numrep, + curstep->arg2, + o+osize, j, + choose_tries, + recurse_tries, + choose_local_tries, + choose_local_fallback_tries, + recurse_to_leaf, + c+osize); + } else { + crush_choose_indep( + map, + map->buckets[-1-w[i]], + weight, weight_max, + x, numrep, numrep, + curstep->arg2, + o+osize, j, + choose_tries, + choose_leaf_tries ? + choose_leaf_tries : 1, + recurse_to_leaf, + c+osize, + 0); + osize += numrep; + } } if (recurse_to_leaf) /* copy final _leaf_ values to output set */ memcpy(o, c, osize*sizeof(*o)); - /* swap t and w arrays */ + /* swap o and w arrays */ tmp = o; o = w; w = tmp; diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 83661cd..258a382 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -132,7 +132,8 @@ static int osdc_show(struct seq_file *s, void *pp) req->r_osd ? req->r_osd->o_osd : -1, req->r_pgid.pool, req->r_pgid.seed); - seq_printf(s, "%.*s", req->r_oid_len, req->r_oid); + seq_printf(s, "%.*s", req->r_base_oid.name_len, + req->r_base_oid.name); if (req->r_reassert_version.epoch) seq_printf(s, "\t%u'%llu", diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 4a5df7b..2ed1304 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -15,6 +15,7 @@ #include <linux/dns_resolver.h> #include <net/tcp.h> +#include <linux/ceph/ceph_features.h> #include <linux/ceph/libceph.h> #include <linux/ceph/messenger.h> #include <linux/ceph/decode.h> @@ -1865,7 +1866,9 @@ int ceph_parse_ips(const char *c, const char *end, port = (port * 10) + (*p - '0'); p++; } - if (port > 65535 || port == 0) + if (port == 0) + port = CEPH_MON_PORT; + else if (port > 65535) goto bad; } else { port = CEPH_MON_PORT; @@ -1945,7 +1948,8 @@ static int process_connect(struct ceph_connection *con) { u64 sup_feat = con->msgr->supported_features; u64 req_feat = con->msgr->required_features; - u64 server_feat = le64_to_cpu(con->in_reply.features); + u64 server_feat = ceph_sanitize_features( + le64_to_cpu(con->in_reply.features)); int ret; dout("process_connect on %p tag %d\n", con, (int)con->in_tag); @@ -2853,8 +2857,8 @@ static void con_fault(struct ceph_connection *con) */ void ceph_messenger_init(struct ceph_messenger *msgr, struct ceph_entity_addr *myaddr, - u32 supported_features, - u32 required_features, + u64 supported_features, + u64 required_features, bool nocrc) { msgr->supported_features = supported_features; @@ -3126,15 +3130,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, INIT_LIST_HEAD(&m->data); /* front */ - m->front_max = front_len; if (front_len) { - if (front_len > PAGE_CACHE_SIZE) { - m->front.iov_base = __vmalloc(front_len, flags, - PAGE_KERNEL); - m->front_is_vmalloc = true; - } else { - m->front.iov_base = kmalloc(front_len, flags); - } + m->front.iov_base = ceph_kvmalloc(front_len, flags); if (m->front.iov_base == NULL) { dout("ceph_msg_new can't allocate %d bytes\n", front_len); @@ -3143,7 +3140,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, } else { m->front.iov_base = NULL; } - m->front.iov_len = front_len; + m->front_alloc_len = m->front.iov_len = front_len; dout("ceph_msg_new %p front %d\n", m, front_len); return m; @@ -3256,10 +3253,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) void ceph_msg_kfree(struct ceph_msg *m) { dout("msg_kfree %p\n", m); - if (m->front_is_vmalloc) - vfree(m->front.iov_base); - else - kfree(m->front.iov_base); + ceph_kvfree(m->front.iov_base); kmem_cache_free(ceph_msg_cache, m); } @@ -3301,8 +3295,8 @@ EXPORT_SYMBOL(ceph_msg_last_put); void ceph_msg_dump(struct ceph_msg *msg) { - pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, - msg->front_max, msg->data_length); + pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg, + msg->front_alloc_len, msg->data_length); print_hex_dump(KERN_DEBUG, "header: ", DUMP_PREFIX_OFFSET, 16, 1, &msg->hdr, sizeof(msg->hdr), true); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 1fe25cd..2ac9ef3 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -152,7 +152,7 @@ static int __open_session(struct ceph_mon_client *monc) /* initiatiate authentication handshake */ ret = ceph_auth_build_hello(monc->auth, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); __send_prepared_auth_request(monc, ret); } else { dout("open_session mon%d already open\n", monc->cur_mon); @@ -196,7 +196,7 @@ static void __send_subscribe(struct ceph_mon_client *monc) int num; p = msg->front.iov_base; - end = p + msg->front_max; + end = p + msg->front_alloc_len; num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; ceph_encode_32(&p, num); @@ -897,7 +897,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc, ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, msg->front.iov_len, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); if (ret < 0) { monc->client->auth_err = ret; wake_up_all(&monc->client->auth_wq); @@ -939,7 +939,7 @@ static int __validate_auth(struct ceph_mon_client *monc) return 0; ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, - monc->m_auth->front_max); + monc->m_auth->front_alloc_len); if (ret <= 0) return ret; /* either an error, or no need to authenticate */ __send_prepared_auth_request(monc, ret); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 2b4b32a..010ff3b 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, msg_size = 4 + 4 + 8 + 8 + 4+8; msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ msg_size += 1 + 8 + 4 + 4; /* pg_t */ - msg_size += 4 + MAX_OBJ_NAME_SIZE; + msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */ msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); msg_size += 8; /* snapid */ msg_size += 8; /* snap_seq */ @@ -368,6 +368,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); + req->r_base_oloc.pool = -1; + req->r_target_oloc.pool = -1; + /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); @@ -761,11 +764,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (num_ops > 1) osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); - req->r_file_layout = *layout; /* keep a copy */ + req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); - snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", - vino.ino, objnum); - req->r_oid_len = strlen(req->r_oid); + snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name), + "%llx.%08llx", vino.ino, objnum); + req->r_base_oid.name_len = strlen(req->r_base_oid.name); return req; } @@ -1044,8 +1047,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) !ceph_con_opened(&osd->o_con)) { struct ceph_osd_request *req; - dout(" osd addr hasn't changed and connection never opened," - " letting msgr retry"); + dout("osd addr hasn't changed and connection never opened, " + "letting msgr retry\n"); /* touch each r_stamp for handle_timeout()'s benfit */ list_for_each_entry(req, &osd->o_requests, r_osd_item) req->r_stamp = jiffies; @@ -1232,6 +1235,61 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, EXPORT_SYMBOL(ceph_osdc_set_request_linger); /* + * Returns whether a request should be blocked from being sent + * based on the current osdmap and osd_client settings. + * + * Caller should hold map_sem for read. + */ +static bool __req_should_be_paused(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD); + bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); + return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) || + (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr); +} + +/* + * Calculate mapping of a request to a PG. Takes tiering into account. + */ +static int __calc_request_pg(struct ceph_osdmap *osdmap, + struct ceph_osd_request *req, + struct ceph_pg *pg_out) +{ + bool need_check_tiering; + + need_check_tiering = false; + if (req->r_target_oloc.pool == -1) { + req->r_target_oloc = req->r_base_oloc; /* struct */ + need_check_tiering = true; + } + if (req->r_target_oid.name_len == 0) { + ceph_oid_copy(&req->r_target_oid, &req->r_base_oid); + need_check_tiering = true; + } + + if (need_check_tiering && + (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { + struct ceph_pg_pool_info *pi; + + pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool); + if (pi) { + if ((req->r_flags & CEPH_OSD_FLAG_READ) && + pi->read_tier >= 0) + req->r_target_oloc.pool = pi->read_tier; + if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && + pi->write_tier >= 0) + req->r_target_oloc.pool = pi->write_tier; + } + /* !pi is caught in ceph_oloc_oid_to_pg() */ + } + + return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc, + &req->r_target_oid, pg_out); +} + +/* * Pick an osd (the first 'up' osd in the pg), allocate the osd struct * (as needed), and set the request r_osd appropriately. If there is * no up osd, set r_osd to NULL. Move the request to the appropriate list @@ -1248,10 +1306,11 @@ static int __map_request(struct ceph_osd_client *osdc, int acting[CEPH_PG_MAX_SIZE]; int o = -1, num = 0; int err; + bool was_paused; dout("map_request %p tid %lld\n", req, req->r_tid); - err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, - ceph_file_layout_pg_pool(req->r_file_layout)); + + err = __calc_request_pg(osdc->osdmap, req, &pgid); if (err) { list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; @@ -1264,12 +1323,18 @@ static int __map_request(struct ceph_osd_client *osdc, num = err; } + was_paused = req->r_paused; + req->r_paused = __req_should_be_paused(osdc, req); + if (was_paused && !req->r_paused) + force_resend = 1; + if ((!force_resend && req->r_osd && req->r_osd->o_osd == o && req->r_sent >= req->r_osd->o_incarnation && req->r_num_pg_osds == num && memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || - (req->r_osd == NULL && o == -1)) + (req->r_osd == NULL && o == -1) || + req->r_paused) return 0; /* no change */ dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", @@ -1331,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc, /* fill in message content that changes each time we send it */ put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); put_unaligned_le32(req->r_flags, req->r_request_flags); - put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); + put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool); p = req->r_request_pgid; ceph_encode_64(&p, req->r_pgid.pool); ceph_encode_32(&p, req->r_pgid.seed); @@ -1432,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work) round_jiffies_relative(delay)); } +static int ceph_oloc_decode(void **p, void *end, + struct ceph_object_locator *oloc) +{ + u8 struct_v, struct_cv; + u32 len; + void *struct_end; + int ret = 0; + + ceph_decode_need(p, end, 1 + 1 + 4, e_inval); + struct_v = ceph_decode_8(p); + struct_cv = ceph_decode_8(p); + if (struct_v < 3) { + pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", + struct_v, struct_cv); + goto e_inval; + } + if (struct_cv > 6) { + pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", + struct_v, struct_cv); + goto e_inval; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, e_inval); + struct_end = *p + len; + + oloc->pool = ceph_decode_64(p); + *p += 4; /* skip preferred */ + + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_object_locator::key is set\n"); + goto e_inval; + } + + if (struct_v >= 5) { + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_object_locator::nspace is set\n"); + goto e_inval; + } + } + + if (struct_v >= 6) { + s64 hash = ceph_decode_64(p); + if (hash != -1) { + pr_warn("ceph_object_locator::hash is set\n"); + goto e_inval; + } + } + + /* skip the rest */ + *p = struct_end; +out: + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + +static int ceph_redirect_decode(void **p, void *end, + struct ceph_request_redirect *redir) +{ + u8 struct_v, struct_cv; + u32 len; + void *struct_end; + int ret; + + ceph_decode_need(p, end, 1 + 1 + 4, e_inval); + struct_v = ceph_decode_8(p); + struct_cv = ceph_decode_8(p); + if (struct_cv > 1) { + pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", + struct_v, struct_cv); + goto e_inval; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, e_inval); + struct_end = *p + len; + + ret = ceph_oloc_decode(p, end, &redir->oloc); + if (ret) + goto out; + + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_request_redirect::object_name is set\n"); + goto e_inval; + } + + len = ceph_decode_32(p); + *p += len; /* skip osd_instructions */ + + /* skip the rest */ + *p = struct_end; +out: + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + static void complete_request(struct ceph_osd_request *req) { complete_all(&req->r_safe_completion); /* fsync waiter */ @@ -1446,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, { void *p, *end; struct ceph_osd_request *req; + struct ceph_request_redirect redir; u64 tid; int object_len; unsigned int numops; @@ -1525,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, for (i = 0; i < numops; i++) req->r_reply_op_result[i] = ceph_decode_32(&p); - already_completed = req->r_got_reply; + if (le16_to_cpu(msg->hdr.version) >= 6) { + p += 8 + 4; /* skip replay_version */ + p += 8; /* skip user_version */ - if (!req->r_got_reply) { + err = ceph_redirect_decode(&p, end, &redir); + if (err) + goto bad_put; + } else { + redir.oloc.pool = -1; + } + + if (redir.oloc.pool != -1) { + dout("redirect pool %lld\n", redir.oloc.pool); + + __unregister_request(osdc, req); + mutex_unlock(&osdc->request_mutex); + + req->r_target_oloc = redir.oloc; /* struct */ + + /* + * Start redirect requests with nofail=true. If + * mapping fails, request will end up on the notarget + * list, waiting for the new osdmap (which can take + * a while), even though the original request mapped + * successfully. In the future we might want to follow + * original request's nofail setting here. + */ + err = ceph_osdc_start_request(osdc, req, true); + BUG_ON(err); + goto done; + } + + already_completed = req->r_got_reply; + if (!req->r_got_reply) { req->r_result = result; dout("handle_reply result %d bytes %d\n", req->r_result, bytes); @@ -1581,6 +1781,13 @@ done: return; bad_put: + req->r_result = -EIO; + __unregister_request(osdc, req); + if (req->r_callback) + req->r_callback(req, msg); + else + complete_all(&req->r_completion); + complete_request(req); ceph_osdc_put_request(req); bad_mutex: mutex_unlock(&osdc->request_mutex); @@ -1613,14 +1820,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc) * * Caller should hold map_sem for read. */ -static void kick_requests(struct ceph_osd_client *osdc, int force_resend) +static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, + bool force_resend_writes) { struct ceph_osd_request *req, *nreq; struct rb_node *p; int needmap = 0; int err; + bool force_resend_req; - dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); + dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "", + force_resend_writes ? " (force resend writes)" : ""); mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; ) { req = rb_entry(p, struct ceph_osd_request, r_node); @@ -1645,7 +1855,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) continue; } - err = __map_request(osdc, req, force_resend); + force_resend_req = force_resend || + (force_resend_writes && + req->r_flags & CEPH_OSD_FLAG_WRITE); + err = __map_request(osdc, req, force_resend_req); if (err < 0) continue; /* error */ if (req->r_osd == NULL) { @@ -1665,7 +1878,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) r_linger_item) { dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); - err = __map_request(osdc, req, force_resend); + err = __map_request(osdc, req, + force_resend || force_resend_writes); dout("__map_request returned %d\n", err); if (err == 0) continue; /* no change and no osd was specified */ @@ -1707,6 +1921,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) struct ceph_osdmap *newmap = NULL, *oldmap; int err; struct ceph_fsid fsid; + bool was_full; dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); p = msg->front.iov_base; @@ -1720,6 +1935,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) down_write(&osdc->map_sem); + was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); + /* incremental maps */ ceph_decode_32_safe(&p, end, nr_maps, bad); dout(" %d inc maps\n", nr_maps); @@ -1744,7 +1961,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ceph_osdmap_destroy(osdc->osdmap); osdc->osdmap = newmap; } - kick_requests(osdc, 0); + was_full = was_full || + ceph_osdmap_flag(osdc->osdmap, + CEPH_OSDMAP_FULL); + kick_requests(osdc, 0, was_full); } else { dout("ignoring incremental map %u len %d\n", epoch, maplen); @@ -1787,7 +2007,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) skipped_map = 1; ceph_osdmap_destroy(oldmap); } - kick_requests(osdc, skipped_map); + was_full = was_full || + ceph_osdmap_flag(osdc->osdmap, + CEPH_OSDMAP_FULL); + kick_requests(osdc, skipped_map, was_full); } p += maplen; nr_maps--; @@ -1804,7 +2027,9 @@ done: * we find out when we are no longer full and stop returning * ENOSPC. */ - if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) ceph_monc_request_next_osdmap(&osdc->client->monc); mutex_lock(&osdc->request_mutex); @@ -2068,10 +2293,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, ceph_encode_32(&p, -1); /* preferred */ /* oid */ - ceph_encode_32(&p, req->r_oid_len); - memcpy(p, req->r_oid, req->r_oid_len); - dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); - p += req->r_oid_len; + ceph_encode_32(&p, req->r_base_oid.name_len); + memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); + dout("oid '%.*s' len %d\n", req->r_base_oid.name_len, + req->r_base_oid.name, req->r_base_oid.name_len); + p += req->r_base_oid.name_len; /* ops--can imply data */ ceph_encode_16(&p, (u16)req->r_num_ops); @@ -2454,7 +2680,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, struct ceph_osd_client *osdc = osd->o_osdc; struct ceph_msg *m; struct ceph_osd_request *req; - int front = le32_to_cpu(hdr->front_len); + int front_len = le32_to_cpu(hdr->front_len); int data_len = le32_to_cpu(hdr->data_len); u64 tid; @@ -2474,12 +2700,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply, req->r_reply->con); ceph_msg_revoke_incoming(req->r_reply); - if (front > req->r_reply->front.iov_len) { + if (front_len > req->r_reply->front_alloc_len) { pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", - front, (int)req->r_reply->front.iov_len, + front_len, req->r_reply->front_alloc_len, (unsigned int)con->peer_name.type, le64_to_cpu(con->peer_name.num)); - m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); + m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, + false); if (!m) goto out; ceph_msg_put(req->r_reply); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index dbd9a47..aade4a5 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -464,6 +464,11 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id) return NULL; } +struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id) +{ + return __lookup_pg_pool(&map->pg_pools, id); +} + const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id) { struct ceph_pg_pool_info *pi; @@ -514,8 +519,8 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); return -EINVAL; } - if (cv > 7) { - pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv); + if (cv > 9) { + pr_warning("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv); return -EINVAL; } len = ceph_decode_32(p); @@ -543,12 +548,34 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) *p += len; } - /* skip removed snaps */ + /* skip removed_snaps */ num = ceph_decode_32(p); *p += num * (8 + 8); *p += 8; /* skip auid */ pi->flags = ceph_decode_64(p); + *p += 4; /* skip crash_replay_interval */ + + if (ev >= 7) + *p += 1; /* skip min_size */ + + if (ev >= 8) + *p += 8 + 8; /* skip quota_max_* */ + + if (ev >= 9) { + /* skip tiers */ + num = ceph_decode_32(p); + *p += num * 8; + + *p += 8; /* skip tier_of */ + *p += 1; /* skip cache_mode */ + + pi->read_tier = ceph_decode_64(p); + pi->write_tier = ceph_decode_64(p); + } else { + pi->read_tier = -1; + pi->write_tier = -1; + } /* ignore the rest */ @@ -1090,25 +1117,40 @@ invalid: EXPORT_SYMBOL(ceph_calc_file_object_mapping); /* - * calculate an object layout (i.e. pgid) from an oid, - * file_layout, and osdmap + * Calculate mapping of a (oloc, oid) pair to a PG. Should only be + * called with target's (oloc, oid), since tiering isn't taken into + * account. */ -int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid, - struct ceph_osdmap *osdmap, uint64_t pool) +int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, + struct ceph_object_locator *oloc, + struct ceph_object_id *oid, + struct ceph_pg *pg_out) { - struct ceph_pg_pool_info *pool_info; + struct ceph_pg_pool_info *pi; - BUG_ON(!osdmap); - pool_info = __lookup_pg_pool(&osdmap->pg_pools, pool); - if (!pool_info) + pi = __lookup_pg_pool(&osdmap->pg_pools, oloc->pool); + if (!pi) return -EIO; - pg->pool = pool; - pg->seed = ceph_str_hash(pool_info->object_hash, oid, strlen(oid)); - dout("%s '%s' pgid %lld.%x\n", __func__, oid, pg->pool, pg->seed); + pg_out->pool = oloc->pool; + pg_out->seed = ceph_str_hash(pi->object_hash, oid->name, + oid->name_len); + + dout("%s '%.*s' pgid %llu.%x\n", __func__, oid->name_len, oid->name, + pg_out->pool, pg_out->seed); return 0; } -EXPORT_SYMBOL(ceph_calc_ceph_pg); +EXPORT_SYMBOL(ceph_oloc_oid_to_pg); + +static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x, + int *result, int result_max, + const __u32 *weight, int weight_max) +{ + int scratch[result_max * 3]; + + return crush_do_rule(map, ruleno, x, result, result_max, + weight, weight_max, scratch); +} /* * Calculate raw osd vector for the given pgid. Return pointer to osd @@ -1163,9 +1205,9 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, pool->pgp_num_mask) + (unsigned)pgid.pool; } - r = crush_do_rule(osdmap->crush, ruleno, pps, osds, - min_t(int, pool->size, *num), - osdmap->osd_weight); + r = crush_do_rule_ary(osdmap->crush, ruleno, pps, + osds, min_t(int, pool->size, *num), + osdmap->osd_weight, osdmap->max_osd); if (r < 0) { pr_err("error %d from crush rule: pool %lld ruleset %d type %d" " size %d\n", r, pgid.pool, pool->crush_ruleset, |