diff options
-rw-r--r-- | drivers/block/rbd.c | 1773 | ||||
-rw-r--r-- | fs/ceph/addr.c | 7 | ||||
-rw-r--r-- | fs/ceph/caps.c | 32 | ||||
-rw-r--r-- | fs/ceph/file.c | 8 | ||||
-rw-r--r-- | fs/ceph/ioctl.c | 2 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 33 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 6 | ||||
-rw-r--r-- | fs/ceph/strings.c | 4 | ||||
-rw-r--r-- | fs/ceph/super.h | 8 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 210 | ||||
-rw-r--r-- | include/linux/ceph/ceph_features.h | 8 | ||||
-rw-r--r-- | include/linux/ceph/ceph_fs.h | 32 | ||||
-rw-r--r-- | include/linux/ceph/decode.h | 29 | ||||
-rw-r--r-- | include/linux/ceph/libceph.h | 16 | ||||
-rw-r--r-- | include/linux/ceph/messenger.h | 2 | ||||
-rw-r--r-- | include/linux/ceph/osd_client.h | 54 | ||||
-rw-r--r-- | include/linux/ceph/osdmap.h | 2 | ||||
-rw-r--r-- | include/linux/ceph/rados.h | 93 | ||||
-rw-r--r-- | include/linux/crush/crush.h | 2 | ||||
-rw-r--r-- | net/ceph/ceph_common.c | 16 | ||||
-rw-r--r-- | net/ceph/ceph_strings.c | 39 | ||||
-rw-r--r-- | net/ceph/crush/mapper.c | 15 | ||||
-rw-r--r-- | net/ceph/messenger.c | 5 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 418 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 43 | ||||
-rw-r--r-- | net/ceph/pagevec.c | 24 |
26 files changed, 1756 insertions, 1125 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 89576a0..b0eea3e 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -52,9 +52,12 @@ #define SECTOR_SHIFT 9 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT) -/* It might be useful to have this defined elsewhere too */ +/* It might be useful to have these defined elsewhere */ -#define U64_MAX ((u64) (~0ULL)) +#define U8_MAX ((u8) (~0U)) +#define U16_MAX ((u16) (~0U)) +#define U32_MAX ((u32) (~0U)) +#define U64_MAX ((u64) (~0ULL)) #define RBD_DRV_NAME "rbd" #define RBD_DRV_NAME_LONG "rbd (rados block device)" @@ -66,7 +69,6 @@ (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ -#define RBD_MAX_OPT_LEN 1024 #define RBD_SNAP_HEAD_NAME "-" @@ -93,8 +95,6 @@ #define DEV_NAME_LEN 32 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1) -#define RBD_READ_ONLY_DEFAULT false - /* * block device image metadata (in-memory version) */ @@ -119,16 +119,33 @@ struct rbd_image_header { * An rbd image specification. * * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely - * identify an image. + * identify an image. Each rbd_dev structure includes a pointer to + * an rbd_spec structure that encapsulates this identity. + * + * Each of the id's in an rbd_spec has an associated name. For a + * user-mapped image, the names are supplied and the id's associated + * with them are looked up. For a layered image, a parent image is + * defined by the tuple, and the names are looked up. + * + * An rbd_dev structure contains a parent_spec pointer which is + * non-null if the image it represents is a child in a layered + * image. This pointer will refer to the rbd_spec structure used + * by the parent rbd_dev for its own identity (i.e., the structure + * is shared between the parent and child). + * + * Since these structures are populated once, during the discovery + * phase of image construction, they are effectively immutable so + * we make no effort to synchronize access to them. + * + * Note that code herein does not assume the image name is known (it + * could be a null pointer). */ struct rbd_spec { u64 pool_id; char *pool_name; char *image_id; - size_t image_id_len; char *image_name; - size_t image_name_len; u64 snap_id; char *snap_name; @@ -136,10 +153,6 @@ struct rbd_spec { struct kref kref; }; -struct rbd_options { - bool read_only; -}; - /* * an instance of the client. multiple devices may share an rbd client. */ @@ -149,37 +162,76 @@ struct rbd_client { struct list_head node; }; -/* - * a request completion status - */ -struct rbd_req_status { - int done; - int rc; - u64 bytes; +struct rbd_img_request; +typedef void (*rbd_img_callback_t)(struct rbd_img_request *); + +#define BAD_WHICH U32_MAX /* Good which or bad which, which? */ + +struct rbd_obj_request; +typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *); + +enum obj_request_type { + OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES }; -/* - * a collection of requests - */ -struct rbd_req_coll { - int total; - int num_done; +struct rbd_obj_request { + const char *object_name; + u64 offset; /* object start byte */ + u64 length; /* bytes from offset */ + + struct rbd_img_request *img_request; + struct list_head links; /* img_request->obj_requests */ + u32 which; /* posn image request list */ + + enum obj_request_type type; + union { + struct bio *bio_list; + struct { + struct page **pages; + u32 page_count; + }; + }; + + struct ceph_osd_request *osd_req; + + u64 xferred; /* bytes transferred */ + u64 version; + s32 result; + atomic_t done; + + rbd_obj_callback_t callback; + struct completion completion; + struct kref kref; - struct rbd_req_status status[0]; }; -/* - * a single io request - */ -struct rbd_request { - struct request *rq; /* blk layer request */ - struct bio *bio; /* cloned bio */ - struct page **pages; /* list of used pages */ - u64 len; - int coll_index; - struct rbd_req_coll *coll; +struct rbd_img_request { + struct request *rq; + struct rbd_device *rbd_dev; + u64 offset; /* starting image byte offset */ + u64 length; /* byte count from offset */ + bool write_request; /* false for read */ + union { + struct ceph_snap_context *snapc; /* for writes */ + u64 snap_id; /* for reads */ + }; + spinlock_t completion_lock;/* protects next_completion */ + u32 next_completion; + rbd_img_callback_t callback; + + u32 obj_request_count; + struct list_head obj_requests; /* rbd_obj_request structs */ + + struct kref kref; }; +#define for_each_obj_request(ireq, oreq) \ + list_for_each_entry(oreq, &(ireq)->obj_requests, links) +#define for_each_obj_request_from(ireq, oreq) \ + list_for_each_entry_from(oreq, &(ireq)->obj_requests, links) +#define for_each_obj_request_safe(ireq, oreq, n) \ + list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) + struct rbd_snap { struct device dev; const char *name; @@ -209,16 +261,18 @@ struct rbd_device { char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ - spinlock_t lock; /* queue lock */ + spinlock_t lock; /* queue, flags, open_count */ struct rbd_image_header header; - bool exists; + unsigned long flags; /* possibly lock protected */ struct rbd_spec *spec; char *header_name; + struct ceph_file_layout layout; + struct ceph_osd_event *watch_event; - struct ceph_osd_request *watch_request; + struct rbd_obj_request *watch_request; struct rbd_spec *parent_spec; u64 parent_overlap; @@ -235,7 +289,19 @@ struct rbd_device { /* sysfs related */ struct device dev; - unsigned long open_count; + unsigned long open_count; /* protected by lock */ +}; + +/* + * Flag bits for rbd_dev->flags. If atomicity is required, + * rbd_dev->lock is used to protect access. + * + * Currently, only the "removing" flag (which is coupled with the + * "open_count" field) requires atomic access. + */ +enum rbd_dev_flags { + RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ + RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ }; static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ @@ -277,6 +343,33 @@ static struct device rbd_root_dev = { .release = rbd_root_dev_release, }; +static __printf(2, 3) +void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) +{ + struct va_format vaf; + va_list args; + + va_start(args, fmt); + vaf.fmt = fmt; + vaf.va = &args; + + if (!rbd_dev) + printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf); + else if (rbd_dev->disk) + printk(KERN_WARNING "%s: %s: %pV\n", + RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf); + else if (rbd_dev->spec && rbd_dev->spec->image_name) + printk(KERN_WARNING "%s: image %s: %pV\n", + RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf); + else if (rbd_dev->spec && rbd_dev->spec->image_id) + printk(KERN_WARNING "%s: id %s: %pV\n", + RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf); + else /* punt */ + printk(KERN_WARNING "%s: rbd_dev %p: %pV\n", + RBD_DRV_NAME, rbd_dev, &vaf); + va_end(args); +} + #ifdef RBD_DEBUG #define rbd_assert(expr) \ if (unlikely(!(expr))) { \ @@ -296,14 +389,23 @@ static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); static int rbd_open(struct block_device *bdev, fmode_t mode) { struct rbd_device *rbd_dev = bdev->bd_disk->private_data; + bool removing = false; if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) return -EROFS; + spin_lock_irq(&rbd_dev->lock); + if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) + removing = true; + else + rbd_dev->open_count++; + spin_unlock_irq(&rbd_dev->lock); + if (removing) + return -ENOENT; + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); (void) get_device(&rbd_dev->dev); set_device_ro(bdev, rbd_dev->mapping.read_only); - rbd_dev->open_count++; mutex_unlock(&ctl_mutex); return 0; @@ -312,10 +414,14 @@ static int rbd_open(struct block_device *bdev, fmode_t mode) static int rbd_release(struct gendisk *disk, fmode_t mode) { struct rbd_device *rbd_dev = disk->private_data; + unsigned long open_count_before; + + spin_lock_irq(&rbd_dev->lock); + open_count_before = rbd_dev->open_count--; + spin_unlock_irq(&rbd_dev->lock); + rbd_assert(open_count_before > 0); mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - rbd_assert(rbd_dev->open_count > 0); - rbd_dev->open_count--; put_device(&rbd_dev->dev); mutex_unlock(&ctl_mutex); @@ -426,6 +532,12 @@ static match_table_t rbd_opts_tokens = { {-1, NULL} }; +struct rbd_options { + bool read_only; +}; + +#define RBD_READ_ONLY_DEFAULT false + static int parse_rbd_opts_token(char *c, void *private) { struct rbd_options *rbd_opts = private; @@ -512,18 +624,6 @@ static void rbd_put_client(struct rbd_client *rbdc) kref_put(&rbdc->kref, rbd_client_release); } -/* - * Destroy requests collection - */ -static void rbd_coll_release(struct kref *kref) -{ - struct rbd_req_coll *coll = - container_of(kref, struct rbd_req_coll, kref); - - dout("rbd_coll_release %p\n", coll); - kfree(coll); -} - static bool rbd_image_format_valid(u32 image_format) { return image_format == 1 || image_format == 2; @@ -707,7 +807,8 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev) goto done; rbd_dev->mapping.read_only = true; } - rbd_dev->exists = true; + set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); + done: return ret; } @@ -724,7 +825,7 @@ static void rbd_header_free(struct rbd_image_header *header) header->snapc = NULL; } -static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) +static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) { char *name; u64 segment; @@ -767,23 +868,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev, return length; } -static int rbd_get_num_segments(struct rbd_image_header *header, - u64 ofs, u64 len) -{ - u64 start_seg; - u64 end_seg; - - if (!len) - return 0; - if (len - 1 > U64_MAX - ofs) - return -ERANGE; - - start_seg = ofs >> header->obj_order; - end_seg = (ofs + len - 1) >> header->obj_order; - - return end_seg - start_seg + 1; -} - /* * returns the size of an object in the image */ @@ -949,8 +1033,10 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src, unsigned int bi_size; struct bio *bio; - if (!bi) + if (!bi) { + rbd_warn(NULL, "bio_chain exhausted with %u left", len); goto out_err; /* EINVAL; ran out of bio's */ + } bi_size = min_t(unsigned int, bi->bi_size - off, len); bio = bio_clone_range(bi, off, bi_size, gfpmask); if (!bio) @@ -976,399 +1062,665 @@ out_err: return NULL; } -/* - * helpers for osd request op vectors. - */ -static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops, - int opcode, u32 payload_len) +static void rbd_obj_request_get(struct rbd_obj_request *obj_request) { - struct ceph_osd_req_op *ops; + kref_get(&obj_request->kref); +} - ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO); - if (!ops) +static void rbd_obj_request_destroy(struct kref *kref); +static void rbd_obj_request_put(struct rbd_obj_request *obj_request) +{ + rbd_assert(obj_request != NULL); + kref_put(&obj_request->kref, rbd_obj_request_destroy); +} + +static void rbd_img_request_get(struct rbd_img_request *img_request) +{ + kref_get(&img_request->kref); +} + +static void rbd_img_request_destroy(struct kref *kref); +static void rbd_img_request_put(struct rbd_img_request *img_request) +{ + rbd_assert(img_request != NULL); + kref_put(&img_request->kref, rbd_img_request_destroy); +} + +static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, + struct rbd_obj_request *obj_request) +{ + rbd_assert(obj_request->img_request == NULL); + + rbd_obj_request_get(obj_request); + obj_request->img_request = img_request; + obj_request->which = img_request->obj_request_count; + rbd_assert(obj_request->which != BAD_WHICH); + img_request->obj_request_count++; + list_add_tail(&obj_request->links, &img_request->obj_requests); +} + +static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, + struct rbd_obj_request *obj_request) +{ + rbd_assert(obj_request->which != BAD_WHICH); + + list_del(&obj_request->links); + rbd_assert(img_request->obj_request_count > 0); + img_request->obj_request_count--; + rbd_assert(obj_request->which == img_request->obj_request_count); + obj_request->which = BAD_WHICH; + rbd_assert(obj_request->img_request == img_request); + obj_request->img_request = NULL; + obj_request->callback = NULL; + rbd_obj_request_put(obj_request); +} + +static bool obj_request_type_valid(enum obj_request_type type) +{ + switch (type) { + case OBJ_REQUEST_NODATA: + case OBJ_REQUEST_BIO: + case OBJ_REQUEST_PAGES: + return true; + default: + return false; + } +} + +struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...) +{ + struct ceph_osd_req_op *op; + va_list args; + size_t size; + + op = kzalloc(sizeof (*op), GFP_NOIO); + if (!op) return NULL; + op->op = opcode; + va_start(args, opcode); + switch (opcode) { + case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_WRITE: + /* rbd_osd_req_op_create(READ, offset, length) */ + /* rbd_osd_req_op_create(WRITE, offset, length) */ + op->extent.offset = va_arg(args, u64); + op->extent.length = va_arg(args, u64); + if (opcode == CEPH_OSD_OP_WRITE) + op->payload_len = op->extent.length; + break; + case CEPH_OSD_OP_STAT: + break; + case CEPH_OSD_OP_CALL: + /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */ + op->cls.class_name = va_arg(args, char *); + size = strlen(op->cls.class_name); + rbd_assert(size <= (size_t) U8_MAX); + op->cls.class_len = size; + op->payload_len = size; + + op->cls.method_name = va_arg(args, char *); + size = strlen(op->cls.method_name); + rbd_assert(size <= (size_t) U8_MAX); + op->cls.method_len = size; + op->payload_len += size; + + op->cls.argc = 0; + op->cls.indata = va_arg(args, void *); + size = va_arg(args, size_t); + rbd_assert(size <= (size_t) U32_MAX); + op->cls.indata_len = (u32) size; + op->payload_len += size; + break; + case CEPH_OSD_OP_NOTIFY_ACK: + case CEPH_OSD_OP_WATCH: + /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */ + /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */ + op->watch.cookie = va_arg(args, u64); + op->watch.ver = va_arg(args, u64); + op->watch.ver = cpu_to_le64(op->watch.ver); + if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int)) + op->watch.flag = (u8) 1; + break; + default: + rbd_warn(NULL, "unsupported opcode %hu\n", opcode); + kfree(op); + op = NULL; + break; + } + va_end(args); - ops[0].op = opcode; + return op; +} - /* - * op extent offset and length will be set later on - * in calc_raw_layout() - */ - ops[0].payload_len = payload_len; +static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op) +{ + kfree(op); +} - return ops; +static int rbd_obj_request_submit(struct ceph_osd_client *osdc, + struct rbd_obj_request *obj_request) +{ + return ceph_osdc_start_request(osdc, obj_request->osd_req, false); } -static void rbd_destroy_ops(struct ceph_osd_req_op *ops) +static void rbd_img_request_complete(struct rbd_img_request *img_request) { - kfree(ops); + if (img_request->callback) + img_request->callback(img_request); + else + rbd_img_request_put(img_request); } -static void rbd_coll_end_req_index(struct request *rq, - struct rbd_req_coll *coll, - int index, - int ret, u64 len) +/* Caller is responsible for rbd_obj_request_destroy(obj_request) */ + +static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) { - struct request_queue *q; - int min, max, i; + return wait_for_completion_interruptible(&obj_request->completion); +} - dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n", - coll, index, ret, (unsigned long long) len); +static void obj_request_done_init(struct rbd_obj_request *obj_request) +{ + atomic_set(&obj_request->done, 0); + smp_wmb(); +} - if (!rq) - return; +static void obj_request_done_set(struct rbd_obj_request *obj_request) +{ + atomic_set(&obj_request->done, 1); + smp_wmb(); +} - if (!coll) { - blk_end_request(rq, ret, len); - return; - } +static bool obj_request_done_test(struct rbd_obj_request *obj_request) +{ + smp_rmb(); + return atomic_read(&obj_request->done) != 0; +} + +static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request, + struct ceph_osd_op *op) +{ + obj_request_done_set(obj_request); +} - q = rq->q; - - spin_lock_irq(q->queue_lock); - coll->status[index].done = 1; - coll->status[index].rc = ret; - coll->status[index].bytes = len; - max = min = coll->num_done; - while (max < coll->total && coll->status[max].done) - max++; - - for (i = min; i<max; i++) { - __blk_end_request(rq, coll->status[i].rc, - coll->status[i].bytes); - coll->num_done++; - kref_put(&coll->kref, rbd_coll_release); +static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) +{ + if (obj_request->callback) + obj_request->callback(obj_request); + else + complete_all(&obj_request->completion); +} + +static void rbd_osd_read_callback(struct rbd_obj_request *obj_request, + struct ceph_osd_op *op) +{ + u64 xferred; + + /* + * We support a 64-bit length, but ultimately it has to be + * passed to blk_end_request(), which takes an unsigned int. + */ + xferred = le64_to_cpu(op->extent.length); + rbd_assert(xferred < (u64) UINT_MAX); + if (obj_request->result == (s32) -ENOENT) { + zero_bio_chain(obj_request->bio_list, 0); + obj_request->result = 0; + } else if (xferred < obj_request->length && !obj_request->result) { + zero_bio_chain(obj_request->bio_list, xferred); + xferred = obj_request->length; } - spin_unlock_irq(q->queue_lock); + obj_request->xferred = xferred; + obj_request_done_set(obj_request); } -static void rbd_coll_end_req(struct rbd_request *req, - int ret, u64 len) +static void rbd_osd_write_callback(struct rbd_obj_request *obj_request, + struct ceph_osd_op *op) { - rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len); + obj_request->xferred = le64_to_cpu(op->extent.length); + obj_request_done_set(obj_request); } /* - * Send ceph osd request + * For a simple stat call there's nothing to do. We'll do more if + * this is part of a write sequence for a layered image. */ -static int rbd_do_request(struct request *rq, - struct rbd_device *rbd_dev, - struct ceph_snap_context *snapc, - u64 snapid, - const char *object_name, u64 ofs, u64 len, - struct bio *bio, - struct page **pages, - int num_pages, - int flags, - struct ceph_osd_req_op *ops, - struct rbd_req_coll *coll, - int coll_index, - void (*rbd_cb)(struct ceph_osd_request *req, - struct ceph_msg *msg), - struct ceph_osd_request **linger_req, - u64 *ver) -{ - struct ceph_osd_request *req; - struct ceph_file_layout *layout; - int ret; - u64 bno; - struct timespec mtime = CURRENT_TIME; - struct rbd_request *req_data; - struct ceph_osd_request_head *reqhead; - struct ceph_osd_client *osdc; +static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request, + struct ceph_osd_op *op) +{ + obj_request_done_set(obj_request); +} - req_data = kzalloc(sizeof(*req_data), GFP_NOIO); - if (!req_data) { - if (coll) - rbd_coll_end_req_index(rq, coll, coll_index, - -ENOMEM, len); - return -ENOMEM; +static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, + struct ceph_msg *msg) +{ + struct rbd_obj_request *obj_request = osd_req->r_priv; + struct ceph_osd_reply_head *reply_head; + struct ceph_osd_op *op; + u32 num_ops; + u16 opcode; + + rbd_assert(osd_req == obj_request->osd_req); + rbd_assert(!!obj_request->img_request ^ + (obj_request->which == BAD_WHICH)); + + obj_request->xferred = le32_to_cpu(msg->hdr.data_len); + reply_head = msg->front.iov_base; + obj_request->result = (s32) le32_to_cpu(reply_head->result); + obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version); + + num_ops = le32_to_cpu(reply_head->num_ops); + WARN_ON(num_ops != 1); /* For now */ + + op = &reply_head->ops[0]; + opcode = le16_to_cpu(op->op); + switch (opcode) { + case CEPH_OSD_OP_READ: + rbd_osd_read_callback(obj_request, op); + break; + case CEPH_OSD_OP_WRITE: + rbd_osd_write_callback(obj_request, op); + break; + case CEPH_OSD_OP_STAT: + rbd_osd_stat_callback(obj_request, op); + break; + case CEPH_OSD_OP_CALL: + case CEPH_OSD_OP_NOTIFY_ACK: + case CEPH_OSD_OP_WATCH: + rbd_osd_trivial_callback(obj_request, op); + break; + default: + rbd_warn(NULL, "%s: unsupported op %hu\n", + obj_request->object_name, (unsigned short) opcode); + break; } - if (coll) { - req_data->coll = coll; - req_data->coll_index = coll_index; + if (obj_request_done_test(obj_request)) + rbd_obj_request_complete(obj_request); +} + +static struct ceph_osd_request *rbd_osd_req_create( + struct rbd_device *rbd_dev, + bool write_request, + struct rbd_obj_request *obj_request, + struct ceph_osd_req_op *op) +{ + struct rbd_img_request *img_request = obj_request->img_request; + struct ceph_snap_context *snapc = NULL; + struct ceph_osd_client *osdc; + struct ceph_osd_request *osd_req; + struct timespec now; + struct timespec *mtime; + u64 snap_id = CEPH_NOSNAP; + u64 offset = obj_request->offset; + u64 length = obj_request->length; + + if (img_request) { + rbd_assert(img_request->write_request == write_request); + if (img_request->write_request) + snapc = img_request->snapc; + else + snap_id = img_request->snap_id; } - dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n", - object_name, (unsigned long long) ofs, - (unsigned long long) len, coll, coll_index); + /* Allocate and initialize the request, for the single op */ osdc = &rbd_dev->rbd_client->client->osdc; - req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, - false, GFP_NOIO, pages, bio); - if (!req) { - ret = -ENOMEM; - goto done_pages; + osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC); + if (!osd_req) + return NULL; /* ENOMEM */ + + rbd_assert(obj_request_type_valid(obj_request->type)); + switch (obj_request->type) { + case OBJ_REQUEST_NODATA: + break; /* Nothing to do */ + case OBJ_REQUEST_BIO: + rbd_assert(obj_request->bio_list != NULL); + osd_req->r_bio = obj_request->bio_list; + break; + case OBJ_REQUEST_PAGES: + osd_req->r_pages = obj_request->pages; + osd_req->r_num_pages = obj_request->page_count; + osd_req->r_page_alignment = offset & ~PAGE_MASK; + break; } - req->r_callback = rbd_cb; + if (write_request) { + osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + now = CURRENT_TIME; + mtime = &now; + } else { + osd_req->r_flags = CEPH_OSD_FLAG_READ; + mtime = NULL; /* not needed for reads */ + offset = 0; /* These are not used... */ + length = 0; /* ...for osd read requests */ + } - req_data->rq = rq; - req_data->bio = bio; - req_data->pages = pages; - req_data->len = len; + osd_req->r_callback = rbd_osd_req_callback; + osd_req->r_priv = obj_request; - req->r_priv = req_data; + osd_req->r_oid_len = strlen(obj_request->object_name); + rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); + memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); - reqhead = req->r_request->front.iov_base; - reqhead->snapid = cpu_to_le64(CEPH_NOSNAP); + osd_req->r_file_layout = rbd_dev->layout; /* struct */ - strncpy(req->r_oid, object_name, sizeof(req->r_oid)); - req->r_oid_len = strlen(req->r_oid); + /* osd_req will get its own reference to snapc (if non-null) */ - layout = &req->r_file_layout; - memset(layout, 0, sizeof(*layout)); - layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); - layout->fl_stripe_count = cpu_to_le32(1); - layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); - layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id); - ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, - req, ops); - rbd_assert(ret == 0); + ceph_osdc_build_request(osd_req, offset, length, 1, op, + snapc, snap_id, mtime); - ceph_osdc_build_request(req, ofs, &len, - ops, - snapc, - &mtime, - req->r_oid, req->r_oid_len); + return osd_req; +} - if (linger_req) { - ceph_osdc_set_request_linger(osdc, req); - *linger_req = req; - } +static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) +{ + ceph_osdc_put_request(osd_req); +} - ret = ceph_osdc_start_request(osdc, req, false); - if (ret < 0) - goto done_err; - - if (!rbd_cb) { - ret = ceph_osdc_wait_request(osdc, req); - if (ver) - *ver = le64_to_cpu(req->r_reassert_version.version); - dout("reassert_ver=%llu\n", - (unsigned long long) - le64_to_cpu(req->r_reassert_version.version)); - ceph_osdc_put_request(req); +/* object_name is assumed to be a non-null pointer and NUL-terminated */ + +static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, + u64 offset, u64 length, + enum obj_request_type type) +{ + struct rbd_obj_request *obj_request; + size_t size; + char *name; + + rbd_assert(obj_request_type_valid(type)); + + size = strlen(object_name) + 1; + obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL); + if (!obj_request) + return NULL; + + name = (char *)(obj_request + 1); + obj_request->object_name = memcpy(name, object_name, size); + obj_request->offset = offset; + obj_request->length = length; + obj_request->which = BAD_WHICH; + obj_request->type = type; + INIT_LIST_HEAD(&obj_request->links); + obj_request_done_init(obj_request); + init_completion(&obj_request->completion); + kref_init(&obj_request->kref); + + return obj_request; +} + +static void rbd_obj_request_destroy(struct kref *kref) +{ + struct rbd_obj_request *obj_request; + + obj_request = container_of(kref, struct rbd_obj_request, kref); + + rbd_assert(obj_request->img_request == NULL); + rbd_assert(obj_request->which == BAD_WHICH); + + if (obj_request->osd_req) + rbd_osd_req_destroy(obj_request->osd_req); + + rbd_assert(obj_request_type_valid(obj_request->type)); + switch (obj_request->type) { + case OBJ_REQUEST_NODATA: + break; /* Nothing to do */ + case OBJ_REQUEST_BIO: + if (obj_request->bio_list) + bio_chain_put(obj_request->bio_list); + break; + case OBJ_REQUEST_PAGES: + if (obj_request->pages) + ceph_release_page_vector(obj_request->pages, + obj_request->page_count); + break; } - return ret; -done_err: - bio_chain_put(req_data->bio); - ceph_osdc_put_request(req); -done_pages: - rbd_coll_end_req(req_data, ret, len); - kfree(req_data); - return ret; + kfree(obj_request); } /* - * Ceph osd op callback + * Caller is responsible for filling in the list of object requests + * that comprises the image request, and the Linux request pointer + * (if there is one). */ -static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) +struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev, + u64 offset, u64 length, + bool write_request) { - struct rbd_request *req_data = req->r_priv; - struct ceph_osd_reply_head *replyhead; - struct ceph_osd_op *op; - __s32 rc; - u64 bytes; - int read_op; - - /* parse reply */ - replyhead = msg->front.iov_base; - WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); - op = (void *)(replyhead + 1); - rc = le32_to_cpu(replyhead->result); - bytes = le64_to_cpu(op->extent.length); - read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ); - - dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n", - (unsigned long long) bytes, read_op, (int) rc); - - if (rc == -ENOENT && read_op) { - zero_bio_chain(req_data->bio, 0); - rc = 0; - } else if (rc == 0 && read_op && bytes < req_data->len) { - zero_bio_chain(req_data->bio, bytes); - bytes = req_data->len; - } + struct rbd_img_request *img_request; + struct ceph_snap_context *snapc = NULL; - rbd_coll_end_req(req_data, rc, bytes); + img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC); + if (!img_request) + return NULL; - if (req_data->bio) - bio_chain_put(req_data->bio); + if (write_request) { + down_read(&rbd_dev->header_rwsem); + snapc = ceph_get_snap_context(rbd_dev->header.snapc); + up_read(&rbd_dev->header_rwsem); + if (WARN_ON(!snapc)) { + kfree(img_request); + return NULL; /* Shouldn't happen */ + } + } - ceph_osdc_put_request(req); - kfree(req_data); + img_request->rq = NULL; + img_request->rbd_dev = rbd_dev; + img_request->offset = offset; + img_request->length = length; + img_request->write_request = write_request; + if (write_request) + img_request->snapc = snapc; + else + img_request->snap_id = rbd_dev->spec->snap_id; + spin_lock_init(&img_request->completion_lock); + img_request->next_completion = 0; + img_request->callback = NULL; + img_request->obj_request_count = 0; + INIT_LIST_HEAD(&img_request->obj_requests); + kref_init(&img_request->kref); + + rbd_img_request_get(img_request); /* Avoid a warning */ + rbd_img_request_put(img_request); /* TEMPORARY */ + + return img_request; } -static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) +static void rbd_img_request_destroy(struct kref *kref) { - ceph_osdc_put_request(req); + struct rbd_img_request *img_request; + struct rbd_obj_request *obj_request; + struct rbd_obj_request *next_obj_request; + + img_request = container_of(kref, struct rbd_img_request, kref); + + for_each_obj_request_safe(img_request, obj_request, next_obj_request) + rbd_img_obj_request_del(img_request, obj_request); + rbd_assert(img_request->obj_request_count == 0); + + if (img_request->write_request) + ceph_put_snap_context(img_request->snapc); + + kfree(img_request); } -/* - * Do a synchronous ceph osd operation - */ -static int rbd_req_sync_op(struct rbd_device *rbd_dev, - struct ceph_snap_context *snapc, - u64 snapid, - int flags, - struct ceph_osd_req_op *ops, - const char *object_name, - u64 ofs, u64 inbound_size, - char *inbound, - struct ceph_osd_request **linger_req, - u64 *ver) +static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, + struct bio *bio_list) { - int ret; - struct page **pages; - int num_pages; - - rbd_assert(ops != NULL); + struct rbd_device *rbd_dev = img_request->rbd_dev; + struct rbd_obj_request *obj_request = NULL; + struct rbd_obj_request *next_obj_request; + unsigned int bio_offset; + u64 image_offset; + u64 resid; + u16 opcode; + + opcode = img_request->write_request ? CEPH_OSD_OP_WRITE + : CEPH_OSD_OP_READ; + bio_offset = 0; + image_offset = img_request->offset; + rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT); + resid = img_request->length; + while (resid) { + const char *object_name; + unsigned int clone_size; + struct ceph_osd_req_op *op; + u64 offset; + u64 length; + + object_name = rbd_segment_name(rbd_dev, image_offset); + if (!object_name) + goto out_unwind; + offset = rbd_segment_offset(rbd_dev, image_offset); + length = rbd_segment_length(rbd_dev, image_offset, resid); + obj_request = rbd_obj_request_create(object_name, + offset, length, + OBJ_REQUEST_BIO); + kfree(object_name); /* object request has its own copy */ + if (!obj_request) + goto out_unwind; + + rbd_assert(length <= (u64) UINT_MAX); + clone_size = (unsigned int) length; + obj_request->bio_list = bio_chain_clone_range(&bio_list, + &bio_offset, clone_size, + GFP_ATOMIC); + if (!obj_request->bio_list) + goto out_partial; - num_pages = calc_pages_for(ofs, inbound_size); - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); + /* + * Build up the op to use in building the osd + * request. Note that the contents of the op are + * copied by rbd_osd_req_create(). + */ + op = rbd_osd_req_op_create(opcode, offset, length); + if (!op) + goto out_partial; + obj_request->osd_req = rbd_osd_req_create(rbd_dev, + img_request->write_request, + obj_request, op); + rbd_osd_req_op_destroy(op); + if (!obj_request->osd_req) + goto out_partial; + /* status and version are initially zero-filled */ + + rbd_img_obj_request_add(img_request, obj_request); + + image_offset += length; + resid -= length; + } - ret = rbd_do_request(NULL, rbd_dev, snapc, snapid, - object_name, ofs, inbound_size, NULL, - pages, num_pages, - flags, - ops, - NULL, 0, - NULL, - linger_req, ver); - if (ret < 0) - goto done; + return 0; - if ((flags & CEPH_OSD_FLAG_READ) && inbound) - ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret); +out_partial: + rbd_obj_request_put(obj_request); +out_unwind: + for_each_obj_request_safe(img_request, obj_request, next_obj_request) + rbd_obj_request_put(obj_request); -done: - ceph_release_page_vector(pages, num_pages); - return ret; + return -ENOMEM; } -/* - * Do an asynchronous ceph osd operation - */ -static int rbd_do_op(struct request *rq, - struct rbd_device *rbd_dev, - struct ceph_snap_context *snapc, - u64 ofs, u64 len, - struct bio *bio, - struct rbd_req_coll *coll, - int coll_index) -{ - char *seg_name; - u64 seg_ofs; - u64 seg_len; - int ret; - struct ceph_osd_req_op *ops; - u32 payload_len; - int opcode; - int flags; - u64 snapid; - - seg_name = rbd_segment_name(rbd_dev, ofs); - if (!seg_name) - return -ENOMEM; - seg_len = rbd_segment_length(rbd_dev, ofs, len); - seg_ofs = rbd_segment_offset(rbd_dev, ofs); - - if (rq_data_dir(rq) == WRITE) { - opcode = CEPH_OSD_OP_WRITE; - flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK; - snapid = CEPH_NOSNAP; - payload_len = seg_len; - } else { - opcode = CEPH_OSD_OP_READ; - flags = CEPH_OSD_FLAG_READ; - snapc = NULL; - snapid = rbd_dev->spec->snap_id; - payload_len = 0; - } +static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + u32 which = obj_request->which; + bool more = true; + + img_request = obj_request->img_request; + rbd_assert(img_request != NULL); + rbd_assert(img_request->rq != NULL); + rbd_assert(which != BAD_WHICH); + rbd_assert(which < img_request->obj_request_count); + rbd_assert(which >= img_request->next_completion); + + spin_lock_irq(&img_request->completion_lock); + if (which != img_request->next_completion) + goto out; - ret = -ENOMEM; - ops = rbd_create_rw_ops(1, opcode, payload_len); - if (!ops) - goto done; + for_each_obj_request_from(img_request, obj_request) { + unsigned int xferred; + int result; - /* we've taken care of segment sizes earlier when we - cloned the bios. We should never have a segment - truncated at this point */ - rbd_assert(seg_len == len); - - ret = rbd_do_request(rq, rbd_dev, snapc, snapid, - seg_name, seg_ofs, seg_len, - bio, - NULL, 0, - flags, - ops, - coll, coll_index, - rbd_req_cb, 0, NULL); - - rbd_destroy_ops(ops); -done: - kfree(seg_name); - return ret; + rbd_assert(more); + rbd_assert(which < img_request->obj_request_count); + + if (!obj_request_done_test(obj_request)) + break; + + rbd_assert(obj_request->xferred <= (u64) UINT_MAX); + xferred = (unsigned int) obj_request->xferred; + result = (int) obj_request->result; + if (result) + rbd_warn(NULL, "obj_request %s result %d xferred %u\n", + img_request->write_request ? "write" : "read", + result, xferred); + + more = blk_end_request(img_request->rq, result, xferred); + which++; + } + rbd_assert(more ^ (which == img_request->obj_request_count)); + img_request->next_completion = which; +out: + spin_unlock_irq(&img_request->completion_lock); + + if (!more) + rbd_img_request_complete(img_request); } -/* - * Request sync osd read - */ -static int rbd_req_sync_read(struct rbd_device *rbd_dev, - u64 snapid, - const char *object_name, - u64 ofs, u64 len, - char *buf, - u64 *ver) -{ - struct ceph_osd_req_op *ops; - int ret; +static int rbd_img_request_submit(struct rbd_img_request *img_request) +{ + struct rbd_device *rbd_dev = img_request->rbd_dev; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_obj_request *obj_request; - ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0); - if (!ops) - return -ENOMEM; + for_each_obj_request(img_request, obj_request) { + int ret; - ret = rbd_req_sync_op(rbd_dev, NULL, - snapid, - CEPH_OSD_FLAG_READ, - ops, object_name, ofs, len, buf, NULL, ver); - rbd_destroy_ops(ops); + obj_request->callback = rbd_img_obj_callback; + ret = rbd_obj_request_submit(osdc, obj_request); + if (ret) + return ret; + /* + * The image request has its own reference to each + * of its object requests, so we can safely drop the + * initial one here. + */ + rbd_obj_request_put(obj_request); + } - return ret; + return 0; } -/* - * Request sync osd watch - */ -static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev, - u64 ver, - u64 notify_id) +static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, + u64 ver, u64 notify_id) { - struct ceph_osd_req_op *ops; + struct rbd_obj_request *obj_request; + struct ceph_osd_req_op *op; + struct ceph_osd_client *osdc; int ret; - ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0); - if (!ops) + obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, + OBJ_REQUEST_NODATA); + if (!obj_request) return -ENOMEM; - ops[0].watch.ver = cpu_to_le64(ver); - ops[0].watch.cookie = notify_id; - ops[0].watch.flag = 0; + ret = -ENOMEM; + op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver); + if (!op) + goto out; + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, + obj_request, op); + rbd_osd_req_op_destroy(op); + if (!obj_request->osd_req) + goto out; - ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP, - rbd_dev->header_name, 0, 0, NULL, - NULL, 0, - CEPH_OSD_FLAG_READ, - ops, - NULL, 0, - rbd_simple_req_cb, 0, NULL); + osdc = &rbd_dev->rbd_client->client->osdc; + obj_request->callback = rbd_obj_request_put; + ret = rbd_obj_request_submit(osdc, obj_request); +out: + if (ret) + rbd_obj_request_put(obj_request); - rbd_destroy_ops(ops); return ret; } @@ -1386,90 +1738,98 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) (unsigned int) opcode); rc = rbd_dev_refresh(rbd_dev, &hver); if (rc) - pr_warning(RBD_DRV_NAME "%d got notification but failed to " - " update snaps: %d\n", rbd_dev->major, rc); + rbd_warn(rbd_dev, "got notification but failed to " + " update snaps: %d\n", rc); - rbd_req_sync_notify_ack(rbd_dev, hver, notify_id); + rbd_obj_notify_ack(rbd_dev, hver, notify_id); } /* - * Request sync osd watch + * Request sync osd watch/unwatch. The value of "start" determines + * whether a watch request is being initiated or torn down. */ -static int rbd_req_sync_watch(struct rbd_device *rbd_dev) +static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) { - struct ceph_osd_req_op *ops; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_obj_request *obj_request; + struct ceph_osd_req_op *op; int ret; - ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0); - if (!ops) - return -ENOMEM; + rbd_assert(start ^ !!rbd_dev->watch_event); + rbd_assert(start ^ !!rbd_dev->watch_request); - ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, - (void *)rbd_dev, &rbd_dev->watch_event); - if (ret < 0) - goto fail; + if (start) { + ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, + &rbd_dev->watch_event); + if (ret < 0) + return ret; + rbd_assert(rbd_dev->watch_event != NULL); + } - ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version); - ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie); - ops[0].watch.flag = 1; + ret = -ENOMEM; + obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, + OBJ_REQUEST_NODATA); + if (!obj_request) + goto out_cancel; + + op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH, + rbd_dev->watch_event->cookie, + rbd_dev->header.obj_version, start); + if (!op) + goto out_cancel; + obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, + obj_request, op); + rbd_osd_req_op_destroy(op); + if (!obj_request->osd_req) + goto out_cancel; + + if (start) + ceph_osdc_set_request_linger(osdc, obj_request->osd_req); + else + ceph_osdc_unregister_linger_request(osdc, + rbd_dev->watch_request->osd_req); + ret = rbd_obj_request_submit(osdc, obj_request); + if (ret) + goto out_cancel; + ret = rbd_obj_request_wait(obj_request); + if (ret) + goto out_cancel; + ret = obj_request->result; + if (ret) + goto out_cancel; - ret = rbd_req_sync_op(rbd_dev, NULL, - CEPH_NOSNAP, - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - ops, - rbd_dev->header_name, - 0, 0, NULL, - &rbd_dev->watch_request, NULL); + /* + * A watch request is set to linger, so the underlying osd + * request won't go away until we unregister it. We retain + * a pointer to the object request during that time (in + * rbd_dev->watch_request), so we'll keep a reference to + * it. We'll drop that reference (below) after we've + * unregistered it. + */ + if (start) { + rbd_dev->watch_request = obj_request; - if (ret < 0) - goto fail_event; + return 0; + } - rbd_destroy_ops(ops); - return 0; + /* We have successfully torn down the watch request */ -fail_event: + rbd_obj_request_put(rbd_dev->watch_request); + rbd_dev->watch_request = NULL; +out_cancel: + /* Cancel the event if we're tearing down, or on error */ ceph_osdc_cancel_event(rbd_dev->watch_event); rbd_dev->watch_event = NULL; -fail: - rbd_destroy_ops(ops); - return ret; -} - -/* - * Request sync osd unwatch - */ -static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev) -{ - struct ceph_osd_req_op *ops; - int ret; + if (obj_request) + rbd_obj_request_put(obj_request); - ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0); - if (!ops) - return -ENOMEM; - - ops[0].watch.ver = 0; - ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie); - ops[0].watch.flag = 0; - - ret = rbd_req_sync_op(rbd_dev, NULL, - CEPH_NOSNAP, - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - ops, - rbd_dev->header_name, - 0, 0, NULL, NULL, NULL); - - - rbd_destroy_ops(ops); - ceph_osdc_cancel_event(rbd_dev->watch_event); - rbd_dev->watch_event = NULL; return ret; } /* * Synchronous osd object method call */ -static int rbd_req_sync_exec(struct rbd_device *rbd_dev, +static int rbd_obj_method_sync(struct rbd_device *rbd_dev, const char *object_name, const char *class_name, const char *method_name, @@ -1477,169 +1837,143 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev, size_t outbound_size, char *inbound, size_t inbound_size, - int flags, - u64 *ver) + u64 *version) { - struct ceph_osd_req_op *ops; - int class_name_len = strlen(class_name); - int method_name_len = strlen(method_name); - int payload_size; + struct rbd_obj_request *obj_request; + struct ceph_osd_client *osdc; + struct ceph_osd_req_op *op; + struct page **pages; + u32 page_count; int ret; /* - * Any input parameters required by the method we're calling - * will be sent along with the class and method names as - * part of the message payload. That data and its size are - * supplied via the indata and indata_len fields (named from - * the perspective of the server side) in the OSD request - * operation. + * Method calls are ultimately read operations but they + * don't involve object data (so no offset or length). + * The result should placed into the inbound buffer + * provided. They also supply outbound data--parameters for + * the object method. Currently if this is present it will + * be a snapshot id. */ - payload_size = class_name_len + method_name_len + outbound_size; - ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size); - if (!ops) - return -ENOMEM; + page_count = (u32) calc_pages_for(0, inbound_size); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); - ops[0].cls.class_name = class_name; - ops[0].cls.class_len = (__u8) class_name_len; - ops[0].cls.method_name = method_name; - ops[0].cls.method_len = (__u8) method_name_len; - ops[0].cls.argc = 0; - ops[0].cls.indata = outbound; - ops[0].cls.indata_len = outbound_size; + ret = -ENOMEM; + obj_request = rbd_obj_request_create(object_name, 0, 0, + OBJ_REQUEST_PAGES); + if (!obj_request) + goto out; - ret = rbd_req_sync_op(rbd_dev, NULL, - CEPH_NOSNAP, - flags, ops, - object_name, 0, inbound_size, inbound, - NULL, ver); + obj_request->pages = pages; + obj_request->page_count = page_count; - rbd_destroy_ops(ops); + op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name, + method_name, outbound, outbound_size); + if (!op) + goto out; + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, + obj_request, op); + rbd_osd_req_op_destroy(op); + if (!obj_request->osd_req) + goto out; - dout("cls_exec returned %d\n", ret); - return ret; -} + osdc = &rbd_dev->rbd_client->client->osdc; + ret = rbd_obj_request_submit(osdc, obj_request); + if (ret) + goto out; + ret = rbd_obj_request_wait(obj_request); + if (ret) + goto out; -static struct rbd_req_coll *rbd_alloc_coll(int num_reqs) -{ - struct rbd_req_coll *coll = - kzalloc(sizeof(struct rbd_req_coll) + - sizeof(struct rbd_req_status) * num_reqs, - GFP_ATOMIC); + ret = obj_request->result; + if (ret < 0) + goto out; + ret = 0; + ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred); + if (version) + *version = obj_request->version; +out: + if (obj_request) + rbd_obj_request_put(obj_request); + else + ceph_release_page_vector(pages, page_count); - if (!coll) - return NULL; - coll->total = num_reqs; - kref_init(&coll->kref); - return coll; + return ret; } -/* - * block device queue callback - */ -static void rbd_rq_fn(struct request_queue *q) +static void rbd_request_fn(struct request_queue *q) { struct rbd_device *rbd_dev = q->queuedata; + bool read_only = rbd_dev->mapping.read_only; struct request *rq; + int result; while ((rq = blk_fetch_request(q))) { - struct bio *bio; - bool do_write; - unsigned int size; - u64 ofs; - int num_segs, cur_seg = 0; - struct rbd_req_coll *coll; - struct ceph_snap_context *snapc; - unsigned int bio_offset; - - dout("fetched request\n"); - - /* filter out block requests we don't understand */ - if ((rq->cmd_type != REQ_TYPE_FS)) { - __blk_end_request_all(rq, 0); - continue; - } + bool write_request = rq_data_dir(rq) == WRITE; + struct rbd_img_request *img_request; + u64 offset; + u64 length; + + /* Ignore any non-FS requests that filter through. */ - /* deduce our operation (read, write) */ - do_write = (rq_data_dir(rq) == WRITE); - if (do_write && rbd_dev->mapping.read_only) { - __blk_end_request_all(rq, -EROFS); + if (rq->cmd_type != REQ_TYPE_FS) { + __blk_end_request_all(rq, 0); continue; } spin_unlock_irq(q->queue_lock); - down_read(&rbd_dev->header_rwsem); + /* Disallow writes to a read-only device */ - if (!rbd_dev->exists) { - rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); - up_read(&rbd_dev->header_rwsem); - dout("request for non-existent snapshot"); - spin_lock_irq(q->queue_lock); - __blk_end_request_all(rq, -ENXIO); - continue; + if (write_request) { + result = -EROFS; + if (read_only) + goto end_request; + rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); } - snapc = ceph_get_snap_context(rbd_dev->header.snapc); - - up_read(&rbd_dev->header_rwsem); - - size = blk_rq_bytes(rq); - ofs = blk_rq_pos(rq) * SECTOR_SIZE; - bio = rq->bio; - - dout("%s 0x%x bytes at 0x%llx\n", - do_write ? "write" : "read", - size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE); - - num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size); - if (num_segs <= 0) { - spin_lock_irq(q->queue_lock); - __blk_end_request_all(rq, num_segs); - ceph_put_snap_context(snapc); - continue; - } - coll = rbd_alloc_coll(num_segs); - if (!coll) { - spin_lock_irq(q->queue_lock); - __blk_end_request_all(rq, -ENOMEM); - ceph_put_snap_context(snapc); - continue; + /* + * Quit early if the mapped snapshot no longer + * exists. It's still possible the snapshot will + * have disappeared by the time our request arrives + * at the osd, but there's no sense in sending it if + * we already know. + */ + if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { + dout("request for non-existent snapshot"); + rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); + result = -ENXIO; + goto end_request; } - bio_offset = 0; - do { - u64 limit = rbd_segment_length(rbd_dev, ofs, size); - unsigned int chain_size; - struct bio *bio_chain; + offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT; + length = (u64) blk_rq_bytes(rq); - BUG_ON(limit > (u64) UINT_MAX); - chain_size = (unsigned int) limit; - dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt); + result = -EINVAL; + if (WARN_ON(offset && length > U64_MAX - offset + 1)) + goto end_request; /* Shouldn't happen */ - kref_get(&coll->kref); + result = -ENOMEM; + img_request = rbd_img_request_create(rbd_dev, offset, length, + write_request); + if (!img_request) + goto end_request; - /* Pass a cloned bio chain via an osd request */ - - bio_chain = bio_chain_clone_range(&bio, - &bio_offset, chain_size, - GFP_ATOMIC); - if (bio_chain) - (void) rbd_do_op(rq, rbd_dev, snapc, - ofs, chain_size, - bio_chain, coll, cur_seg); - else - rbd_coll_end_req_index(rq, coll, cur_seg, - -ENOMEM, chain_size); - size -= chain_size; - ofs += chain_size; - - cur_seg++; - } while (size > 0); - kref_put(&coll->kref, rbd_coll_release); + img_request->rq = rq; + result = rbd_img_request_fill_bio(img_request, rq->bio); + if (!result) + result = rbd_img_request_submit(img_request); + if (result) + rbd_img_request_put(img_request); +end_request: spin_lock_irq(q->queue_lock); - - ceph_put_snap_context(snapc); + if (result < 0) { + rbd_warn(rbd_dev, "obj_request %s result %d\n", + write_request ? "write" : "read", result); + __blk_end_request_all(rq, result); + } } } @@ -1703,6 +2037,71 @@ static void rbd_free_disk(struct rbd_device *rbd_dev) put_disk(disk); } +static int rbd_obj_read_sync(struct rbd_device *rbd_dev, + const char *object_name, + u64 offset, u64 length, + char *buf, u64 *version) + +{ + struct ceph_osd_req_op *op; + struct rbd_obj_request *obj_request; + struct ceph_osd_client *osdc; + struct page **pages = NULL; + u32 page_count; + size_t size; + int ret; + + page_count = (u32) calc_pages_for(offset, length); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) + ret = PTR_ERR(pages); + + ret = -ENOMEM; + obj_request = rbd_obj_request_create(object_name, offset, length, + OBJ_REQUEST_PAGES); + if (!obj_request) + goto out; + + obj_request->pages = pages; + obj_request->page_count = page_count; + + op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length); + if (!op) + goto out; + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, + obj_request, op); + rbd_osd_req_op_destroy(op); + if (!obj_request->osd_req) + goto out; + + osdc = &rbd_dev->rbd_client->client->osdc; + ret = rbd_obj_request_submit(osdc, obj_request); + if (ret) + goto out; + ret = rbd_obj_request_wait(obj_request); + if (ret) + goto out; + + ret = obj_request->result; + if (ret < 0) + goto out; + + rbd_assert(obj_request->xferred <= (u64) SIZE_MAX); + size = (size_t) obj_request->xferred; + ceph_copy_from_page_vector(pages, buf, 0, size); + rbd_assert(size <= (size_t) INT_MAX); + ret = (int) size; + if (version) + *version = obj_request->version; +out: + if (obj_request) + rbd_obj_request_put(obj_request); + else + ceph_release_page_vector(pages, page_count); + + return ret; +} + /* * Read the complete header for the given rbd device. * @@ -1741,24 +2140,20 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version) if (!ondisk) return ERR_PTR(-ENOMEM); - ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP, - rbd_dev->header_name, + ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name, 0, size, (char *) ondisk, version); - if (ret < 0) goto out_err; if (WARN_ON((size_t) ret < size)) { ret = -ENXIO; - pr_warning("short header read for image %s" - " (want %zd got %d)\n", - rbd_dev->spec->image_name, size, ret); + rbd_warn(rbd_dev, "short header read (want %zd got %d)", + size, ret); goto out_err; } if (!rbd_dev_ondisk_valid(ondisk)) { ret = -ENXIO; - pr_warning("invalid header for image %s\n", - rbd_dev->spec->image_name); + rbd_warn(rbd_dev, "invalid header"); goto out_err; } @@ -1895,8 +2290,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) disk->fops = &rbd_bd_ops; disk->private_data = rbd_dev; - /* init rq */ - q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock); + q = blk_init_queue(rbd_request_fn, &rbd_dev->lock); if (!q) goto out_disk; @@ -2243,6 +2637,7 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, return NULL; spin_lock_init(&rbd_dev->lock); + rbd_dev->flags = 0; INIT_LIST_HEAD(&rbd_dev->node); INIT_LIST_HEAD(&rbd_dev->snaps); init_rwsem(&rbd_dev->header_rwsem); @@ -2250,6 +2645,13 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->spec = spec; rbd_dev->rbd_client = rbdc; + /* Initialize the layout used for all rbd requests */ + + rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); + rbd_dev->layout.fl_stripe_count = cpu_to_le32(1); + rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); + rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id); + return rbd_dev; } @@ -2360,12 +2762,11 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, __le64 size; } __attribute__ ((packed)) size_buf = { 0 }; - ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_size", (char *) &snapid, sizeof (snapid), - (char *) &size_buf, sizeof (size_buf), - CEPH_OSD_FLAG_READ, NULL); - dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); + (char *) &size_buf, sizeof (size_buf), NULL); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; @@ -2396,15 +2797,13 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) if (!reply_buf) return -ENOMEM; - ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_object_prefix", NULL, 0, - reply_buf, RBD_OBJ_PREFIX_LEN_MAX, - CEPH_OSD_FLAG_READ, NULL); - dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); + reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; - ret = 0; /* rbd_req_sync_exec() can return positive */ p = reply_buf; rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, @@ -2435,12 +2834,12 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, u64 incompat; int ret; - ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_features", (char *) &snapid, sizeof (snapid), (char *) &features_buf, sizeof (features_buf), - CEPH_OSD_FLAG_READ, NULL); - dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); + NULL); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; @@ -2474,7 +2873,6 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) void *end; char *image_id; u64 overlap; - size_t len = 0; int ret; parent_spec = rbd_spec_alloc(); @@ -2492,12 +2890,11 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) } snapid = cpu_to_le64(CEPH_NOSNAP); - ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_parent", (char *) &snapid, sizeof (snapid), - (char *) reply_buf, size, - CEPH_OSD_FLAG_READ, NULL); - dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); + (char *) reply_buf, size, NULL); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out_err; @@ -2508,13 +2905,18 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) if (parent_spec->pool_id == CEPH_NOPOOL) goto out; /* No parent? No problem. */ - image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); + /* The ceph file layout needs to fit pool id in 32 bits */ + + ret = -EIO; + if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX)) + goto out; + + image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); if (IS_ERR(image_id)) { ret = PTR_ERR(image_id); goto out_err; } parent_spec->image_id = image_id; - parent_spec->image_id_len = len; ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); ceph_decode_64_safe(&p, end, overlap, out_err); @@ -2544,26 +2946,25 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev) rbd_assert(!rbd_dev->spec->image_name); - image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len; + len = strlen(rbd_dev->spec->image_id); + image_id_size = sizeof (__le32) + len; image_id = kmalloc(image_id_size, GFP_KERNEL); if (!image_id) return NULL; p = image_id; end = (char *) image_id + image_id_size; - ceph_encode_string(&p, end, rbd_dev->spec->image_id, - (u32) rbd_dev->spec->image_id_len); + ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len); size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; reply_buf = kmalloc(size, GFP_KERNEL); if (!reply_buf) goto out; - ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY, + ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY, "rbd", "dir_get_name", image_id, image_id_size, - (char *) reply_buf, size, - CEPH_OSD_FLAG_READ, NULL); + (char *) reply_buf, size, NULL); if (ret < 0) goto out; p = reply_buf; @@ -2602,8 +3003,11 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) osdc = &rbd_dev->rbd_client->client->osdc; name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); - if (!name) - return -EIO; /* pool id too large (>= 2^31) */ + if (!name) { + rbd_warn(rbd_dev, "there is no pool with id %llu", + rbd_dev->spec->pool_id); /* Really a BUG() */ + return -EIO; + } rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); if (!rbd_dev->spec->pool_name) @@ -2612,19 +3016,17 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) /* Fetch the image name; tolerate failure here */ name = rbd_dev_image_name(rbd_dev); - if (name) { - rbd_dev->spec->image_name_len = strlen(name); + if (name) rbd_dev->spec->image_name = (char *) name; - } else { - pr_warning(RBD_DRV_NAME "%d " - "unable to get image name for image id %s\n", - rbd_dev->major, rbd_dev->spec->image_id); - } + else + rbd_warn(rbd_dev, "unable to get image name"); /* Look up the snapshot name. */ name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); if (!name) { + rbd_warn(rbd_dev, "no snapshot with id %llu", + rbd_dev->spec->snap_id); /* Really a BUG() */ ret = -EIO; goto out_err; } @@ -2665,12 +3067,11 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) if (!reply_buf) return -ENOMEM; - ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_snapcontext", NULL, 0, - reply_buf, size, - CEPH_OSD_FLAG_READ, ver); - dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); + reply_buf, size, ver); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; @@ -2735,12 +3136,11 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which) return ERR_PTR(-ENOMEM); snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]); - ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, "rbd", "get_snapshot_name", (char *) &snap_id, sizeof (snap_id), - reply_buf, size, - CEPH_OSD_FLAG_READ, NULL); - dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); + reply_buf, size, NULL); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; @@ -2766,7 +3166,7 @@ out: static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which, u64 *snap_size, u64 *snap_features) { - __le64 snap_id; + u64 snap_id; u8 order; int ret; @@ -2865,10 +3265,17 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev) if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) { struct list_head *next = links->next; - /* Existing snapshot not in the new snap context */ - + /* + * A previously-existing snapshot is not in + * the new snap context. + * + * If the now missing snapshot is the one the + * image is mapped to, clear its exists flag + * so we can avoid sending any more requests + * to it. + */ if (rbd_dev->spec->snap_id == snap->id) - rbd_dev->exists = false; + clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); rbd_remove_snap_dev(snap); dout("%ssnap id %llu has been removed\n", rbd_dev->spec->snap_id == snap->id ? @@ -2983,22 +3390,6 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev) device_unregister(&rbd_dev->dev); } -static int rbd_init_watch_dev(struct rbd_device *rbd_dev) -{ - int ret, rc; - - do { - ret = rbd_req_sync_watch(rbd_dev); - if (ret == -ERANGE) { - rc = rbd_dev_refresh(rbd_dev, NULL); - if (rc < 0) - return rc; - } - } while (ret == -ERANGE); - - return ret; -} - static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0); /* @@ -3138,11 +3529,9 @@ static inline char *dup_token(const char **buf, size_t *lenp) size_t len; len = next_token(buf); - dup = kmalloc(len + 1, GFP_KERNEL); + dup = kmemdup(*buf, len + 1, GFP_KERNEL); if (!dup) return NULL; - - memcpy(dup, *buf, len); *(dup + len) = '\0'; *buf += len; @@ -3210,8 +3599,10 @@ static int rbd_add_parse_args(const char *buf, /* The first four tokens are required */ len = next_token(&buf); - if (!len) - return -EINVAL; /* Missing monitor address(es) */ + if (!len) { + rbd_warn(NULL, "no monitor address(es) provided"); + return -EINVAL; + } mon_addrs = buf; mon_addrs_size = len + 1; buf += len; @@ -3220,8 +3611,10 @@ static int rbd_add_parse_args(const char *buf, options = dup_token(&buf, NULL); if (!options) return -ENOMEM; - if (!*options) - goto out_err; /* Missing options */ + if (!*options) { + rbd_warn(NULL, "no options provided"); + goto out_err; + } spec = rbd_spec_alloc(); if (!spec) @@ -3230,14 +3623,18 @@ static int rbd_add_parse_args(const char *buf, spec->pool_name = dup_token(&buf, NULL); if (!spec->pool_name) goto out_mem; - if (!*spec->pool_name) - goto out_err; /* Missing pool name */ + if (!*spec->pool_name) { + rbd_warn(NULL, "no pool name provided"); + goto out_err; + } - spec->image_name = dup_token(&buf, &spec->image_name_len); + spec->image_name = dup_token(&buf, NULL); if (!spec->image_name) goto out_mem; - if (!*spec->image_name) - goto out_err; /* Missing image name */ + if (!*spec->image_name) { + rbd_warn(NULL, "no image name provided"); + goto out_err; + } /* * Snapshot name is optional; default is to use "-" @@ -3251,10 +3648,9 @@ static int rbd_add_parse_args(const char *buf, ret = -ENAMETOOLONG; goto out_err; } - spec->snap_name = kmalloc(len + 1, GFP_KERNEL); + spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL); if (!spec->snap_name) goto out_mem; - memcpy(spec->snap_name, buf, len); *(spec->snap_name + len) = '\0'; /* Initialize all rbd options to the defaults */ @@ -3323,7 +3719,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) * First, see if the format 2 image id file exists, and if * so, get the image's persistent id from it. */ - size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len; + size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name); object_name = kmalloc(size, GFP_NOIO); if (!object_name) return -ENOMEM; @@ -3339,21 +3735,18 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) goto out; } - ret = rbd_req_sync_exec(rbd_dev, object_name, + ret = rbd_obj_method_sync(rbd_dev, object_name, "rbd", "get_id", NULL, 0, - response, RBD_IMAGE_ID_LEN_MAX, - CEPH_OSD_FLAG_READ, NULL); - dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); + response, RBD_IMAGE_ID_LEN_MAX, NULL); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; - ret = 0; /* rbd_req_sync_exec() can return positive */ p = response; rbd_dev->spec->image_id = ceph_extract_encoded_string(&p, p + RBD_IMAGE_ID_LEN_MAX, - &rbd_dev->spec->image_id_len, - GFP_NOIO); + NULL, GFP_NOIO); if (IS_ERR(rbd_dev->spec->image_id)) { ret = PTR_ERR(rbd_dev->spec->image_id); rbd_dev->spec->image_id = NULL; @@ -3377,11 +3770,10 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL); if (!rbd_dev->spec->image_id) return -ENOMEM; - rbd_dev->spec->image_id_len = 0; /* Record the header object name for this rbd image. */ - size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX); + size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX); rbd_dev->header_name = kmalloc(size, GFP_KERNEL); if (!rbd_dev->header_name) { ret = -ENOMEM; @@ -3427,7 +3819,7 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) * Image id was filled in by the caller. Record the header * object name for this rbd image. */ - size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len; + size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id); rbd_dev->header_name = kmalloc(size, GFP_KERNEL); if (!rbd_dev->header_name) return -ENOMEM; @@ -3542,7 +3934,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) if (ret) goto err_out_bus; - ret = rbd_init_watch_dev(rbd_dev); + ret = rbd_dev_header_watch_sync(rbd_dev, 1); if (ret) goto err_out_bus; @@ -3638,6 +4030,13 @@ static ssize_t rbd_add(struct bus_type *bus, goto err_out_client; spec->pool_id = (u64) rc; + /* The ceph file layout needs to fit pool id in 32 bits */ + + if (WARN_ON(spec->pool_id > (u64) U32_MAX)) { + rc = -EIO; + goto err_out_client; + } + rbd_dev = rbd_dev_create(rbdc, spec); if (!rbd_dev) goto err_out_client; @@ -3691,15 +4090,8 @@ static void rbd_dev_release(struct device *dev) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - if (rbd_dev->watch_request) { - struct ceph_client *client = rbd_dev->rbd_client->client; - - ceph_osdc_unregister_linger_request(&client->osdc, - rbd_dev->watch_request); - } if (rbd_dev->watch_event) - rbd_req_sync_unwatch(rbd_dev); - + rbd_dev_header_watch_sync(rbd_dev, 0); /* clean up and free blkdev */ rbd_free_disk(rbd_dev); @@ -3743,10 +4135,14 @@ static ssize_t rbd_remove(struct bus_type *bus, goto done; } - if (rbd_dev->open_count) { + spin_lock_irq(&rbd_dev->lock); + if (rbd_dev->open_count) ret = -EBUSY; + else + set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags); + spin_unlock_irq(&rbd_dev->lock); + if (ret < 0) goto done; - } rbd_remove_all_snaps(rbd_dev); rbd_bus_del_dev(rbd_dev); @@ -3786,6 +4182,11 @@ int __init rbd_init(void) { int rc; + if (!libceph_compatible(NULL)) { + rbd_warn(NULL, "libceph incompatibility (quitting)"); + + return -EINVAL; + } rc = rbd_sysfs_init(); if (rc) return rc; diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 064d1a6..fc61371 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -315,7 +315,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, 0, ci->i_truncate_seq, ci->i_truncate_size, - NULL, false, 1, 0); + NULL, false, 0); if (IS_ERR(req)) return PTR_ERR(req); @@ -492,8 +492,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) &ci->i_layout, snapc, page_off, len, ci->i_truncate_seq, ci->i_truncate_size, - &inode->i_mtime, - &page, 1, 0, 0, true); + &inode->i_mtime, &page, 1); if (err < 0) { dout("writepage setting page/mapping error %d %p\n", err, page); SetPageError(page); @@ -838,7 +837,7 @@ get_more_pages: snapc, do_sync, ci->i_truncate_seq, ci->i_truncate_size, - &inode->i_mtime, true, 1, 0); + &inode->i_mtime, true, 0); if (IS_ERR(req)) { rc = PTR_ERR(req); diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index a1d9bb3..1e1e020 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -611,8 +611,16 @@ retry: if (flags & CEPH_CAP_FLAG_AUTH) ci->i_auth_cap = cap; - else if (ci->i_auth_cap == cap) + else if (ci->i_auth_cap == cap) { ci->i_auth_cap = NULL; + spin_lock(&mdsc->cap_dirty_lock); + if (!list_empty(&ci->i_dirty_item)) { + dout(" moving %p to cap_dirty_migrating\n", inode); + list_move(&ci->i_dirty_item, + &mdsc->cap_dirty_migrating); + } + spin_unlock(&mdsc->cap_dirty_lock); + } dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", inode, ceph_vinop(inode), cap, ceph_cap_string(issued), @@ -1460,7 +1468,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; - int file_wanted, used; + int file_wanted, used, cap_used; int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ int issued, implemented, want, retain, revoking, flushing = 0; int mds = -1; /* keep track of how far we've gone through i_caps list @@ -1563,9 +1571,14 @@ retry_locked: /* NOTE: no side-effects allowed, until we take s_mutex */ + cap_used = used; + if (ci->i_auth_cap && cap != ci->i_auth_cap) + cap_used &= ~ci->i_auth_cap->issued; + revoking = cap->implemented & ~cap->issued; - dout(" mds%d cap %p issued %s implemented %s revoking %s\n", + dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n", cap->mds, cap, ceph_cap_string(cap->issued), + ceph_cap_string(cap_used), ceph_cap_string(cap->implemented), ceph_cap_string(revoking)); @@ -1593,7 +1606,7 @@ retry_locked: } /* completed revocation? going down and there are no caps? */ - if (revoking && (revoking & used) == 0) { + if (revoking && (revoking & cap_used) == 0) { dout("completed revocation of %s\n", ceph_cap_string(cap->implemented & ~cap->issued)); goto ack; @@ -1670,8 +1683,8 @@ ack: sent++; /* __send_cap drops i_ceph_lock */ - delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want, - retain, flushing, NULL); + delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, + want, retain, flushing, NULL); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -2416,7 +2429,9 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, dout("mds wanted %s -> %s\n", ceph_cap_string(le32_to_cpu(grant->wanted)), ceph_cap_string(wanted)); - grant->wanted = cpu_to_le32(wanted); + /* imported cap may not have correct mds_wanted */ + if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) + check_caps = 1; } cap->seq = seq; @@ -2820,6 +2835,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, (unsigned)seq); + if (op == CEPH_CAP_OP_IMPORT) + ceph_add_cap_releases(mdsc, session); + /* lookup ino */ inode = ceph_find_inode(sb, vino); ci = ceph_inode(inode); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index e51558f..9c4325e 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -243,6 +243,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, err = ceph_mdsc_do_request(mdsc, (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, req); + if (err) + goto out_err; + err = ceph_handle_snapdir(req, dentry, err); if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); @@ -263,6 +266,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, err = finish_no_open(file, dn); } else { dout("atomic_open finish_open on dn %p\n", dn); + if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) { + *opened |= FILE_CREATED; + } err = finish_open(file, dentry, ceph_open, opened); } @@ -535,7 +541,7 @@ more: ci->i_snap_realm->cached_context, do_sync, ci->i_truncate_seq, ci->i_truncate_size, - &mtime, false, 2, page_align); + &mtime, false, page_align); if (IS_ERR(req)) return PTR_ERR(req); diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 36549a4..3b22150 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -194,7 +194,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) return -EFAULT; down_read(&osdc->map_sem); - r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len, + r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len, &dl.object_no, &dl.object_offset, &olen); if (r < 0) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 9165eb8..d958420 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -233,6 +233,30 @@ bad: } /* + * parse create results + */ +static int parse_reply_info_create(void **p, void *end, + struct ceph_mds_reply_info_parsed *info, + int features) +{ + if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { + if (*p == end) { + info->has_create_ino = false; + } else { + info->has_create_ino = true; + info->ino = ceph_decode_64(p); + } + } + + if (unlikely(*p != end)) + goto bad; + return 0; + +bad: + return -EIO; +} + +/* * parse extra results */ static int parse_reply_info_extra(void **p, void *end, @@ -241,8 +265,12 @@ static int parse_reply_info_extra(void **p, void *end, { if (info->head->op == CEPH_MDS_OP_GETFILELOCK) return parse_reply_info_filelock(p, end, info, features); - else + else if (info->head->op == CEPH_MDS_OP_READDIR) return parse_reply_info_dir(p, end, info, features); + else if (info->head->op == CEPH_MDS_OP_CREATE) + return parse_reply_info_create(p, end, info, features); + else + return -EIO; } /* @@ -2170,7 +2198,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) mutex_lock(&req->r_fill_mutex); err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); if (err == 0) { - if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK && + if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || + req->r_op == CEPH_MDS_OP_LSSNAP) && rinfo->dir_nr) ceph_readdir_prepopulate(req, req->r_session); ceph_unreserve_caps(mdsc, &req->r_caps_reservation); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index dd26846..567f7c6 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -74,6 +74,12 @@ struct ceph_mds_reply_info_parsed { struct ceph_mds_reply_info_in *dir_in; u8 dir_complete, dir_end; }; + + /* for create results */ + struct { + bool has_create_ino; + u64 ino; + }; }; /* encoded blob describing snapshot contexts for certain diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c index cd5097d..89fa4a9 100644 --- a/fs/ceph/strings.c +++ b/fs/ceph/strings.c @@ -15,6 +15,7 @@ const char *ceph_mds_state_name(int s) case CEPH_MDS_STATE_BOOT: return "up:boot"; case CEPH_MDS_STATE_STANDBY: return "up:standby"; case CEPH_MDS_STATE_STANDBY_REPLAY: return "up:standby-replay"; + case CEPH_MDS_STATE_REPLAYONCE: return "up:oneshot-replay"; case CEPH_MDS_STATE_CREATING: return "up:creating"; case CEPH_MDS_STATE_STARTING: return "up:starting"; /* up and in */ @@ -50,10 +51,13 @@ const char *ceph_mds_op_name(int op) case CEPH_MDS_OP_LOOKUP: return "lookup"; case CEPH_MDS_OP_LOOKUPHASH: return "lookuphash"; case CEPH_MDS_OP_LOOKUPPARENT: return "lookupparent"; + case CEPH_MDS_OP_LOOKUPINO: return "lookupino"; case CEPH_MDS_OP_GETATTR: return "getattr"; case CEPH_MDS_OP_SETXATTR: return "setxattr"; case CEPH_MDS_OP_SETATTR: return "setattr"; case CEPH_MDS_OP_RMXATTR: return "rmxattr"; + case CEPH_MDS_OP_SETLAYOUT: return "setlayou"; + case CEPH_MDS_OP_SETDIRLAYOUT: return "setdirlayout"; case CEPH_MDS_OP_READDIR: return "readdir"; case CEPH_MDS_OP_MKNOD: return "mknod"; case CEPH_MDS_OP_LINK: return "link"; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 66ebe72..9861cce 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -798,13 +798,7 @@ extern int ceph_mmap(struct file *file, struct vm_area_struct *vma); /* file.c */ extern const struct file_operations ceph_file_fops; extern const struct address_space_operations ceph_aops; -extern int ceph_copy_to_page_vector(struct page **pages, - const char *data, - loff_t off, size_t len); -extern int ceph_copy_from_page_vector(struct page **pages, - char *data, - loff_t off, size_t len); -extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags); + extern int ceph_open(struct inode *inode, struct file *file); extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, struct file *file, unsigned flags, umode_t mode, diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 2c2ae5b..2135817 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -29,9 +29,94 @@ struct ceph_vxattr { size_t name_size; /* strlen(name) + 1 (for '\0') */ size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val, size_t size); - bool readonly; + bool readonly, hidden; + bool (*exists_cb)(struct ceph_inode_info *ci); }; +/* layouts */ + +static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci) +{ + size_t s; + char *p = (char *)&ci->i_layout; + + for (s = 0; s < sizeof(ci->i_layout); s++, p++) + if (*p) + return true; + return false; +} + +static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, + size_t size) +{ + int ret; + struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); + struct ceph_osd_client *osdc = &fsc->client->osdc; + s64 pool = ceph_file_layout_pg_pool(ci->i_layout); + const char *pool_name; + + dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode); + down_read(&osdc->map_sem); + pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); + if (pool_name) + ret = snprintf(val, size, + "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%s", + (unsigned long long)ceph_file_layout_su(ci->i_layout), + (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), + (unsigned long long)ceph_file_layout_object_size(ci->i_layout), + pool_name); + else + ret = snprintf(val, size, + "stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld", + (unsigned long long)ceph_file_layout_su(ci->i_layout), + (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), + (unsigned long long)ceph_file_layout_object_size(ci->i_layout), + (unsigned long long)pool); + + up_read(&osdc->map_sem); + return ret; +} + +static size_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci, + char *val, size_t size) +{ + return snprintf(val, size, "%lld", + (unsigned long long)ceph_file_layout_su(ci->i_layout)); +} + +static size_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci, + char *val, size_t size) +{ + return snprintf(val, size, "%lld", + (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout)); +} + +static size_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci, + char *val, size_t size) +{ + return snprintf(val, size, "%lld", + (unsigned long long)ceph_file_layout_object_size(ci->i_layout)); +} + +static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci, + char *val, size_t size) +{ + int ret; + struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); + struct ceph_osd_client *osdc = &fsc->client->osdc; + s64 pool = ceph_file_layout_pg_pool(ci->i_layout); + const char *pool_name; + + down_read(&osdc->map_sem); + pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); + if (pool_name) + ret = snprintf(val, size, "%s", pool_name); + else + ret = snprintf(val, size, "%lld", (unsigned long long)pool); + up_read(&osdc->map_sem); + return ret; +} + /* directories */ static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val, @@ -83,17 +168,43 @@ static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val, (long)ci->i_rctime.tv_nsec); } -#define CEPH_XATTR_NAME(_type, _name) XATTR_CEPH_PREFIX #_type "." #_name -#define XATTR_NAME_CEPH(_type, _name) \ - { \ - .name = CEPH_XATTR_NAME(_type, _name), \ - .name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \ - .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \ - .readonly = true, \ - } +#define CEPH_XATTR_NAME(_type, _name) XATTR_CEPH_PREFIX #_type "." #_name +#define CEPH_XATTR_NAME2(_type, _name, _name2) \ + XATTR_CEPH_PREFIX #_type "." #_name "." #_name2 + +#define XATTR_NAME_CEPH(_type, _name) \ + { \ + .name = CEPH_XATTR_NAME(_type, _name), \ + .name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \ + .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \ + .readonly = true, \ + .hidden = false, \ + .exists_cb = NULL, \ + } +#define XATTR_LAYOUT_FIELD(_type, _name, _field) \ + { \ + .name = CEPH_XATTR_NAME2(_type, _name, _field), \ + .name_size = sizeof (CEPH_XATTR_NAME2(_type, _name, _field)), \ + .getxattr_cb = ceph_vxattrcb_ ## _name ## _ ## _field, \ + .readonly = false, \ + .hidden = true, \ + .exists_cb = ceph_vxattrcb_layout_exists, \ + } static struct ceph_vxattr ceph_dir_vxattrs[] = { + { + .name = "ceph.dir.layout", + .name_size = sizeof("ceph.dir.layout"), + .getxattr_cb = ceph_vxattrcb_layout, + .readonly = false, + .hidden = false, + .exists_cb = ceph_vxattrcb_layout_exists, + }, + XATTR_LAYOUT_FIELD(dir, layout, stripe_unit), + XATTR_LAYOUT_FIELD(dir, layout, stripe_count), + XATTR_LAYOUT_FIELD(dir, layout, object_size), + XATTR_LAYOUT_FIELD(dir, layout, pool), XATTR_NAME_CEPH(dir, entries), XATTR_NAME_CEPH(dir, files), XATTR_NAME_CEPH(dir, subdirs), @@ -108,28 +219,19 @@ static size_t ceph_dir_vxattrs_name_size; /* total size of all names */ /* files */ -static size_t ceph_vxattrcb_file_layout(struct ceph_inode_info *ci, char *val, - size_t size) -{ - int ret; - - ret = snprintf(val, size, - "chunk_bytes=%lld\nstripe_count=%lld\nobject_size=%lld\n", - (unsigned long long)ceph_file_layout_su(ci->i_layout), - (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), - (unsigned long long)ceph_file_layout_object_size(ci->i_layout)); - return ret; -} - static struct ceph_vxattr ceph_file_vxattrs[] = { - XATTR_NAME_CEPH(file, layout), - /* The following extended attribute name is deprecated */ { - .name = XATTR_CEPH_PREFIX "layout", - .name_size = sizeof (XATTR_CEPH_PREFIX "layout"), - .getxattr_cb = ceph_vxattrcb_file_layout, - .readonly = true, + .name = "ceph.file.layout", + .name_size = sizeof("ceph.file.layout"), + .getxattr_cb = ceph_vxattrcb_layout, + .readonly = false, + .hidden = false, + .exists_cb = ceph_vxattrcb_layout_exists, }, + XATTR_LAYOUT_FIELD(file, layout, stripe_unit), + XATTR_LAYOUT_FIELD(file, layout, stripe_count), + XATTR_LAYOUT_FIELD(file, layout, object_size), + XATTR_LAYOUT_FIELD(file, layout, pool), { 0 } /* Required table terminator */ }; static size_t ceph_file_vxattrs_name_size; /* total size of all names */ @@ -164,7 +266,8 @@ static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs) size_t size = 0; for (vxattr = vxattrs; vxattr->name; vxattr++) - size += vxattr->name_size; + if (!vxattr->hidden) + size += vxattr->name_size; return size; } @@ -572,13 +675,17 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, if (!ceph_is_valid_xattr(name)) return -ENODATA; - /* let's see if a virtual xattr was requested */ - vxattr = ceph_match_vxattr(inode, name); - spin_lock(&ci->i_ceph_lock); dout("getxattr %p ver=%lld index_ver=%lld\n", inode, ci->i_xattrs.version, ci->i_xattrs.index_version); + /* let's see if a virtual xattr was requested */ + vxattr = ceph_match_vxattr(inode, name); + if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) { + err = vxattr->getxattr_cb(ci, value, size); + goto out; + } + if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) && (ci->i_xattrs.index_version >= ci->i_xattrs.version)) { goto get_xattr; @@ -592,11 +699,6 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, spin_lock(&ci->i_ceph_lock); - if (vxattr && vxattr->readonly) { - err = vxattr->getxattr_cb(ci, value, size); - goto out; - } - err = __build_xattrs(inode); if (err < 0) goto out; @@ -604,11 +706,8 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, get_xattr: err = -ENODATA; /* == ENOATTR */ xattr = __get_xattr(ci, name); - if (!xattr) { - if (vxattr) - err = vxattr->getxattr_cb(ci, value, size); + if (!xattr) goto out; - } err = -ERANGE; if (size && size < xattr->val_len) @@ -664,23 +763,30 @@ list_xattr: vir_namelen = ceph_vxattrs_name_size(vxattrs); /* adding 1 byte per each variable due to the null termination */ - namelen = vir_namelen + ci->i_xattrs.names_size + ci->i_xattrs.count; + namelen = ci->i_xattrs.names_size + ci->i_xattrs.count; err = -ERANGE; - if (size && namelen > size) + if (size && vir_namelen + namelen > size) goto out; - err = namelen; + err = namelen + vir_namelen; if (size == 0) goto out; names = __copy_xattr_names(ci, names); /* virtual xattr names, too */ - if (vxattrs) + err = namelen; + if (vxattrs) { for (i = 0; vxattrs[i].name; i++) { - len = sprintf(names, "%s", vxattrs[i].name); - names += len + 1; + if (!vxattrs[i].hidden && + !(vxattrs[i].exists_cb && + !vxattrs[i].exists_cb(ci))) { + len = sprintf(names, "%s", vxattrs[i].name); + names += len + 1; + err += len + 1; + } } + } out: spin_unlock(&ci->i_ceph_lock); @@ -782,6 +888,10 @@ int ceph_setxattr(struct dentry *dentry, const char *name, if (vxattr && vxattr->readonly) return -EOPNOTSUPP; + /* pass any unhandled ceph.* xattrs through to the MDS */ + if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN)) + goto do_sync_unlocked; + /* preallocate memory for xattr name, value, index node */ err = -ENOMEM; newname = kmemdup(name, name_len + 1, GFP_NOFS); @@ -838,6 +948,7 @@ retry: do_sync: spin_unlock(&ci->i_ceph_lock); +do_sync_unlocked: err = ceph_sync_setxattr(dentry, name, value, size, flags); out: kfree(newname); @@ -892,6 +1003,10 @@ int ceph_removexattr(struct dentry *dentry, const char *name) if (vxattr && vxattr->readonly) return -EOPNOTSUPP; + /* pass any unhandled ceph.* xattrs through to the MDS */ + if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN)) + goto do_sync_unlocked; + err = -ENOMEM; spin_lock(&ci->i_ceph_lock); retry: @@ -931,6 +1046,7 @@ retry: return err; do_sync: spin_unlock(&ci->i_ceph_lock); +do_sync_unlocked: err = ceph_send_removexattr(dentry, name); out: return err; diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index dad579b..2160aab 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -14,13 +14,19 @@ #define CEPH_FEATURE_DIRLAYOUTHASH (1<<7) /* bits 8-17 defined by user-space; not supported yet here */ #define CEPH_FEATURE_CRUSH_TUNABLES (1<<18) +/* bits 19-24 defined by user-space; not supported yet here */ +#define CEPH_FEATURE_CRUSH_TUNABLES2 (1<<25) +/* bit 26 defined by user-space; not supported yet here */ +#define CEPH_FEATURE_REPLY_CREATE_INODE (1<<27) /* * Features supported. */ #define CEPH_FEATURES_SUPPORTED_DEFAULT \ (CEPH_FEATURE_NOSRCADDR | \ - CEPH_FEATURE_CRUSH_TUNABLES) + CEPH_FEATURE_CRUSH_TUNABLES | \ + CEPH_FEATURE_CRUSH_TUNABLES2 | \ + CEPH_FEATURE_REPLY_CREATE_INODE) #define CEPH_FEATURES_REQUIRED_DEFAULT \ (CEPH_FEATURE_NOSRCADDR) diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index cf6f4d9..2ad7b86 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -21,16 +21,14 @@ * internal cluster protocols separately from the public, * client-facing protocol. */ -#define CEPH_OSD_PROTOCOL 8 /* cluster internal */ -#define CEPH_MDS_PROTOCOL 12 /* cluster internal */ -#define CEPH_MON_PROTOCOL 5 /* cluster internal */ #define CEPH_OSDC_PROTOCOL 24 /* server/client */ #define CEPH_MDSC_PROTOCOL 32 /* server/client */ #define CEPH_MONC_PROTOCOL 15 /* server/client */ -#define CEPH_INO_ROOT 1 -#define CEPH_INO_CEPH 2 /* hidden .ceph dir */ +#define CEPH_INO_ROOT 1 +#define CEPH_INO_CEPH 2 /* hidden .ceph dir */ +#define CEPH_INO_DOTDOT 3 /* used by ceph fuse for parent (..) */ /* arbitrary limit on max # of monitors (cluster of 3 is typical) */ #define CEPH_MAX_MON 31 @@ -51,7 +49,7 @@ struct ceph_file_layout { __le32 fl_object_stripe_unit; /* UNUSED. for per-object parity, if any */ /* object -> pg layout */ - __le32 fl_unused; /* unused; used to be preferred primary (-1) */ + __le32 fl_unused; /* unused; used to be preferred primary for pg (-1 for none) */ __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ } __attribute__ ((packed)); @@ -101,6 +99,8 @@ struct ceph_dir_layout { #define CEPH_MSG_MON_SUBSCRIBE_ACK 16 #define CEPH_MSG_AUTH 17 #define CEPH_MSG_AUTH_REPLY 18 +#define CEPH_MSG_MON_GET_VERSION 19 +#define CEPH_MSG_MON_GET_VERSION_REPLY 20 /* client <-> mds */ #define CEPH_MSG_MDS_MAP 21 @@ -221,6 +221,11 @@ struct ceph_mon_subscribe_ack { } __attribute__ ((packed)); /* + * mdsmap flags + */ +#define CEPH_MDSMAP_DOWN (1<<0) /* cluster deliberately down */ + +/* * mds states * > 0 -> in * <= 0 -> out @@ -233,6 +238,7 @@ struct ceph_mon_subscribe_ack { #define CEPH_MDS_STATE_CREATING -6 /* up, creating MDS instance. */ #define CEPH_MDS_STATE_STARTING -7 /* up, starting previously stopped mds */ #define CEPH_MDS_STATE_STANDBY_REPLAY -8 /* up, tailing active node's journal */ +#define CEPH_MDS_STATE_REPLAYONCE -9 /* up, replaying an active node's journal */ #define CEPH_MDS_STATE_REPLAY 8 /* up, replaying journal. */ #define CEPH_MDS_STATE_RESOLVE 9 /* up, disambiguating distributed @@ -264,6 +270,7 @@ extern const char *ceph_mds_state_name(int s); #define CEPH_LOCK_IXATTR 2048 #define CEPH_LOCK_IFLOCK 4096 /* advisory file locks */ #define CEPH_LOCK_INO 8192 /* immutable inode bits; not a lock */ +#define CEPH_LOCK_IPOLICY 16384 /* policy lock on dirs. MDS internal */ /* client_session ops */ enum { @@ -338,6 +345,12 @@ extern const char *ceph_mds_op_name(int op); #define CEPH_SETATTR_SIZE 32 #define CEPH_SETATTR_CTIME 64 +/* + * Ceph setxattr request flags. + */ +#define CEPH_XATTR_CREATE 1 +#define CEPH_XATTR_REPLACE 2 + union ceph_mds_request_args { struct { __le32 mask; /* CEPH_CAP_* */ @@ -522,14 +535,17 @@ int ceph_flags_to_mode(int flags); #define CEPH_CAP_GWREXTEND 64 /* (file) client can extend EOF */ #define CEPH_CAP_GLAZYIO 128 /* (file) client can perform lazy io */ +#define CEPH_CAP_SIMPLE_BITS 2 +#define CEPH_CAP_FILE_BITS 8 + /* per-lock shift */ #define CEPH_CAP_SAUTH 2 #define CEPH_CAP_SLINK 4 #define CEPH_CAP_SXATTR 6 #define CEPH_CAP_SFILE 8 -#define CEPH_CAP_SFLOCK 20 +#define CEPH_CAP_SFLOCK 20 -#define CEPH_CAP_BITS 22 +#define CEPH_CAP_BITS 22 /* composed values */ #define CEPH_CAP_AUTH_SHARED (CEPH_CAP_GSHARED << CEPH_CAP_SAUTH) diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h index 63d0928..360d9d0 100644 --- a/include/linux/ceph/decode.h +++ b/include/linux/ceph/decode.h @@ -52,10 +52,10 @@ static inline int ceph_has_room(void **p, void *end, size_t n) return end >= *p && n <= end - *p; } -#define ceph_decode_need(p, end, n, bad) \ - do { \ - if (!likely(ceph_has_room(p, end, n))) \ - goto bad; \ +#define ceph_decode_need(p, end, n, bad) \ + do { \ + if (!likely(ceph_has_room(p, end, n))) \ + goto bad; \ } while (0) #define ceph_decode_64_safe(p, end, v, bad) \ @@ -99,8 +99,8 @@ static inline int ceph_has_room(void **p, void *end, size_t n) * * There are two possible failures: * - converting the string would require accessing memory at or - * beyond the "end" pointer provided (-E - * - memory could not be allocated for the result + * beyond the "end" pointer provided (-ERANGE) + * - memory could not be allocated for the result (-ENOMEM) */ static inline char *ceph_extract_encoded_string(void **p, void *end, size_t *lenp, gfp_t gfp) @@ -217,10 +217,10 @@ static inline void ceph_encode_string(void **p, void *end, *p += len; } -#define ceph_encode_need(p, end, n, bad) \ - do { \ - if (!likely(ceph_has_room(p, end, n))) \ - goto bad; \ +#define ceph_encode_need(p, end, n, bad) \ + do { \ + if (!likely(ceph_has_room(p, end, n))) \ + goto bad; \ } while (0) #define ceph_encode_64_safe(p, end, v, bad) \ @@ -231,12 +231,17 @@ static inline void ceph_encode_string(void **p, void *end, #define ceph_encode_32_safe(p, end, v, bad) \ do { \ ceph_encode_need(p, end, sizeof(u32), bad); \ - ceph_encode_32(p, v); \ + ceph_encode_32(p, v); \ } while (0) #define ceph_encode_16_safe(p, end, v, bad) \ do { \ ceph_encode_need(p, end, sizeof(u16), bad); \ - ceph_encode_16(p, v); \ + ceph_encode_16(p, v); \ + } while (0) +#define ceph_encode_8_safe(p, end, v, bad) \ + do { \ + ceph_encode_need(p, end, sizeof(u8), bad); \ + ceph_encode_8(p, v); \ } while (0) #define ceph_encode_copy_safe(p, end, pv, n, bad) \ diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 084d3c6..29818fc 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -193,6 +193,8 @@ static inline int calc_pages_for(u64 off, u64 len) } /* ceph_common.c */ +extern bool libceph_compatible(void *data); + extern const char *ceph_msg_type_name(int type); extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid); extern struct kmem_cache *ceph_inode_cachep; @@ -220,7 +222,7 @@ extern int ceph_open_session(struct ceph_client *client); /* pagevec.c */ extern void ceph_release_page_vector(struct page **pages, int num_pages); -extern struct page **ceph_get_direct_page_vector(const char __user *data, +extern struct page **ceph_get_direct_page_vector(const void __user *data, int num_pages, bool write_page); extern void ceph_put_page_vector(struct page **pages, int num_pages, @@ -228,15 +230,15 @@ extern void ceph_put_page_vector(struct page **pages, int num_pages, extern void ceph_release_page_vector(struct page **pages, int num_pages); extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags); extern int ceph_copy_user_to_page_vector(struct page **pages, - const char __user *data, + const void __user *data, loff_t off, size_t len); -extern int ceph_copy_to_page_vector(struct page **pages, - const char *data, +extern void ceph_copy_to_page_vector(struct page **pages, + const void *data, loff_t off, size_t len); -extern int ceph_copy_from_page_vector(struct page **pages, - char *data, +extern void ceph_copy_from_page_vector(struct page **pages, + void *data, loff_t off, size_t len); -extern int ceph_copy_page_vector_to_user(struct page **pages, char __user *data, +extern int ceph_copy_page_vector_to_user(struct page **pages, void __user *data, loff_t off, size_t len); extern void ceph_zero_page_vector_range(int off, int len, struct page **pages); diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 14ba5ee..60903e0 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -83,9 +83,11 @@ struct ceph_msg { struct list_head list_head; struct kref kref; +#ifdef CONFIG_BLOCK struct bio *bio; /* instead of pages/pagelist */ struct bio *bio_iter; /* bio iterator */ int bio_seg; /* current bio segment */ +#endif /* CONFIG_BLOCK */ struct ceph_pagelist *trail; /* the trailing part of the data */ bool front_is_vmalloc; bool more_to_follow; diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index d9b880e..388158f 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -10,6 +10,7 @@ #include <linux/ceph/osdmap.h> #include <linux/ceph/messenger.h> #include <linux/ceph/auth.h> +#include <linux/ceph/pagelist.h> /* * Maximum object name size @@ -22,7 +23,6 @@ struct ceph_snap_context; struct ceph_osd_request; struct ceph_osd_client; struct ceph_authorizer; -struct ceph_pagelist; /* * completion callback for async writepages @@ -95,7 +95,7 @@ struct ceph_osd_request { struct bio *r_bio; /* instead of pages */ #endif - struct ceph_pagelist *r_trail; /* trailing part of the data */ + struct ceph_pagelist r_trail; /* trailing part of the data */ }; struct ceph_osd_event { @@ -107,7 +107,6 @@ struct ceph_osd_event { struct rb_node node; struct list_head osd_node; struct kref kref; - struct completion completion; }; struct ceph_osd_event_work { @@ -157,7 +156,7 @@ struct ceph_osd_client { struct ceph_osd_req_op { u16 op; /* CEPH_OSD_OP_* */ - u32 flags; /* CEPH_OSD_FLAG_* */ + u32 payload_len; union { struct { u64 offset, length; @@ -166,23 +165,24 @@ struct ceph_osd_req_op { } extent; struct { const char *name; - u32 name_len; const char *val; + u32 name_len; u32 value_len; __u8 cmp_op; /* CEPH_OSD_CMPXATTR_OP_* */ __u8 cmp_mode; /* CEPH_OSD_CMPXATTR_MODE_* */ } xattr; struct { const char *class_name; - __u8 class_len; const char *method_name; - __u8 method_len; - __u8 argc; const char *indata; u32 indata_len; + __u8 class_len; + __u8 method_len; + __u8 argc; } cls; struct { - u64 cookie, count; + u64 cookie; + u64 count; } pgls; struct { u64 snapid; @@ -190,12 +190,11 @@ struct ceph_osd_req_op { struct { u64 cookie; u64 ver; - __u8 flag; u32 prot_ver; u32 timeout; + __u8 flag; } watch; }; - u32 payload_len; }; extern int ceph_osdc_init(struct ceph_osd_client *osdc, @@ -207,29 +206,19 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg); -extern int ceph_calc_raw_layout(struct ceph_osd_client *osdc, - struct ceph_file_layout *layout, - u64 snapid, - u64 off, u64 *plen, u64 *bno, - struct ceph_osd_request *req, - struct ceph_osd_req_op *op); - extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, - int flags, struct ceph_snap_context *snapc, - struct ceph_osd_req_op *ops, + unsigned int num_op, bool use_mempool, - gfp_t gfp_flags, - struct page **pages, - struct bio *bio); + gfp_t gfp_flags); extern void ceph_osdc_build_request(struct ceph_osd_request *req, - u64 off, u64 *plen, + u64 off, u64 len, + unsigned int num_op, struct ceph_osd_req_op *src_ops, struct ceph_snap_context *snapc, - struct timespec *mtime, - const char *oid, - int oid_len); + u64 snap_id, + struct timespec *mtime); extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, struct ceph_file_layout *layout, @@ -239,8 +228,7 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, int do_sync, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - bool use_mempool, int num_reply, - int page_align); + bool use_mempool, int page_align); extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, struct ceph_osd_request *req); @@ -279,17 +267,13 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, u64 off, u64 len, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - struct page **pages, int nr_pages, - int flags, int do_sync, bool nofail); + struct page **pages, int nr_pages); /* watch/notify events */ extern int ceph_osdc_create_event(struct ceph_osd_client *osdc, void (*event_cb)(u64, u64, u8, void *), - int one_shot, void *data, - struct ceph_osd_event **pevent); + void *data, struct ceph_osd_event **pevent); extern void ceph_osdc_cancel_event(struct ceph_osd_event *event); -extern int ceph_osdc_wait_event(struct ceph_osd_event *event, - unsigned long timeout); extern void ceph_osdc_put_event(struct ceph_osd_event *event); #endif diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 10a417f..c83a838 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -110,7 +110,7 @@ extern void ceph_osdmap_destroy(struct ceph_osdmap *map); /* calculate mapping of a file extent to an object */ extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, - u64 off, u64 *plen, + u64 off, u64 len, u64 *bno, u64 *oxoff, u64 *oxlen); /* calculate mapping of object to a placement group */ diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 2c04afe..b65182a 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -145,8 +145,12 @@ struct ceph_eversion { */ /* status bits */ -#define CEPH_OSD_EXISTS 1 -#define CEPH_OSD_UP 2 +#define CEPH_OSD_EXISTS (1<<0) +#define CEPH_OSD_UP (1<<1) +#define CEPH_OSD_AUTOOUT (1<<2) /* osd was automatically marked out */ +#define CEPH_OSD_NEW (1<<3) /* osd is new, never marked in */ + +extern const char *ceph_osd_state_name(int s); /* osd weights. fixed point value: 0x10000 == 1.0 ("in"), 0 == "out" */ #define CEPH_OSD_IN 0x10000 @@ -161,9 +165,25 @@ struct ceph_eversion { #define CEPH_OSDMAP_PAUSERD (1<<2) /* pause all reads */ #define CEPH_OSDMAP_PAUSEWR (1<<3) /* pause all writes */ #define CEPH_OSDMAP_PAUSEREC (1<<4) /* pause recovery */ +#define CEPH_OSDMAP_NOUP (1<<5) /* block osd boot */ +#define CEPH_OSDMAP_NODOWN (1<<6) /* block osd mark-down/failure */ +#define CEPH_OSDMAP_NOOUT (1<<7) /* block osd auto mark-out */ +#define CEPH_OSDMAP_NOIN (1<<8) /* block osd auto mark-in */ +#define CEPH_OSDMAP_NOBACKFILL (1<<9) /* block osd backfill */ +#define CEPH_OSDMAP_NORECOVER (1<<10) /* block osd recovery and backfill */ + +/* + * The error code to return when an OSD can't handle a write + * because it is too large. + */ +#define OSD_WRITETOOBIG EMSGSIZE /* * osd ops + * + * WARNING: do not use these op codes directly. Use the helpers + * defined below instead. In certain cases, op code behavior was + * redefined, resulting in special-cases in the helpers. */ #define CEPH_OSD_OP_MODE 0xf000 #define CEPH_OSD_OP_MODE_RD 0x1000 @@ -177,6 +197,7 @@ struct ceph_eversion { #define CEPH_OSD_OP_TYPE_ATTR 0x0300 #define CEPH_OSD_OP_TYPE_EXEC 0x0400 #define CEPH_OSD_OP_TYPE_PG 0x0500 +#define CEPH_OSD_OP_TYPE_MULTI 0x0600 /* multiobject */ enum { /** data **/ @@ -217,6 +238,23 @@ enum { CEPH_OSD_OP_WATCH = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 15, + /* omap */ + CEPH_OSD_OP_OMAPGETKEYS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 17, + CEPH_OSD_OP_OMAPGETVALS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 18, + CEPH_OSD_OP_OMAPGETHEADER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 19, + CEPH_OSD_OP_OMAPGETVALSBYKEYS = + CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 20, + CEPH_OSD_OP_OMAPSETVALS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 21, + CEPH_OSD_OP_OMAPSETHEADER = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 22, + CEPH_OSD_OP_OMAPCLEAR = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 23, + CEPH_OSD_OP_OMAPRMKEYS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24, + CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25, + + /** multi **/ + CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1, + CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2, + CEPH_OSD_OP_SRC_CMPXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 3, + /** attrs **/ /* read */ CEPH_OSD_OP_GETXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1, @@ -238,6 +276,7 @@ enum { CEPH_OSD_OP_SCRUB_RESERVE = CEPH_OSD_OP_MODE_SUB | 6, CEPH_OSD_OP_SCRUB_UNRESERVE = CEPH_OSD_OP_MODE_SUB | 7, CEPH_OSD_OP_SCRUB_STOP = CEPH_OSD_OP_MODE_SUB | 8, + CEPH_OSD_OP_SCRUB_MAP = CEPH_OSD_OP_MODE_SUB | 9, /** lock **/ CEPH_OSD_OP_WRLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1, @@ -248,10 +287,12 @@ enum { CEPH_OSD_OP_DNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6, /** exec **/ + /* note: the RD bit here is wrong; see special-case below in helper */ CEPH_OSD_OP_CALL = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_EXEC | 1, /** pg **/ CEPH_OSD_OP_PGLS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 1, + CEPH_OSD_OP_PGLS_FILTER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 2, }; static inline int ceph_osd_op_type_lock(int op) @@ -274,6 +315,10 @@ static inline int ceph_osd_op_type_pg(int op) { return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_PG; } +static inline int ceph_osd_op_type_multi(int op) +{ + return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_MULTI; +} static inline int ceph_osd_op_mode_subop(int op) { @@ -281,11 +326,12 @@ static inline int ceph_osd_op_mode_subop(int op) } static inline int ceph_osd_op_mode_read(int op) { - return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_RD; + return (op & CEPH_OSD_OP_MODE_RD) && + op != CEPH_OSD_OP_CALL; } static inline int ceph_osd_op_mode_modify(int op) { - return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_WR; + return op & CEPH_OSD_OP_MODE_WR; } /* @@ -294,34 +340,38 @@ static inline int ceph_osd_op_mode_modify(int op) */ #define CEPH_OSD_TMAP_HDR 'h' #define CEPH_OSD_TMAP_SET 's' +#define CEPH_OSD_TMAP_CREATE 'c' /* create key */ #define CEPH_OSD_TMAP_RM 'r' +#define CEPH_OSD_TMAP_RMSLOPPY 'R' extern const char *ceph_osd_op_name(int op); - /* * osd op flags * * An op may be READ, WRITE, or READ|WRITE. */ enum { - CEPH_OSD_FLAG_ACK = 1, /* want (or is) "ack" ack */ - CEPH_OSD_FLAG_ONNVRAM = 2, /* want (or is) "onnvram" ack */ - CEPH_OSD_FLAG_ONDISK = 4, /* want (or is) "ondisk" ack */ - CEPH_OSD_FLAG_RETRY = 8, /* resend attempt */ - CEPH_OSD_FLAG_READ = 16, /* op may read */ - CEPH_OSD_FLAG_WRITE = 32, /* op may write */ - CEPH_OSD_FLAG_ORDERSNAP = 64, /* EOLDSNAP if snapc is out of order */ - CEPH_OSD_FLAG_PEERSTAT = 128, /* msg includes osd_peer_stat */ - CEPH_OSD_FLAG_BALANCE_READS = 256, - CEPH_OSD_FLAG_PARALLELEXEC = 512, /* execute op in parallel */ - CEPH_OSD_FLAG_PGOP = 1024, /* pg op, no object */ - CEPH_OSD_FLAG_EXEC = 2048, /* op may exec */ - CEPH_OSD_FLAG_EXEC_PUBLIC = 4096, /* op may exec (public) */ + CEPH_OSD_FLAG_ACK = 0x0001, /* want (or is) "ack" ack */ + CEPH_OSD_FLAG_ONNVRAM = 0x0002, /* want (or is) "onnvram" ack */ + CEPH_OSD_FLAG_ONDISK = 0x0004, /* want (or is) "ondisk" ack */ + CEPH_OSD_FLAG_RETRY = 0x0008, /* resend attempt */ + CEPH_OSD_FLAG_READ = 0x0010, /* op may read */ + CEPH_OSD_FLAG_WRITE = 0x0020, /* op may write */ + CEPH_OSD_FLAG_ORDERSNAP = 0x0040, /* EOLDSNAP if snapc is out of order */ + CEPH_OSD_FLAG_PEERSTAT_OLD = 0x0080, /* DEPRECATED msg includes osd_peer_stat */ + CEPH_OSD_FLAG_BALANCE_READS = 0x0100, + CEPH_OSD_FLAG_PARALLELEXEC = 0x0200, /* execute op in parallel */ + CEPH_OSD_FLAG_PGOP = 0x0400, /* pg op, no object */ + CEPH_OSD_FLAG_EXEC = 0x0800, /* op may exec */ + CEPH_OSD_FLAG_EXEC_PUBLIC = 0x1000, /* DEPRECATED op may exec (public) */ + CEPH_OSD_FLAG_LOCALIZE_READS = 0x2000, /* read from nearby replica, if any */ + CEPH_OSD_FLAG_RWORDERED = 0x4000, /* order wrt concurrent reads */ }; enum { CEPH_OSD_OP_FLAG_EXCL = 1, /* EXCL object create */ + CEPH_OSD_OP_FLAG_FAILOK = 2, /* continue despite failure */ }; #define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/ @@ -381,7 +431,11 @@ struct ceph_osd_op { __le64 ver; __u8 flag; /* 0 = unwatch, 1 = watch */ } __attribute__ ((packed)) watch; -}; + struct { + __le64 offset, length; + __le64 src_offset; + } __attribute__ ((packed)) clonerange; + }; __le32 payload_len; } __attribute__ ((packed)); @@ -424,5 +478,4 @@ struct ceph_osd_reply_head { } __attribute__ ((packed)); - #endif diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 25baa28..6a1101f 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -162,6 +162,8 @@ struct crush_map { __u32 choose_local_fallback_tries; /* choose attempts before giving up */ __u32 choose_total_tries; + /* attempt chooseleaf inner descent once; on failure retry outer descent */ + __u32 chooseleaf_descend_once; }; diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index ee71ea2..c236c235 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -26,6 +26,22 @@ #include "crypto.h" +/* + * Module compatibility interface. For now it doesn't do anything, + * but its existence signals a certain level of functionality. + * + * The data buffer is used to pass information both to and from + * libceph. The return value indicates whether libceph determines + * it is compatible with the caller (from another kernel module), + * given the provided data. + * + * The data pointer can be null. + */ +bool libceph_compatible(void *data) +{ + return true; +} +EXPORT_SYMBOL(libceph_compatible); /* * find filename portion of a path (/foo/bar/baz -> baz) diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c index 3fbda04..1348df9 100644 --- a/net/ceph/ceph_strings.c +++ b/net/ceph/ceph_strings.c @@ -21,9 +21,15 @@ const char *ceph_osd_op_name(int op) switch (op) { case CEPH_OSD_OP_READ: return "read"; case CEPH_OSD_OP_STAT: return "stat"; + case CEPH_OSD_OP_MAPEXT: return "mapext"; + case CEPH_OSD_OP_SPARSE_READ: return "sparse-read"; + case CEPH_OSD_OP_NOTIFY: return "notify"; + case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack"; + case CEPH_OSD_OP_ASSERT_VER: return "assert-version"; case CEPH_OSD_OP_MASKTRUNC: return "masktrunc"; + case CEPH_OSD_OP_CREATE: return "create"; case CEPH_OSD_OP_WRITE: return "write"; case CEPH_OSD_OP_DELETE: return "delete"; case CEPH_OSD_OP_TRUNCATE: return "truncate"; @@ -39,6 +45,11 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_TMAPUP: return "tmapup"; case CEPH_OSD_OP_TMAPGET: return "tmapget"; case CEPH_OSD_OP_TMAPPUT: return "tmapput"; + case CEPH_OSD_OP_WATCH: return "watch"; + + case CEPH_OSD_OP_CLONERANGE: return "clonerange"; + case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version"; + case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr"; case CEPH_OSD_OP_GETXATTR: return "getxattr"; case CEPH_OSD_OP_GETXATTRS: return "getxattrs"; @@ -53,6 +64,10 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_BALANCEREADS: return "balance-reads"; case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads"; case CEPH_OSD_OP_SCRUB: return "scrub"; + case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve"; + case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve"; + case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop"; + case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map"; case CEPH_OSD_OP_WRLOCK: return "wrlock"; case CEPH_OSD_OP_WRUNLOCK: return "wrunlock"; @@ -64,10 +79,34 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_CALL: return "call"; case CEPH_OSD_OP_PGLS: return "pgls"; + case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter"; + case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys"; + case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals"; + case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header"; + case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys"; + case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals"; + case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header"; + case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear"; + case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys"; } return "???"; } +const char *ceph_osd_state_name(int s) +{ + switch (s) { + case CEPH_OSD_EXISTS: + return "exists"; + case CEPH_OSD_UP: + return "up"; + case CEPH_OSD_AUTOOUT: + return "autoout"; + case CEPH_OSD_NEW: + return "new"; + default: + return "???"; + } +} const char *ceph_pool_op_name(int op) { diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 35fce75..cbd06a9 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in * @outpos: our position in that vector * @firstn: true if choosing "first n" items, false if choosing "indep" * @recurse_to_leaf: true if we want one device under each item of given type + * @descend_once: true if we should only try one descent before giving up * @out2: second output vector for leaf items (if @recurse_to_leaf) */ static int crush_choose(const struct crush_map *map, @@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map, int x, int numrep, int type, int *out, int outpos, int firstn, int recurse_to_leaf, - int *out2) + int descend_once, int *out2) { int rep; unsigned int ftotal, flocal; @@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map, } reject = 0; - if (recurse_to_leaf) { + if (!collide && recurse_to_leaf) { if (item < 0) { if (crush_choose(map, map->buckets[-1-item], @@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map, x, outpos+1, 0, out2, outpos, firstn, 0, + map->chooseleaf_descend_once, NULL) <= outpos) /* didn't get leaf */ reject = 1; @@ -422,7 +424,10 @@ reject: ftotal++; flocal++; - if (collide && flocal <= map->choose_local_tries) + if (reject && descend_once) + /* let outer call try again */ + skip_rep = 1; + else if (collide && flocal <= map->choose_local_tries) /* retry locally a few times */ retry_bucket = 1; else if (map->choose_local_fallback_tries > 0 && @@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map, int i, j; int numrep; int firstn; + const int descend_once = 0; if ((__u32)ruleno >= map->max_rules) { dprintk(" bad ruleno %d\n", ruleno); @@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map, curstep->arg2, o+osize, j, firstn, - recurse_to_leaf, c+osize); + recurse_to_leaf, + descend_once, c+osize); } if (recurse_to_leaf) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5ccf87e..8a62a55 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -9,8 +9,9 @@ #include <linux/slab.h> #include <linux/socket.h> #include <linux/string.h> +#ifdef CONFIG_BLOCK #include <linux/bio.h> -#include <linux/blkdev.h> +#endif /* CONFIG_BLOCK */ #include <linux/dns_resolver.h> #include <net/tcp.h> @@ -2651,9 +2652,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, m->page_alignment = 0; m->pages = NULL; m->pagelist = NULL; +#ifdef CONFIG_BLOCK m->bio = NULL; m->bio_iter = NULL; m->bio_seg = 0; +#endif /* CONFIG_BLOCK */ m->trail = NULL; /* front */ diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index eb9a444..39629b6 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -23,7 +23,7 @@ static const struct ceph_connection_operations osd_con_ops; -static void send_queued(struct ceph_osd_client *osdc); +static void __send_queued(struct ceph_osd_client *osdc); static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); static void __register_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); @@ -32,64 +32,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc, static void __send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); -static int op_needs_trail(int op) -{ - switch (op) { - case CEPH_OSD_OP_GETXATTR: - case CEPH_OSD_OP_SETXATTR: - case CEPH_OSD_OP_CMPXATTR: - case CEPH_OSD_OP_CALL: - case CEPH_OSD_OP_NOTIFY: - return 1; - default: - return 0; - } -} - static int op_has_extent(int op) { return (op == CEPH_OSD_OP_READ || op == CEPH_OSD_OP_WRITE); } -int ceph_calc_raw_layout(struct ceph_osd_client *osdc, - struct ceph_file_layout *layout, - u64 snapid, - u64 off, u64 *plen, u64 *bno, - struct ceph_osd_request *req, - struct ceph_osd_req_op *op) -{ - struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; - u64 orig_len = *plen; - u64 objoff, objlen; /* extent in object */ - int r; - - reqhead->snapid = cpu_to_le64(snapid); - - /* object extent? */ - r = ceph_calc_file_object_mapping(layout, off, plen, bno, - &objoff, &objlen); - if (r < 0) - return r; - if (*plen < orig_len) - dout(" skipping last %llu, final file extent %llu~%llu\n", - orig_len - *plen, off, *plen); - - if (op_has_extent(op->op)) { - op->extent.offset = objoff; - op->extent.length = objlen; - } - req->r_num_pages = calc_pages_for(off, *plen); - req->r_page_alignment = off & ~PAGE_MASK; - if (op->op == CEPH_OSD_OP_WRITE) - op->payload_len = *plen; - - dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", - *bno, objoff, objlen, req->r_num_pages); - return 0; -} -EXPORT_SYMBOL(ceph_calc_raw_layout); - /* * Implement client access to distributed object storage cluster. * @@ -115,20 +63,48 @@ EXPORT_SYMBOL(ceph_calc_raw_layout); * * fill osd op in request message. */ -static int calc_layout(struct ceph_osd_client *osdc, - struct ceph_vino vino, +static int calc_layout(struct ceph_vino vino, struct ceph_file_layout *layout, u64 off, u64 *plen, struct ceph_osd_request *req, struct ceph_osd_req_op *op) { - u64 bno; + u64 orig_len = *plen; + u64 bno = 0; + u64 objoff = 0; + u64 objlen = 0; int r; - r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, - plen, &bno, req, op); + /* object extent? */ + r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno, + &objoff, &objlen); if (r < 0) return r; + if (objlen < orig_len) { + *plen = objlen; + dout(" skipping last %llu, final file extent %llu~%llu\n", + orig_len - *plen, off, *plen); + } + + if (op_has_extent(op->op)) { + u32 osize = le32_to_cpu(layout->fl_object_size); + op->extent.offset = objoff; + op->extent.length = objlen; + if (op->extent.truncate_size <= off - objoff) { + op->extent.truncate_size = 0; + } else { + op->extent.truncate_size -= off - objoff; + if (op->extent.truncate_size > osize) + op->extent.truncate_size = osize; + } + } + req->r_num_pages = calc_pages_for(off, *plen); + req->r_page_alignment = off & ~PAGE_MASK; + if (op->op == CEPH_OSD_OP_WRITE) + op->payload_len = *plen; + + dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", + bno, objoff, objlen, req->r_num_pages); snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); req->r_oid_len = strlen(req->r_oid); @@ -148,25 +124,19 @@ void ceph_osdc_release_request(struct kref *kref) if (req->r_request) ceph_msg_put(req->r_request); if (req->r_con_filling_msg) { - dout("%s revoking pages %p from con %p\n", __func__, - req->r_pages, req->r_con_filling_msg); + dout("%s revoking msg %p from con %p\n", __func__, + req->r_reply, req->r_con_filling_msg); ceph_msg_revoke_incoming(req->r_reply); req->r_con_filling_msg->ops->put(req->r_con_filling_msg); + req->r_con_filling_msg = NULL; } if (req->r_reply) ceph_msg_put(req->r_reply); if (req->r_own_pages) ceph_release_page_vector(req->r_pages, req->r_num_pages); -#ifdef CONFIG_BLOCK - if (req->r_bio) - bio_put(req->r_bio); -#endif ceph_put_snap_context(req->r_snapc); - if (req->r_trail) { - ceph_pagelist_release(req->r_trail); - kfree(req->r_trail); - } + ceph_pagelist_release(&req->r_trail); if (req->r_mempool) mempool_free(req, req->r_osdc->req_mempool); else @@ -174,34 +144,14 @@ void ceph_osdc_release_request(struct kref *kref) } EXPORT_SYMBOL(ceph_osdc_release_request); -static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail) -{ - int i = 0; - - if (needs_trail) - *needs_trail = 0; - while (ops[i].op) { - if (needs_trail && op_needs_trail(ops[i].op)) - *needs_trail = 1; - i++; - } - - return i; -} - struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, - int flags, struct ceph_snap_context *snapc, - struct ceph_osd_req_op *ops, + unsigned int num_op, bool use_mempool, - gfp_t gfp_flags, - struct page **pages, - struct bio *bio) + gfp_t gfp_flags) { struct ceph_osd_request *req; struct ceph_msg *msg; - int needs_trail; - int num_op = get_num_ops(ops, &needs_trail); size_t msg_size = sizeof(struct ceph_osd_request_head); msg_size += num_op*sizeof(struct ceph_osd_op); @@ -228,10 +178,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); - req->r_flags = flags; - - WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); - /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); @@ -244,15 +190,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, } req->r_reply = msg; - /* allocate space for the trailing data */ - if (needs_trail) { - req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags); - if (!req->r_trail) { - ceph_osdc_put_request(req); - return NULL; - } - ceph_pagelist_init(req->r_trail); - } + ceph_pagelist_init(&req->r_trail); /* create request message; allow space for oid */ msg_size += MAX_OBJ_NAME_SIZE; @@ -270,13 +208,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, memset(msg->front.iov_base, 0, msg->front.iov_len); req->r_request = msg; - req->r_pages = pages; -#ifdef CONFIG_BLOCK - if (bio) { - req->r_bio = bio; - bio_get(req->r_bio); - } -#endif return req; } @@ -289,6 +220,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req, dst->op = cpu_to_le16(src->op); switch (src->op) { + case CEPH_OSD_OP_STAT: + break; case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: dst->extent.offset = @@ -300,52 +233,20 @@ static void osd_req_encode_op(struct ceph_osd_request *req, dst->extent.truncate_seq = cpu_to_le32(src->extent.truncate_seq); break; - - case CEPH_OSD_OP_GETXATTR: - case CEPH_OSD_OP_SETXATTR: - case CEPH_OSD_OP_CMPXATTR: - BUG_ON(!req->r_trail); - - dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); - dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); - dst->xattr.cmp_op = src->xattr.cmp_op; - dst->xattr.cmp_mode = src->xattr.cmp_mode; - ceph_pagelist_append(req->r_trail, src->xattr.name, - src->xattr.name_len); - ceph_pagelist_append(req->r_trail, src->xattr.val, - src->xattr.value_len); - break; case CEPH_OSD_OP_CALL: - BUG_ON(!req->r_trail); - dst->cls.class_len = src->cls.class_len; dst->cls.method_len = src->cls.method_len; dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); - ceph_pagelist_append(req->r_trail, src->cls.class_name, + ceph_pagelist_append(&req->r_trail, src->cls.class_name, src->cls.class_len); - ceph_pagelist_append(req->r_trail, src->cls.method_name, + ceph_pagelist_append(&req->r_trail, src->cls.method_name, src->cls.method_len); - ceph_pagelist_append(req->r_trail, src->cls.indata, + ceph_pagelist_append(&req->r_trail, src->cls.indata, src->cls.indata_len); break; - case CEPH_OSD_OP_ROLLBACK: - dst->snap.snapid = cpu_to_le64(src->snap.snapid); - break; case CEPH_OSD_OP_STARTSYNC: break; - case CEPH_OSD_OP_NOTIFY: - { - __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); - __le32 timeout = cpu_to_le32(src->watch.timeout); - - BUG_ON(!req->r_trail); - - ceph_pagelist_append(req->r_trail, - &prot_ver, sizeof(prot_ver)); - ceph_pagelist_append(req->r_trail, - &timeout, sizeof(timeout)); - } case CEPH_OSD_OP_NOTIFY_ACK: case CEPH_OSD_OP_WATCH: dst->watch.cookie = cpu_to_le64(src->watch.cookie); @@ -356,6 +257,64 @@ static void osd_req_encode_op(struct ceph_osd_request *req, pr_err("unrecognized osd opcode %d\n", dst->op); WARN_ON(1); break; + case CEPH_OSD_OP_MAPEXT: + case CEPH_OSD_OP_MASKTRUNC: + case CEPH_OSD_OP_SPARSE_READ: + case CEPH_OSD_OP_NOTIFY: + case CEPH_OSD_OP_ASSERT_VER: + case CEPH_OSD_OP_WRITEFULL: + case CEPH_OSD_OP_TRUNCATE: + case CEPH_OSD_OP_ZERO: + case CEPH_OSD_OP_DELETE: + case CEPH_OSD_OP_APPEND: + case CEPH_OSD_OP_SETTRUNC: + case CEPH_OSD_OP_TRIMTRUNC: + case CEPH_OSD_OP_TMAPUP: + case CEPH_OSD_OP_TMAPPUT: + case CEPH_OSD_OP_TMAPGET: + case CEPH_OSD_OP_CREATE: + case CEPH_OSD_OP_ROLLBACK: + case CEPH_OSD_OP_OMAPGETKEYS: + case CEPH_OSD_OP_OMAPGETVALS: + case CEPH_OSD_OP_OMAPGETHEADER: + case CEPH_OSD_OP_OMAPGETVALSBYKEYS: + case CEPH_OSD_OP_MODE_RD: + case CEPH_OSD_OP_OMAPSETVALS: + case CEPH_OSD_OP_OMAPSETHEADER: + case CEPH_OSD_OP_OMAPCLEAR: + case CEPH_OSD_OP_OMAPRMKEYS: + case CEPH_OSD_OP_OMAP_CMP: + case CEPH_OSD_OP_CLONERANGE: + case CEPH_OSD_OP_ASSERT_SRC_VERSION: + case CEPH_OSD_OP_SRC_CMPXATTR: + case CEPH_OSD_OP_GETXATTR: + case CEPH_OSD_OP_GETXATTRS: + case CEPH_OSD_OP_CMPXATTR: + case CEPH_OSD_OP_SETXATTR: + case CEPH_OSD_OP_SETXATTRS: + case CEPH_OSD_OP_RESETXATTRS: + case CEPH_OSD_OP_RMXATTR: + case CEPH_OSD_OP_PULL: + case CEPH_OSD_OP_PUSH: + case CEPH_OSD_OP_BALANCEREADS: + case CEPH_OSD_OP_UNBALANCEREADS: + case CEPH_OSD_OP_SCRUB: + case CEPH_OSD_OP_SCRUB_RESERVE: + case CEPH_OSD_OP_SCRUB_UNRESERVE: + case CEPH_OSD_OP_SCRUB_STOP: + case CEPH_OSD_OP_SCRUB_MAP: + case CEPH_OSD_OP_WRLOCK: + case CEPH_OSD_OP_WRUNLOCK: + case CEPH_OSD_OP_RDLOCK: + case CEPH_OSD_OP_RDUNLOCK: + case CEPH_OSD_OP_UPLOCK: + case CEPH_OSD_OP_DNLOCK: + case CEPH_OSD_OP_PGLS: + case CEPH_OSD_OP_PGLS_FILTER: + pr_err("unsupported osd opcode %s\n", + ceph_osd_op_name(dst->op)); + WARN_ON(1); + break; } dst->payload_len = cpu_to_le32(src->payload_len); } @@ -365,25 +324,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req, * */ void ceph_osdc_build_request(struct ceph_osd_request *req, - u64 off, u64 *plen, + u64 off, u64 len, unsigned int num_op, struct ceph_osd_req_op *src_ops, - struct ceph_snap_context *snapc, - struct timespec *mtime, - const char *oid, - int oid_len) + struct ceph_snap_context *snapc, u64 snap_id, + struct timespec *mtime) { struct ceph_msg *msg = req->r_request; struct ceph_osd_request_head *head; struct ceph_osd_req_op *src_op; struct ceph_osd_op *op; void *p; - int num_op = get_num_ops(src_ops, NULL); size_t msg_size = sizeof(*head) + num_op*sizeof(*op); int flags = req->r_flags; - u64 data_len = 0; + u64 data_len; int i; + WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); + head = msg->front.iov_base; + head->snapid = cpu_to_le64(snap_id); op = (void *)(head + 1); p = (void *)(op + num_op); @@ -393,23 +352,17 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, head->flags = cpu_to_le32(flags); if (flags & CEPH_OSD_FLAG_WRITE) ceph_encode_timespec(&head->mtime, mtime); + BUG_ON(num_op > (unsigned int) ((u16) -1)); head->num_ops = cpu_to_le16(num_op); - /* fill in oid */ - head->object_len = cpu_to_le32(oid_len); - memcpy(p, oid, oid_len); - p += oid_len; + head->object_len = cpu_to_le32(req->r_oid_len); + memcpy(p, req->r_oid, req->r_oid_len); + p += req->r_oid_len; src_op = src_ops; - while (src_op->op) { - osd_req_encode_op(req, op, src_op); - src_op++; - op++; - } - - if (req->r_trail) - data_len += req->r_trail->length; + while (num_op--) + osd_req_encode_op(req, op++, src_op++); if (snapc) { head->snap_seq = cpu_to_le64(snapc->seq); @@ -420,14 +373,12 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, } } + data_len = req->r_trail.length; if (flags & CEPH_OSD_FLAG_WRITE) { req->r_request->hdr.data_off = cpu_to_le16(off); - req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); - } else if (data_len) { - req->r_request->hdr.data_off = 0; - req->r_request->hdr.data_len = cpu_to_le32(data_len); + data_len += len; } - + req->r_request->hdr.data_len = cpu_to_le32(data_len); req->r_request->page_alignment = req->r_page_alignment; BUG_ON(p > msg->front.iov_base + msg->front.iov_len); @@ -459,34 +410,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - bool use_mempool, int num_reply, + bool use_mempool, int page_align) { - struct ceph_osd_req_op ops[3]; + struct ceph_osd_req_op ops[2]; struct ceph_osd_request *req; + unsigned int num_op = 1; int r; + memset(&ops, 0, sizeof ops); + ops[0].op = opcode; ops[0].extent.truncate_seq = truncate_seq; ops[0].extent.truncate_size = truncate_size; - ops[0].payload_len = 0; if (do_sync) { ops[1].op = CEPH_OSD_OP_STARTSYNC; - ops[1].payload_len = 0; - ops[2].op = 0; - } else - ops[1].op = 0; - - req = ceph_osdc_alloc_request(osdc, flags, - snapc, ops, - use_mempool, - GFP_NOFS, NULL, NULL); + num_op++; + } + + req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, + GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); + req->r_flags = flags; /* calculate max write size */ - r = calc_layout(osdc, vino, layout, off, plen, req, ops); + r = calc_layout(vino, layout, off, plen, req, ops); if (r < 0) return ERR_PTR(r); req->r_file_layout = *layout; /* keep a copy */ @@ -496,10 +446,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req->r_num_pages = calc_pages_for(page_align, *plen); req->r_page_alignment = page_align; - ceph_osdc_build_request(req, off, plen, ops, - snapc, - mtime, - req->r_oid, req->r_oid_len); + ceph_osdc_build_request(req, off, *plen, num_op, ops, + snapc, vino.snap, mtime); return req; } @@ -623,8 +571,8 @@ static void osd_reset(struct ceph_connection *con) down_read(&osdc->map_sem); mutex_lock(&osdc->request_mutex); __kick_osd_requests(osdc, osd); + __send_queued(osdc); mutex_unlock(&osdc->request_mutex); - send_queued(osdc); up_read(&osdc->map_sem); } @@ -739,31 +687,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc) */ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) { - struct ceph_osd_request *req; - int ret = 0; + struct ceph_entity_addr *peer_addr; dout("__reset_osd %p osd%d\n", osd, osd->o_osd); if (list_empty(&osd->o_requests) && list_empty(&osd->o_linger_requests)) { __remove_osd(osdc, osd); - ret = -ENODEV; - } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], - &osd->o_con.peer_addr, - sizeof(osd->o_con.peer_addr)) == 0 && - !ceph_con_opened(&osd->o_con)) { + + return -ENODEV; + } + + peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; + if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && + !ceph_con_opened(&osd->o_con)) { + struct ceph_osd_request *req; + dout(" osd addr hasn't changed and connection never opened," " letting msgr retry"); /* touch each r_stamp for handle_timeout()'s benfit */ list_for_each_entry(req, &osd->o_requests, r_osd_item) req->r_stamp = jiffies; - ret = -EAGAIN; - } else { - ceph_con_close(&osd->o_con); - ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, - &osdc->osdmap->osd_addr[osd->o_osd]); - osd->o_incarnation++; + + return -EAGAIN; } - return ret; + + ceph_con_close(&osd->o_con); + ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); + osd->o_incarnation++; + + return 0; } static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) @@ -1062,16 +1014,13 @@ static void __send_request(struct ceph_osd_client *osdc, /* * Send any requests in the queue (req_unsent). */ -static void send_queued(struct ceph_osd_client *osdc) +static void __send_queued(struct ceph_osd_client *osdc) { struct ceph_osd_request *req, *tmp; - dout("send_queued\n"); - mutex_lock(&osdc->request_mutex); - list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) { + dout("__send_queued\n"); + list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) __send_request(osdc, req); - } - mutex_unlock(&osdc->request_mutex); } /* @@ -1123,8 +1072,8 @@ static void handle_timeout(struct work_struct *work) } __schedule_osd_timeout(osdc); + __send_queued(osdc); mutex_unlock(&osdc->request_mutex); - send_queued(osdc); up_read(&osdc->map_sem); } @@ -1462,7 +1411,9 @@ done: if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) ceph_monc_request_next_osdmap(&osdc->client->monc); - send_queued(osdc); + mutex_lock(&osdc->request_mutex); + __send_queued(osdc); + mutex_unlock(&osdc->request_mutex); up_read(&osdc->map_sem); wake_up_all(&osdc->client->auth_wq); return; @@ -1556,8 +1507,7 @@ static void __remove_event(struct ceph_osd_event *event) int ceph_osdc_create_event(struct ceph_osd_client *osdc, void (*event_cb)(u64, u64, u8, void *), - int one_shot, void *data, - struct ceph_osd_event **pevent) + void *data, struct ceph_osd_event **pevent) { struct ceph_osd_event *event; @@ -1567,14 +1517,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc, dout("create_event %p\n", event); event->cb = event_cb; - event->one_shot = one_shot; + event->one_shot = 0; event->data = data; event->osdc = osdc; INIT_LIST_HEAD(&event->osd_node); RB_CLEAR_NODE(&event->node); kref_init(&event->kref); /* one ref for us */ kref_get(&event->kref); /* one ref for the caller */ - init_completion(&event->completion); spin_lock(&osdc->event_lock); event->cookie = ++osdc->event_count; @@ -1610,7 +1559,6 @@ static void do_event_work(struct work_struct *work) dout("do_event_work completing %p\n", event); event->cb(ver, notify_id, opcode, event->data); - complete(&event->completion); dout("do_event_work completed %p\n", event); ceph_osdc_put_event(event); kfree(event_work); @@ -1620,7 +1568,8 @@ static void do_event_work(struct work_struct *work) /* * Process osd watch notifications */ -void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) +static void handle_watch_notify(struct ceph_osd_client *osdc, + struct ceph_msg *msg) { void *p, *end; u8 proto_ver; @@ -1641,9 +1590,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) spin_lock(&osdc->event_lock); event = __find_event(osdc, cookie); if (event) { + BUG_ON(event->one_shot); get_event(event); - if (event->one_shot) - __remove_event(event); } spin_unlock(&osdc->event_lock); dout("handle_watch_notify cookie %lld ver %lld event %p\n", @@ -1668,7 +1616,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) return; done_err: - complete(&event->completion); ceph_osdc_put_event(event); return; @@ -1677,21 +1624,6 @@ bad: return; } -int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout) -{ - int err; - - dout("wait_event %p\n", event); - err = wait_for_completion_interruptible_timeout(&event->completion, - timeout * HZ); - ceph_osdc_put_event(event); - if (err > 0) - err = 0; - dout("wait_event %p returns %d\n", event, err); - return err; -} -EXPORT_SYMBOL(ceph_osdc_wait_event); - /* * Register request, send initial attempt. */ @@ -1706,7 +1638,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, #ifdef CONFIG_BLOCK req->r_request->bio = req->r_bio; #endif - req->r_request->trail = req->r_trail; + req->r_request->trail = &req->r_trail; register_request(osdc, req); @@ -1865,7 +1797,6 @@ out_mempool: out: return err; } -EXPORT_SYMBOL(ceph_osdc_init); void ceph_osdc_stop(struct ceph_osd_client *osdc) { @@ -1882,7 +1813,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op_reply); } -EXPORT_SYMBOL(ceph_osdc_stop); /* * Read some contiguous pages. If we cross a stripe boundary, shorten @@ -1902,7 +1832,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, req = ceph_osdc_new_request(osdc, layout, vino, off, plen, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, 0, truncate_seq, truncate_size, NULL, - false, 1, page_align); + false, page_align); if (IS_ERR(req)) return PTR_ERR(req); @@ -1931,8 +1861,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, u64 off, u64 len, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - struct page **pages, int num_pages, - int flags, int do_sync, bool nofail) + struct page **pages, int num_pages) { struct ceph_osd_request *req; int rc = 0; @@ -1941,11 +1870,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, BUG_ON(vino.snap != CEPH_NOSNAP); req = ceph_osdc_new_request(osdc, layout, vino, off, &len, CEPH_OSD_OP_WRITE, - flags | CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE, - snapc, do_sync, + CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, + snapc, 0, truncate_seq, truncate_size, mtime, - nofail, 1, page_align); + true, page_align); if (IS_ERR(req)) return PTR_ERR(req); @@ -1954,7 +1882,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, dout("writepages %llu~%llu (%d pages)\n", off, len, req->r_num_pages); - rc = ceph_osdc_start_request(osdc, req, nofail); + rc = ceph_osdc_start_request(osdc, req, true); if (!rc) rc = ceph_osdc_wait_request(osdc, req); @@ -2047,7 +1975,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, if (data_len > 0) { int want = calc_pages_for(req->r_page_alignment, data_len); - if (unlikely(req->r_num_pages < want)) { + if (req->r_pages && unlikely(req->r_num_pages < want)) { pr_warning("tid %lld reply has %d bytes %d pages, we" " had only %d pages ready\n", tid, data_len, want, req->r_num_pages); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index de73214..3c61e216 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -13,26 +13,18 @@ char *ceph_osdmap_state_str(char *str, int len, int state) { - int flag = 0; - if (!len) - goto done; - - *str = '\0'; - if (state) { - if (state & CEPH_OSD_EXISTS) { - snprintf(str, len, "exists"); - flag = 1; - } - if (state & CEPH_OSD_UP) { - snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""), - "up"); - flag = 1; - } - } else { + return str; + + if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP)) + snprintf(str, len, "exists, up"); + else if (state & CEPH_OSD_EXISTS) + snprintf(str, len, "exists"); + else if (state & CEPH_OSD_UP) + snprintf(str, len, "up"); + else snprintf(str, len, "doesn't exist"); - } -done: + return str; } @@ -170,6 +162,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end) c->choose_local_tries = 2; c->choose_local_fallback_tries = 5; c->choose_total_tries = 19; + c->chooseleaf_descend_once = 0; ceph_decode_need(p, end, 4*sizeof(u32), bad); magic = ceph_decode_32(p); @@ -336,6 +329,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end) dout("crush decode tunable choose_total_tries = %d", c->choose_total_tries); + ceph_decode_need(p, end, sizeof(u32), done); + c->chooseleaf_descend_once = ceph_decode_32(p); + dout("crush decode tunable chooseleaf_descend_once = %d", + c->chooseleaf_descend_once); + done: dout("crush_decode success\n"); return c; @@ -1010,7 +1008,7 @@ bad: * pass a stride back to the caller. */ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, - u64 off, u64 *plen, + u64 off, u64 len, u64 *ono, u64 *oxoff, u64 *oxlen) { @@ -1021,7 +1019,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, u32 su_per_object; u64 t, su_offset; - dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen, + dout("mapping %llu~%llu osize %u fl_su %u\n", off, len, osize, su); if (su == 0 || sc == 0) goto invalid; @@ -1054,11 +1052,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, /* * Calculate the length of the extent being written to the selected - * object. This is the minimum of the full length requested (plen) or + * object. This is the minimum of the full length requested (len) or * the remainder of the current stripe being written to. */ - *oxlen = min_t(u64, *plen, su - su_offset); - *plen = *oxlen; + *oxlen = min_t(u64, len, su - su_offset); dout(" obj extent %llu~%llu\n", *oxoff, *oxlen); return 0; diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c index cd9c21d..815a224 100644 --- a/net/ceph/pagevec.c +++ b/net/ceph/pagevec.c @@ -12,7 +12,7 @@ /* * build a vector of user pages */ -struct page **ceph_get_direct_page_vector(const char __user *data, +struct page **ceph_get_direct_page_vector(const void __user *data, int num_pages, bool write_page) { struct page **pages; @@ -93,7 +93,7 @@ EXPORT_SYMBOL(ceph_alloc_page_vector); * copy user data into a page vector */ int ceph_copy_user_to_page_vector(struct page **pages, - const char __user *data, + const void __user *data, loff_t off, size_t len) { int i = 0; @@ -118,17 +118,17 @@ int ceph_copy_user_to_page_vector(struct page **pages, } EXPORT_SYMBOL(ceph_copy_user_to_page_vector); -int ceph_copy_to_page_vector(struct page **pages, - const char *data, +void ceph_copy_to_page_vector(struct page **pages, + const void *data, loff_t off, size_t len) { int i = 0; size_t po = off & ~PAGE_CACHE_MASK; size_t left = len; - size_t l; while (left > 0) { - l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + memcpy(page_address(pages[i]) + po, data, l); data += l; left -= l; @@ -138,21 +138,20 @@ int ceph_copy_to_page_vector(struct page **pages, i++; } } - return len; } EXPORT_SYMBOL(ceph_copy_to_page_vector); -int ceph_copy_from_page_vector(struct page **pages, - char *data, +void ceph_copy_from_page_vector(struct page **pages, + void *data, loff_t off, size_t len) { int i = 0; size_t po = off & ~PAGE_CACHE_MASK; size_t left = len; - size_t l; while (left > 0) { - l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + memcpy(data, page_address(pages[i]) + po, l); data += l; left -= l; @@ -162,7 +161,6 @@ int ceph_copy_from_page_vector(struct page **pages, i++; } } - return len; } EXPORT_SYMBOL(ceph_copy_from_page_vector); @@ -170,7 +168,7 @@ EXPORT_SYMBOL(ceph_copy_from_page_vector); * copy user data from a page vector into a user pointer */ int ceph_copy_page_vector_to_user(struct page **pages, - char __user *data, + void __user *data, loff_t off, size_t len) { int i = 0; |