diff options
-rw-r--r-- | Documentation/ABI/testing/sysfs-bus-rbd | 4 | ||||
-rw-r--r-- | drivers/block/rbd.c | 72 | ||||
-rw-r--r-- | fs/ceph/file.c | 1 | ||||
-rw-r--r-- | fs/ceph/ioctl.c | 102 | ||||
-rw-r--r-- | fs/ceph/ioctl.h | 2 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 54 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 5 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 9 | ||||
-rw-r--r-- | include/linux/ceph/auth.h | 12 | ||||
-rw-r--r-- | include/linux/ceph/ceph_fs.h | 4 | ||||
-rw-r--r-- | include/linux/ceph/decode.h | 9 | ||||
-rw-r--r-- | include/linux/ceph/messenger.h | 6 | ||||
-rw-r--r-- | include/linux/ceph/osd_client.h | 11 | ||||
-rw-r--r-- | include/linux/ceph/osdmap.h | 2 | ||||
-rw-r--r-- | include/linux/crush/crush.h | 18 | ||||
-rw-r--r-- | include/linux/crush/mapper.h | 7 | ||||
-rw-r--r-- | net/ceph/auth_none.c | 15 | ||||
-rw-r--r-- | net/ceph/auth_x.c | 15 | ||||
-rw-r--r-- | net/ceph/crush/crush.c | 39 | ||||
-rw-r--r-- | net/ceph/crush/mapper.c | 124 | ||||
-rw-r--r-- | net/ceph/messenger.c | 182 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 63 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 73 |
23 files changed, 376 insertions, 453 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index dbedafb..bcd88eb 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -65,11 +65,11 @@ snap_* Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name> ------------------------------------------------------------- -id +snap_id The rados internal snapshot id assigned for this snapshot -size +snap_size The size of the image when this snapshot was taken. diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 013c7a5..65665c9 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -141,7 +141,7 @@ struct rbd_request { struct rbd_snap { struct device dev; const char *name; - size_t size; + u64 size; struct list_head node; u64 id; }; @@ -175,8 +175,7 @@ struct rbd_device { /* protects updating the header */ struct rw_semaphore header_rwsem; char snap_name[RBD_MAX_SNAP_NAME_LEN]; - u32 cur_snap; /* index+1 of current snapshot within snap context - 0 - for the head */ + u64 snap_id; /* current snapshot id */ int read_only; struct list_head node; @@ -241,7 +240,7 @@ static void rbd_put_dev(struct rbd_device *rbd_dev) put_device(&rbd_dev->dev); } -static int __rbd_update_snaps(struct rbd_device *rbd_dev); +static int __rbd_refresh_header(struct rbd_device *rbd_dev); static int rbd_open(struct block_device *bdev, fmode_t mode) { @@ -450,7 +449,9 @@ static void rbd_client_release(struct kref *kref) struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); dout("rbd_release_client %p\n", rbdc); + spin_lock(&rbd_client_list_lock); list_del(&rbdc->node); + spin_unlock(&rbd_client_list_lock); ceph_destroy_client(rbdc->client); kfree(rbdc->rbd_opts); @@ -463,9 +464,7 @@ static void rbd_client_release(struct kref *kref) */ static void rbd_put_client(struct rbd_device *rbd_dev) { - spin_lock(&rbd_client_list_lock); kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); - spin_unlock(&rbd_client_list_lock); rbd_dev->rbd_client = NULL; } @@ -487,16 +486,18 @@ static void rbd_coll_release(struct kref *kref) */ static int rbd_header_from_disk(struct rbd_image_header *header, struct rbd_image_header_ondisk *ondisk, - int allocated_snaps, + u32 allocated_snaps, gfp_t gfp_flags) { - int i; - u32 snap_count; + u32 i, snap_count; if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) return -ENXIO; snap_count = le32_to_cpu(ondisk->snap_count); + if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context)) + / sizeof (*ondisk)) + return -EINVAL; header->snapc = kmalloc(sizeof(struct ceph_snap_context) + snap_count * sizeof (*ondisk), gfp_flags); @@ -506,11 +507,11 @@ static int rbd_header_from_disk(struct rbd_image_header *header, header->snap_names_len = le64_to_cpu(ondisk->snap_names_len); if (snap_count) { header->snap_names = kmalloc(header->snap_names_len, - GFP_KERNEL); + gfp_flags); if (!header->snap_names) goto err_snapc; header->snap_sizes = kmalloc(snap_count * sizeof(u64), - GFP_KERNEL); + gfp_flags); if (!header->snap_sizes) goto err_names; } else { @@ -552,21 +553,6 @@ err_snapc: return -ENOMEM; } -static int snap_index(struct rbd_image_header *header, int snap_num) -{ - return header->total_snaps - snap_num; -} - -static u64 cur_snap_id(struct rbd_device *rbd_dev) -{ - struct rbd_image_header *header = &rbd_dev->header; - - if (!rbd_dev->cur_snap) - return 0; - - return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)]; -} - static int snap_by_name(struct rbd_image_header *header, const char *snap_name, u64 *seq, u64 *size) { @@ -605,7 +591,7 @@ static int rbd_header_set_snap(struct rbd_device *dev, u64 *size) snapc->seq = header->snap_seq; else snapc->seq = 0; - dev->cur_snap = 0; + dev->snap_id = CEPH_NOSNAP; dev->read_only = 0; if (size) *size = header->image_size; @@ -613,8 +599,7 @@ static int rbd_header_set_snap(struct rbd_device *dev, u64 *size) ret = snap_by_name(header, dev->snap_name, &snapc->seq, size); if (ret < 0) goto done; - - dev->cur_snap = header->total_snaps - ret; + dev->snap_id = snapc->seq; dev->read_only = 1; } @@ -935,7 +920,6 @@ static int rbd_do_request(struct request *rq, layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); layout->fl_stripe_count = cpu_to_le32(1); layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); - layout->fl_pg_preferred = cpu_to_le32(-1); layout->fl_pg_pool = cpu_to_le32(dev->poolid); ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, req, ops); @@ -1168,7 +1152,7 @@ static int rbd_req_read(struct request *rq, int coll_index) { return rbd_do_op(rq, rbd_dev, NULL, - (snapid ? snapid : CEPH_NOSNAP), + snapid, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2, @@ -1187,7 +1171,7 @@ static int rbd_req_sync_read(struct rbd_device *dev, u64 *ver) { return rbd_req_sync_op(dev, NULL, - (snapid ? snapid : CEPH_NOSNAP), + snapid, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, @@ -1238,7 +1222,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, notify_id, (int)opcode); mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - rc = __rbd_update_snaps(dev); + rc = __rbd_refresh_header(dev); mutex_unlock(&ctl_mutex); if (rc) pr_warning(RBD_DRV_NAME "%d got notification but failed to " @@ -1521,7 +1505,7 @@ static void rbd_rq_fn(struct request_queue *q) coll, cur_seg); else rbd_req_read(rq, rbd_dev, - cur_snap_id(rbd_dev), + rbd_dev->snap_id, ofs, op_size, bio, coll, cur_seg); @@ -1592,7 +1576,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, { ssize_t rc; struct rbd_image_header_ondisk *dh; - int snap_count = 0; + u32 snap_count = 0; u64 ver; size_t len; @@ -1656,7 +1640,7 @@ static int rbd_header_add_snap(struct rbd_device *dev, struct ceph_mon_client *monc; /* we should create a snapshot only if we're pointing at the head */ - if (dev->cur_snap) + if (dev->snap_id != CEPH_NOSNAP) return -EINVAL; monc = &dev->rbd_client->client->monc; @@ -1683,7 +1667,9 @@ static int rbd_header_add_snap(struct rbd_device *dev, if (ret < 0) return ret; - dev->header.snapc->seq = new_snapid; + down_write(&dev->header_rwsem); + dev->header.snapc->seq = new_snapid; + up_write(&dev->header_rwsem); return 0; bad: @@ -1703,7 +1689,7 @@ static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) /* * only read the first part of the ondisk header, without the snaps info */ -static int __rbd_update_snaps(struct rbd_device *rbd_dev) +static int __rbd_refresh_header(struct rbd_device *rbd_dev) { int ret; struct rbd_image_header h; @@ -1890,7 +1876,7 @@ static ssize_t rbd_image_refresh(struct device *dev, mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - rc = __rbd_update_snaps(rbd_dev); + rc = __rbd_refresh_header(rbd_dev); if (rc < 0) ret = rc; @@ -1949,7 +1935,7 @@ static ssize_t rbd_snap_size_show(struct device *dev, { struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); - return sprintf(buf, "%zd\n", snap->size); + return sprintf(buf, "%llu\n", (unsigned long long)snap->size); } static ssize_t rbd_snap_id_show(struct device *dev, @@ -1958,7 +1944,7 @@ static ssize_t rbd_snap_id_show(struct device *dev, { struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); - return sprintf(buf, "%llu\n", (unsigned long long) snap->id); + return sprintf(buf, "%llu\n", (unsigned long long)snap->id); } static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); @@ -2173,7 +2159,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev) rbd_dev->header.obj_version); if (ret == -ERANGE) { mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - rc = __rbd_update_snaps(rbd_dev); + rc = __rbd_refresh_header(rbd_dev); mutex_unlock(&ctl_mutex); if (rc < 0) return rc; @@ -2558,7 +2544,7 @@ static ssize_t rbd_snap_add(struct device *dev, if (ret < 0) goto err_unlock; - ret = __rbd_update_snaps(rbd_dev); + ret = __rbd_refresh_header(rbd_dev); if (ret < 0) goto err_unlock; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index ed72428..988d4f3 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -54,7 +54,6 @@ prepare_open_request(struct super_block *sb, int flags, int create_mode) req->r_fmode = ceph_flags_to_mode(flags); req->r_args.open.flags = cpu_to_le32(flags); req->r_args.open.mode = cpu_to_le32(create_mode); - req->r_args.open.preferred = cpu_to_le32(-1); out: return req; } diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 790914a59..8e3fb69 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -26,8 +26,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg) l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); l.object_size = ceph_file_layout_object_size(ci->i_layout); l.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool); - l.preferred_osd = - (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred); + l.preferred_osd = (s32)-1; if (copy_to_user(arg, &l, sizeof(l))) return -EFAULT; } @@ -35,6 +34,32 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg) return err; } +static long __validate_layout(struct ceph_mds_client *mdsc, + struct ceph_ioctl_layout *l) +{ + int i, err; + + /* validate striping parameters */ + if ((l->object_size & ~PAGE_MASK) || + (l->stripe_unit & ~PAGE_MASK) || + ((unsigned)l->object_size % (unsigned)l->stripe_unit)) + return -EINVAL; + + /* make sure it's a valid data pool */ + mutex_lock(&mdsc->mutex); + err = -EINVAL; + for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++) + if (mdsc->mdsmap->m_data_pg_pools[i] == l->data_pool) { + err = 0; + break; + } + mutex_unlock(&mdsc->mutex); + if (err) + return err; + + return 0; +} + static long ceph_ioctl_set_layout(struct file *file, void __user *arg) { struct inode *inode = file->f_dentry->d_inode; @@ -44,52 +69,40 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) struct ceph_ioctl_layout l; struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode); struct ceph_ioctl_layout nl; - int err, i; + int err; if (copy_from_user(&l, arg, sizeof(l))) return -EFAULT; /* validate changed params against current layout */ err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT); - if (!err) { - nl.stripe_unit = ceph_file_layout_su(ci->i_layout); - nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); - nl.object_size = ceph_file_layout_object_size(ci->i_layout); - nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool); - nl.preferred_osd = - (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred); - } else + if (err) return err; + memset(&nl, 0, sizeof(nl)); if (l.stripe_count) nl.stripe_count = l.stripe_count; + else + nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); if (l.stripe_unit) nl.stripe_unit = l.stripe_unit; + else + nl.stripe_unit = ceph_file_layout_su(ci->i_layout); if (l.object_size) nl.object_size = l.object_size; + else + nl.object_size = ceph_file_layout_object_size(ci->i_layout); if (l.data_pool) nl.data_pool = l.data_pool; - if (l.preferred_osd) - nl.preferred_osd = l.preferred_osd; + else + nl.data_pool = ceph_file_layout_pg_pool(ci->i_layout); - if ((nl.object_size & ~PAGE_MASK) || - (nl.stripe_unit & ~PAGE_MASK) || - ((unsigned)nl.object_size % (unsigned)nl.stripe_unit)) - return -EINVAL; + /* this is obsolete, and always -1 */ + nl.preferred_osd = le64_to_cpu(-1); - /* make sure it's a valid data pool */ - if (l.data_pool > 0) { - mutex_lock(&mdsc->mutex); - err = -EINVAL; - for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++) - if (mdsc->mdsmap->m_data_pg_pools[i] == l.data_pool) { - err = 0; - break; - } - mutex_unlock(&mdsc->mutex); - if (err) - return err; - } + err = __validate_layout(mdsc, &nl); + if (err) + return err; req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETLAYOUT, USE_AUTH_MDS); @@ -106,8 +119,6 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) req->r_args.setlayout.layout.fl_object_size = cpu_to_le32(l.object_size); req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool); - req->r_args.setlayout.layout.fl_pg_preferred = - cpu_to_le32(l.preferred_osd); parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); err = ceph_mdsc_do_request(mdsc, parent_inode, req); @@ -127,33 +138,16 @@ static long ceph_ioctl_set_layout_policy (struct file *file, void __user *arg) struct inode *inode = file->f_dentry->d_inode; struct ceph_mds_request *req; struct ceph_ioctl_layout l; - int err, i; + int err; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; /* copy and validate */ if (copy_from_user(&l, arg, sizeof(l))) return -EFAULT; - if ((l.object_size & ~PAGE_MASK) || - (l.stripe_unit & ~PAGE_MASK) || - !l.stripe_unit || - (l.object_size && - (unsigned)l.object_size % (unsigned)l.stripe_unit)) - return -EINVAL; - - /* make sure it's a valid data pool */ - if (l.data_pool > 0) { - mutex_lock(&mdsc->mutex); - err = -EINVAL; - for (i = 0; i < mdsc->mdsmap->m_num_data_pg_pools; i++) - if (mdsc->mdsmap->m_data_pg_pools[i] == l.data_pool) { - err = 0; - break; - } - mutex_unlock(&mdsc->mutex); - if (err) - return err; - } + err = __validate_layout(mdsc, &l); + if (err) + return err; req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETDIRLAYOUT, USE_AUTH_MDS); @@ -171,8 +165,6 @@ static long ceph_ioctl_set_layout_policy (struct file *file, void __user *arg) cpu_to_le32(l.object_size); req->r_args.setlayout.layout.fl_pg_pool = cpu_to_le32(l.data_pool); - req->r_args.setlayout.layout.fl_pg_preferred = - cpu_to_le32(l.preferred_osd); err = ceph_mdsc_do_request(mdsc, inode, req); ceph_mdsc_put_request(req); diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h index be4a604..c77028a 100644 --- a/fs/ceph/ioctl.h +++ b/fs/ceph/ioctl.h @@ -34,6 +34,8 @@ struct ceph_ioctl_layout { __u64 stripe_unit, stripe_count, object_size; __u64 data_pool; + + /* obsolete. new values ignored, always return -1 */ __s64 preferred_osd; }; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 89971e1..200bc87 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -334,10 +334,10 @@ void ceph_put_mds_session(struct ceph_mds_session *s) dout("mdsc put_session %p %d -> %d\n", s, atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); if (atomic_dec_and_test(&s->s_ref)) { - if (s->s_authorizer) + if (s->s_auth.authorizer) s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer( s->s_mdsc->fsc->client->monc.auth, - s->s_authorizer); + s->s_auth.authorizer); kfree(s); } } @@ -3395,39 +3395,33 @@ out: /* * authentication */ -static int get_authorizer(struct ceph_connection *con, - void **buf, int *len, int *proto, - void **reply_buf, int *reply_len, int force_new) + +/* + * Note: returned pointer is the address of a structure that's + * managed separately. Caller must *not* attempt to free it. + */ +static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, + int *proto, int force_new) { struct ceph_mds_session *s = con->private; struct ceph_mds_client *mdsc = s->s_mdsc; struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; - int ret = 0; - - if (force_new && s->s_authorizer) { - ac->ops->destroy_authorizer(ac, s->s_authorizer); - s->s_authorizer = NULL; - } - if (s->s_authorizer == NULL) { - if (ac->ops->create_authorizer) { - ret = ac->ops->create_authorizer( - ac, CEPH_ENTITY_TYPE_MDS, - &s->s_authorizer, - &s->s_authorizer_buf, - &s->s_authorizer_buf_len, - &s->s_authorizer_reply_buf, - &s->s_authorizer_reply_buf_len); - if (ret) - return ret; - } - } + struct ceph_auth_handshake *auth = &s->s_auth; + if (force_new && auth->authorizer) { + if (ac->ops && ac->ops->destroy_authorizer) + ac->ops->destroy_authorizer(ac, auth->authorizer); + auth->authorizer = NULL; + } + if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { + int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, + auth); + if (ret) + return ERR_PTR(ret); + } *proto = ac->protocol; - *buf = s->s_authorizer_buf; - *len = s->s_authorizer_buf_len; - *reply_buf = s->s_authorizer_reply_buf; - *reply_len = s->s_authorizer_reply_buf_len; - return 0; + + return auth; } @@ -3437,7 +3431,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len) struct ceph_mds_client *mdsc = s->s_mdsc; struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; - return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len); + return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len); } static int invalidate_authorizer(struct ceph_connection *con) diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 8c7c04e..dd26846 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -11,6 +11,7 @@ #include <linux/ceph/types.h> #include <linux/ceph/messenger.h> #include <linux/ceph/mdsmap.h> +#include <linux/ceph/auth.h> /* * Some lock dependencies: @@ -113,9 +114,7 @@ struct ceph_mds_session { struct ceph_connection s_con; - struct ceph_authorizer *s_authorizer; - void *s_authorizer_buf, *s_authorizer_reply_buf; - size_t s_authorizer_buf_len, s_authorizer_reply_buf_len; + struct ceph_auth_handshake s_auth; /* protected by s_gen_ttl_lock */ spinlock_t s_gen_ttl_lock; diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 35b8633..785cb30 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -118,15 +118,6 @@ static size_t ceph_vxattrcb_file_layout(struct ceph_inode_info *ci, char *val, (unsigned long long)ceph_file_layout_su(ci->i_layout), (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), (unsigned long long)ceph_file_layout_object_size(ci->i_layout)); - - if (ceph_file_layout_pg_preferred(ci->i_layout) >= 0) { - val += ret; - size -= ret; - ret += snprintf(val, size, "preferred_osd=%lld\n", - (unsigned long long)ceph_file_layout_pg_preferred( - ci->i_layout)); - } - return ret; } diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h index aa13392..d4080f3 100644 --- a/include/linux/ceph/auth.h +++ b/include/linux/ceph/auth.h @@ -14,6 +14,14 @@ struct ceph_auth_client; struct ceph_authorizer; +struct ceph_auth_handshake { + struct ceph_authorizer *authorizer; + void *authorizer_buf; + size_t authorizer_buf_len; + void *authorizer_reply_buf; + size_t authorizer_reply_buf_len; +}; + struct ceph_auth_client_ops { const char *name; @@ -43,9 +51,7 @@ struct ceph_auth_client_ops { * the response to authenticate the service. */ int (*create_authorizer)(struct ceph_auth_client *ac, int peer_type, - struct ceph_authorizer **a, - void **buf, size_t *len, - void **reply_buf, size_t *reply_len); + struct ceph_auth_handshake *auth); int (*verify_authorizer_reply)(struct ceph_auth_client *ac, struct ceph_authorizer *a, size_t len); void (*destroy_authorizer)(struct ceph_auth_client *ac, diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index b8c6069..e81ab30 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -65,7 +65,7 @@ struct ceph_file_layout { __le32 fl_object_stripe_unit; /* UNUSED. for per-object parity, if any */ /* object -> pg layout */ - __le32 fl_pg_preferred; /* preferred primary for pg (-1 for none) */ + __le32 fl_unused; /* unused; used to be preferred primary (-1) */ __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ } __attribute__ ((packed)); @@ -384,7 +384,7 @@ union ceph_mds_request_args { __le32 stripe_count; /* ... */ __le32 object_size; __le32 file_replication; - __le32 preferred; + __le32 unused; /* used to be preferred osd */ } __attribute__ ((packed)) open; struct { __le32 flags; diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h index 220ae21..d8615de 100644 --- a/include/linux/ceph/decode.h +++ b/include/linux/ceph/decode.h @@ -46,9 +46,14 @@ static inline void ceph_decode_copy(void **p, void *pv, size_t n) /* * bounds check input. */ +static inline int ceph_has_room(void **p, void *end, size_t n) +{ + return end >= *p && n <= end - *p; +} + #define ceph_decode_need(p, end, n, bad) \ do { \ - if (unlikely(*(p) + (n) > (end))) \ + if (!likely(ceph_has_room(p, end, n))) \ goto bad; \ } while (0) @@ -167,7 +172,7 @@ static inline void ceph_encode_string(void **p, void *end, #define ceph_encode_need(p, end, n, bad) \ do { \ - if (unlikely(*(p) + (n) > (end))) \ + if (!likely(ceph_has_room(p, end, n))) \ goto bad; \ } while (0) diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 3bff047..2521a95 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -25,9 +25,9 @@ struct ceph_connection_operations { void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m); /* authorize an outgoing connection */ - int (*get_authorizer) (struct ceph_connection *con, - void **buf, int *len, int *proto, - void **reply_buf, int *reply_len, int force_new); + struct ceph_auth_handshake *(*get_authorizer) ( + struct ceph_connection *con, + int *proto, int force_new); int (*verify_authorizer_reply) (struct ceph_connection *con, int len); int (*invalidate_authorizer)(struct ceph_connection *con); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 7c05ac2..cedfb1a 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -6,9 +6,10 @@ #include <linux/mempool.h> #include <linux/rbtree.h> -#include "types.h" -#include "osdmap.h" -#include "messenger.h" +#include <linux/ceph/types.h> +#include <linux/ceph/osdmap.h> +#include <linux/ceph/messenger.h> +#include <linux/ceph/auth.h> /* * Maximum object name size @@ -40,9 +41,7 @@ struct ceph_osd { struct list_head o_requests; struct list_head o_linger_requests; struct list_head o_osd_lru; - struct ceph_authorizer *o_authorizer; - void *o_authorizer_buf, *o_authorizer_reply_buf; - size_t o_authorizer_buf_len, o_authorizer_reply_buf_len; + struct ceph_auth_handshake o_auth; unsigned long lru_ttl; int o_marked_for_keepalive; struct list_head o_keepalive_item; diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index ba4c205..311ef8d 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -65,8 +65,6 @@ struct ceph_osdmap { #define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash)) #define ceph_file_layout_object_su(l) \ ((__s32)le32_to_cpu((l).fl_object_stripe_unit)) -#define ceph_file_layout_pg_preferred(l) \ - ((__s32)le32_to_cpu((l).fl_pg_preferred)) #define ceph_file_layout_pg_pool(l) \ ((__s32)le32_to_cpu((l).fl_pg_pool)) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index 97e435b..7c47508 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -151,16 +151,6 @@ struct crush_map { struct crush_bucket **buckets; struct crush_rule **rules; - /* - * Parent pointers to identify the parent bucket a device or - * bucket in the hierarchy. If an item appears more than - * once, this is the _last_ time it appeared (where buckets - * are processed in bucket id order, from -1 on down to - * -max_buckets. - */ - __u32 *bucket_parents; - __u32 *device_parents; - __s32 max_buckets; __u32 max_rules; __s32 max_devices; @@ -168,8 +158,7 @@ struct crush_map { /* crush.c */ -extern int crush_get_bucket_item_weight(struct crush_bucket *b, int pos); -extern void crush_calc_parents(struct crush_map *map); +extern int crush_get_bucket_item_weight(const struct crush_bucket *b, int pos); extern void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b); extern void crush_destroy_bucket_list(struct crush_bucket_list *b); extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b); @@ -177,4 +166,9 @@ extern void crush_destroy_bucket_straw(struct crush_bucket_straw *b); extern void crush_destroy_bucket(struct crush_bucket *b); extern void crush_destroy(struct crush_map *map); +static inline int crush_calc_tree_node(int i) +{ + return ((i+1) << 1)-1; +} + #endif diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h index c46b99c..71d79f4 100644 --- a/include/linux/crush/mapper.h +++ b/include/linux/crush/mapper.h @@ -10,11 +10,10 @@ #include "crush.h" -extern int crush_find_rule(struct crush_map *map, int pool, int type, int size); -extern int crush_do_rule(struct crush_map *map, +extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size); +extern int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - int forcefeed, /* -1 for none */ - __u32 *weights); + const __u32 *weights); #endif diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c index 214c2bb..925ca58 100644 --- a/net/ceph/auth_none.c +++ b/net/ceph/auth_none.c @@ -59,9 +59,7 @@ static int handle_reply(struct ceph_auth_client *ac, int result, */ static int ceph_auth_none_create_authorizer( struct ceph_auth_client *ac, int peer_type, - struct ceph_authorizer **a, - void **buf, size_t *len, - void **reply_buf, size_t *reply_len) + struct ceph_auth_handshake *auth) { struct ceph_auth_none_info *ai = ac->private; struct ceph_none_authorizer *au = &ai->au; @@ -82,11 +80,12 @@ static int ceph_auth_none_create_authorizer( dout("built authorizer len %d\n", au->buf_len); } - *a = (struct ceph_authorizer *)au; - *buf = au->buf; - *len = au->buf_len; - *reply_buf = au->reply_buf; - *reply_len = sizeof(au->reply_buf); + auth->authorizer = (struct ceph_authorizer *) au; + auth->authorizer_buf = au->buf; + auth->authorizer_buf_len = au->buf_len; + auth->authorizer_reply_buf = au->reply_buf; + auth->authorizer_reply_buf_len = sizeof (au->reply_buf); + return 0; bad2: diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c index 1587dc6..a16bf14 100644 --- a/net/ceph/auth_x.c +++ b/net/ceph/auth_x.c @@ -526,9 +526,7 @@ static int ceph_x_handle_reply(struct ceph_auth_client *ac, int result, static int ceph_x_create_authorizer( struct ceph_auth_client *ac, int peer_type, - struct ceph_authorizer **a, - void **buf, size_t *len, - void **reply_buf, size_t *reply_len) + struct ceph_auth_handshake *auth) { struct ceph_x_authorizer *au; struct ceph_x_ticket_handler *th; @@ -548,11 +546,12 @@ static int ceph_x_create_authorizer( return ret; } - *a = (struct ceph_authorizer *)au; - *buf = au->buf->vec.iov_base; - *len = au->buf->vec.iov_len; - *reply_buf = au->reply_buf; - *reply_len = sizeof(au->reply_buf); + auth->authorizer = (struct ceph_authorizer *) au; + auth->authorizer_buf = au->buf->vec.iov_base; + auth->authorizer_buf_len = au->buf->vec.iov_len; + auth->authorizer_reply_buf = au->reply_buf; + auth->authorizer_reply_buf_len = sizeof (au->reply_buf); + return 0; } diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index d6ebb13..0896132 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -26,9 +26,9 @@ const char *crush_bucket_alg_name(int alg) * @b: bucket pointer * @p: item index in bucket */ -int crush_get_bucket_item_weight(struct crush_bucket *b, int p) +int crush_get_bucket_item_weight(const struct crush_bucket *b, int p) { - if (p >= b->size) + if ((__u32)p >= b->size) return 0; switch (b->alg) { @@ -37,38 +37,13 @@ int crush_get_bucket_item_weight(struct crush_bucket *b, int p) case CRUSH_BUCKET_LIST: return ((struct crush_bucket_list *)b)->item_weights[p]; case CRUSH_BUCKET_TREE: - if (p & 1) - return ((struct crush_bucket_tree *)b)->node_weights[p]; - return 0; + return ((struct crush_bucket_tree *)b)->node_weights[crush_calc_tree_node(p)]; case CRUSH_BUCKET_STRAW: return ((struct crush_bucket_straw *)b)->item_weights[p]; } return 0; } -/** - * crush_calc_parents - Calculate parent vectors for the given crush map. - * @map: crush_map pointer - */ -void crush_calc_parents(struct crush_map *map) -{ - int i, b, c; - - for (b = 0; b < map->max_buckets; b++) { - if (map->buckets[b] == NULL) - continue; - for (i = 0; i < map->buckets[b]->size; i++) { - c = map->buckets[b]->items[i]; - BUG_ON(c >= map->max_devices || - c < -map->max_buckets); - if (c >= 0) - map->device_parents[c] = map->buckets[b]->id; - else - map->bucket_parents[-1-c] = map->buckets[b]->id; - } - } -} - void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b) { kfree(b->h.perm); @@ -87,6 +62,8 @@ void crush_destroy_bucket_list(struct crush_bucket_list *b) void crush_destroy_bucket_tree(struct crush_bucket_tree *b) { + kfree(b->h.perm); + kfree(b->h.items); kfree(b->node_weights); kfree(b); } @@ -124,10 +101,9 @@ void crush_destroy_bucket(struct crush_bucket *b) */ void crush_destroy(struct crush_map *map) { - int b; - /* buckets */ if (map->buckets) { + __s32 b; for (b = 0; b < map->max_buckets; b++) { if (map->buckets[b] == NULL) continue; @@ -138,13 +114,12 @@ void crush_destroy(struct crush_map *map) /* rules */ if (map->rules) { + __u32 b; for (b = 0; b < map->max_rules; b++) kfree(map->rules[b]); kfree(map->rules); } - kfree(map->bucket_parents); - kfree(map->device_parents); kfree(map); } diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 363f8f7..d7edc24 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -33,9 +33,9 @@ * @type: storage ruleset type (user defined) * @size: output set size */ -int crush_find_rule(struct crush_map *map, int ruleset, int type, int size) +int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size) { - int i; + __u32 i; for (i = 0; i < map->max_rules; i++) { if (map->rules[i] && @@ -73,7 +73,7 @@ static int bucket_perm_choose(struct crush_bucket *bucket, unsigned int i, s; /* start a new permutation if @x has changed */ - if (bucket->perm_x != x || bucket->perm_n == 0) { + if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) { dprintk("bucket %d new x=%d\n", bucket->id, x); bucket->perm_x = x; @@ -153,8 +153,8 @@ static int bucket_list_choose(struct crush_bucket_list *bucket, return bucket->h.items[i]; } - BUG_ON(1); - return 0; + dprintk("bad list sums for bucket %d\n", bucket->h.id); + return bucket->h.items[0]; } @@ -220,7 +220,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket, static int bucket_straw_choose(struct crush_bucket_straw *bucket, int x, int r) { - int i; + __u32 i; int high = 0; __u64 high_draw = 0; __u64 draw; @@ -240,6 +240,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket, static int crush_bucket_choose(struct crush_bucket *in, int x, int r) { dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r); + BUG_ON(in->size == 0); switch (in->alg) { case CRUSH_BUCKET_UNIFORM: return bucket_uniform_choose((struct crush_bucket_uniform *)in, @@ -254,7 +255,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r) return bucket_straw_choose((struct crush_bucket_straw *)in, x, r); default: - BUG_ON(1); + dprintk("unknown bucket %d alg %d\n", in->id, in->alg); return in->items[0]; } } @@ -263,7 +264,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r) * true if device is marked "out" (failed, fully offloaded) * of the cluster */ -static int is_out(struct crush_map *map, __u32 *weight, int item, int x) +static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x) { if (weight[item] >= 0x10000) return 0; @@ -288,16 +289,16 @@ static int is_out(struct crush_map *map, __u32 *weight, int item, int x) * @recurse_to_leaf: true if we want one device under each item of given type * @out2: second output vector for leaf items (if @recurse_to_leaf) */ -static int crush_choose(struct crush_map *map, +static int crush_choose(const struct crush_map *map, struct crush_bucket *bucket, - __u32 *weight, + const __u32 *weight, int x, int numrep, int type, int *out, int outpos, int firstn, int recurse_to_leaf, int *out2) { int rep; - int ftotal, flocal; + unsigned int ftotal, flocal; int retry_descent, retry_bucket, skip_rep; struct crush_bucket *in = bucket; int r; @@ -305,7 +306,7 @@ static int crush_choose(struct crush_map *map, int item = 0; int itemtype; int collide, reject; - const int orig_tries = 5; /* attempts before we fall back to search */ + const unsigned int orig_tries = 5; /* attempts before we fall back to search */ dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", bucket->id, x, outpos, numrep); @@ -326,7 +327,7 @@ static int crush_choose(struct crush_map *map, r = rep; if (in->alg == CRUSH_BUCKET_UNIFORM) { /* be careful */ - if (firstn || numrep >= in->size) + if (firstn || (__u32)numrep >= in->size) /* r' = r + f_total */ r += ftotal; else if (in->size % numrep == 0) @@ -355,7 +356,11 @@ static int crush_choose(struct crush_map *map, item = bucket_perm_choose(in, x, r); else item = crush_bucket_choose(in, x, r); - BUG_ON(item >= map->max_devices); + if (item >= map->max_devices) { + dprintk(" bad item %d\n", item); + skip_rep = 1; + break; + } /* desired type? */ if (item < 0) @@ -366,8 +371,12 @@ static int crush_choose(struct crush_map *map, /* keep going? */ if (itemtype != type) { - BUG_ON(item >= 0 || - (-1-item) >= map->max_buckets); + if (item >= 0 || + (-1-item) >= map->max_buckets) { + dprintk(" bad item type %d\n", type); + skip_rep = 1; + break; + } in = map->buckets[-1-item]; retry_bucket = 1; continue; @@ -416,7 +425,7 @@ reject: if (collide && flocal < 3) /* retry locally a few times */ retry_bucket = 1; - else if (flocal < in->size + orig_tries) + else if (flocal <= in->size + orig_tries) /* exhaustive bucket search */ retry_bucket = 1; else if (ftotal < 20) @@ -426,7 +435,7 @@ reject: /* else give up */ skip_rep = 1; dprintk(" reject %d collide %d " - "ftotal %d flocal %d\n", + "ftotal %u flocal %u\n", reject, collide, ftotal, flocal); } @@ -455,15 +464,12 @@ reject: * @x: hash input * @result: pointer to result vector * @result_max: maximum result size - * @force: force initial replica choice; -1 for none */ -int crush_do_rule(struct crush_map *map, +int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, - int force, __u32 *weight) + const __u32 *weight) { int result_len; - int force_context[CRUSH_MAX_DEPTH]; - int force_pos = -1; int a[CRUSH_MAX_SET]; int b[CRUSH_MAX_SET]; int c[CRUSH_MAX_SET]; @@ -474,66 +480,44 @@ int crush_do_rule(struct crush_map *map, int osize; int *tmp; struct crush_rule *rule; - int step; + __u32 step; int i, j; int numrep; int firstn; - BUG_ON(ruleno >= map->max_rules); + if ((__u32)ruleno >= map->max_rules) { + dprintk(" bad ruleno %d\n", ruleno); + return 0; + } rule = map->rules[ruleno]; result_len = 0; w = a; o = b; - /* - * determine hierarchical context of force, if any. note - * that this may or may not correspond to the specific types - * referenced by the crush rule. - */ - if (force >= 0 && - force < map->max_devices && - map->device_parents[force] != 0 && - !is_out(map, weight, force, x)) { - while (1) { - force_context[++force_pos] = force; - if (force >= 0) - force = map->device_parents[force]; - else - force = map->bucket_parents[-1-force]; - if (force == 0) - break; - } - } - for (step = 0; step < rule->len; step++) { + struct crush_rule_step *curstep = &rule->steps[step]; + firstn = 0; - switch (rule->steps[step].op) { + switch (curstep->op) { case CRUSH_RULE_TAKE: - w[0] = rule->steps[step].arg1; - - /* find position in force_context/hierarchy */ - while (force_pos >= 0 && - force_context[force_pos] != w[0]) - force_pos--; - /* and move past it */ - if (force_pos >= 0) - force_pos--; - + w[0] = curstep->arg1; wsize = 1; break; case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: case CRUSH_RULE_CHOOSE_FIRSTN: firstn = 1; + /* fall through */ case CRUSH_RULE_CHOOSE_LEAF_INDEP: case CRUSH_RULE_CHOOSE_INDEP: - BUG_ON(wsize == 0); + if (wsize == 0) + break; recurse_to_leaf = - rule->steps[step].op == + curstep->op == CRUSH_RULE_CHOOSE_LEAF_FIRSTN || - rule->steps[step].op == + curstep->op == CRUSH_RULE_CHOOSE_LEAF_INDEP; /* reset output */ @@ -545,32 +529,18 @@ int crush_do_rule(struct crush_map *map, * basically, numrep <= 0 means relative to * the provided result_max */ - numrep = rule->steps[step].arg1; + numrep = curstep->arg1; if (numrep <= 0) { numrep += result_max; if (numrep <= 0) continue; } j = 0; - if (osize == 0 && force_pos >= 0) { - /* skip any intermediate types */ - while (force_pos && - force_context[force_pos] < 0 && - rule->steps[step].arg2 != - map->buckets[-1 - - force_context[force_pos]]->type) - force_pos--; - o[osize] = force_context[force_pos]; - if (recurse_to_leaf) - c[osize] = force_context[0]; - j++; - force_pos--; - } osize += crush_choose(map, map->buckets[-1-w[i]], weight, x, numrep, - rule->steps[step].arg2, + curstep->arg2, o+osize, j, firstn, recurse_to_leaf, c+osize); @@ -597,7 +567,9 @@ int crush_do_rule(struct crush_map *map, break; default: - BUG_ON(1); + dprintk(" unknown op %d at step %d\n", + curstep->op, step); + break; } } return result_len; diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 36fa6bf..524f4e4 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -653,54 +653,57 @@ static void prepare_write_keepalive(struct ceph_connection *con) * Connection negotiation. */ -static int prepare_connect_authorizer(struct ceph_connection *con) +static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con, + int *auth_proto) { - void *auth_buf; - int auth_len = 0; - int auth_protocol = 0; + struct ceph_auth_handshake *auth; + + if (!con->ops->get_authorizer) { + con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; + con->out_connect.authorizer_len = 0; + + return NULL; + } + + /* Can't hold the mutex while getting authorizer */ mutex_unlock(&con->mutex); - if (con->ops->get_authorizer) - con->ops->get_authorizer(con, &auth_buf, &auth_len, - &auth_protocol, &con->auth_reply_buf, - &con->auth_reply_buf_len, - con->auth_retry); + + auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); + mutex_lock(&con->mutex); - if (test_bit(CLOSED, &con->state) || - test_bit(OPENING, &con->state)) - return -EAGAIN; + if (IS_ERR(auth)) + return auth; + if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) + return ERR_PTR(-EAGAIN); - con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); - con->out_connect.authorizer_len = cpu_to_le32(auth_len); + con->auth_reply_buf = auth->authorizer_reply_buf; + con->auth_reply_buf_len = auth->authorizer_reply_buf_len; - if (auth_len) - ceph_con_out_kvec_add(con, auth_len, auth_buf); - return 0; + return auth; } /* * We connected to a peer and are saying hello. */ -static void prepare_write_banner(struct ceph_messenger *msgr, - struct ceph_connection *con) +static void prepare_write_banner(struct ceph_connection *con) { - ceph_con_out_kvec_reset(con); ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); - ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr), - &msgr->my_enc_addr); + ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), + &con->msgr->my_enc_addr); con->out_more = 0; set_bit(WRITE_PENDING, &con->state); } -static int prepare_write_connect(struct ceph_messenger *msgr, - struct ceph_connection *con, - int include_banner) +static int prepare_write_connect(struct ceph_connection *con) { unsigned int global_seq = get_global_seq(con->msgr, 0); int proto; + int auth_proto; + struct ceph_auth_handshake *auth; switch (con->peer_name.type) { case CEPH_ENTITY_TYPE_MON: @@ -719,23 +722,32 @@ static int prepare_write_connect(struct ceph_messenger *msgr, dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, con->connect_seq, global_seq, proto); - con->out_connect.features = cpu_to_le64(msgr->supported_features); + con->out_connect.features = cpu_to_le64(con->msgr->supported_features); con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); con->out_connect.global_seq = cpu_to_le32(global_seq); con->out_connect.protocol_version = cpu_to_le32(proto); con->out_connect.flags = 0; - if (include_banner) - prepare_write_banner(msgr, con); - else - ceph_con_out_kvec_reset(con); - ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); + auth_proto = CEPH_AUTH_UNKNOWN; + auth = get_connect_authorizer(con, &auth_proto); + if (IS_ERR(auth)) + return PTR_ERR(auth); + + con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); + con->out_connect.authorizer_len = auth ? + cpu_to_le32(auth->authorizer_buf_len) : 0; + + ceph_con_out_kvec_add(con, sizeof (con->out_connect), + &con->out_connect); + if (auth && auth->authorizer_buf_len) + ceph_con_out_kvec_add(con, auth->authorizer_buf_len, + auth->authorizer_buf); con->out_more = 0; set_bit(WRITE_PENDING, &con->state); - return prepare_connect_authorizer(con); + return 0; } /* @@ -992,11 +1004,10 @@ static int prepare_read_message(struct ceph_connection *con) static int read_partial(struct ceph_connection *con, - int *to, int size, void *object) + int end, int size, void *object) { - *to += size; - while (con->in_base_pos < *to) { - int left = *to - con->in_base_pos; + while (con->in_base_pos < end) { + int left = end - con->in_base_pos; int have = size - left; int ret = ceph_tcp_recvmsg(con->sock, object + have, left); if (ret <= 0) @@ -1012,37 +1023,52 @@ static int read_partial(struct ceph_connection *con, */ static int read_partial_banner(struct ceph_connection *con) { - int ret, to = 0; + int size; + int end; + int ret; dout("read_partial_banner %p at %d\n", con, con->in_base_pos); /* peer's banner */ - ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); + size = strlen(CEPH_BANNER); + end = size; + ret = read_partial(con, end, size, con->in_banner); if (ret <= 0) goto out; - ret = read_partial(con, &to, sizeof(con->actual_peer_addr), - &con->actual_peer_addr); + + size = sizeof (con->actual_peer_addr); + end += size; + ret = read_partial(con, end, size, &con->actual_peer_addr); if (ret <= 0) goto out; - ret = read_partial(con, &to, sizeof(con->peer_addr_for_me), - &con->peer_addr_for_me); + + size = sizeof (con->peer_addr_for_me); + end += size; + ret = read_partial(con, end, size, &con->peer_addr_for_me); if (ret <= 0) goto out; + out: return ret; } static int read_partial_connect(struct ceph_connection *con) { - int ret, to = 0; + int size; + int end; + int ret; dout("read_partial_connect %p at %d\n", con, con->in_base_pos); - ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply); + size = sizeof (con->in_reply); + end = size; + ret = read_partial(con, end, size, &con->in_reply); if (ret <= 0) goto out; - ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len), - con->auth_reply_buf); + + size = le32_to_cpu(con->in_reply.authorizer_len); + end += size; + ret = read_partial(con, end, size, con->auth_reply_buf); if (ret <= 0) goto out; @@ -1377,7 +1403,8 @@ static int process_connect(struct ceph_connection *con) return -1; } con->auth_retry = 1; - ret = prepare_write_connect(con->msgr, con, 0); + ceph_con_out_kvec_reset(con); + ret = prepare_write_connect(con); if (ret < 0) return ret; prepare_read_connect(con); @@ -1397,7 +1424,10 @@ static int process_connect(struct ceph_connection *con) ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr)); reset_connection(con); - prepare_write_connect(con->msgr, con, 0); + ceph_con_out_kvec_reset(con); + ret = prepare_write_connect(con); + if (ret < 0) + return ret; prepare_read_connect(con); /* Tell ceph about it. */ @@ -1420,7 +1450,10 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->out_connect.connect_seq), le32_to_cpu(con->in_connect.connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); - prepare_write_connect(con->msgr, con, 0); + ceph_con_out_kvec_reset(con); + ret = prepare_write_connect(con); + if (ret < 0) + return ret; prepare_read_connect(con); break; @@ -1434,7 +1467,10 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_connect.global_seq)); get_global_seq(con->msgr, le32_to_cpu(con->in_connect.global_seq)); - prepare_write_connect(con->msgr, con, 0); + ceph_con_out_kvec_reset(con); + ret = prepare_write_connect(con); + if (ret < 0) + return ret; prepare_read_connect(con); break; @@ -1491,10 +1527,10 @@ static int process_connect(struct ceph_connection *con) */ static int read_partial_ack(struct ceph_connection *con) { - int to = 0; + int size = sizeof (con->in_temp_ack); + int end = size; - return read_partial(con, &to, sizeof(con->in_temp_ack), - &con->in_temp_ack); + return read_partial(con, end, size, &con->in_temp_ack); } @@ -1627,8 +1663,9 @@ static int read_partial_message_bio(struct ceph_connection *con, static int read_partial_message(struct ceph_connection *con) { struct ceph_msg *m = con->in_msg; + int size; + int end; int ret; - int to, left; unsigned int front_len, middle_len, data_len; bool do_datacrc = !con->msgr->nocrc; int skip; @@ -1638,15 +1675,11 @@ static int read_partial_message(struct ceph_connection *con) dout("read_partial_message con %p msg %p\n", con, m); /* header */ - while (con->in_base_pos < sizeof(con->in_hdr)) { - left = sizeof(con->in_hdr) - con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->in_hdr + con->in_base_pos, - left); - if (ret <= 0) - return ret; - con->in_base_pos += ret; - } + size = sizeof (con->in_hdr); + end = size; + ret = read_partial(con, end, size, &con->in_hdr); + if (ret <= 0) + return ret; crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); if (cpu_to_le32(crc) != con->in_hdr.crc) { @@ -1759,16 +1792,12 @@ static int read_partial_message(struct ceph_connection *con) } /* footer */ - to = sizeof(m->hdr) + sizeof(m->footer); - while (con->in_base_pos < to) { - left = to - con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer + - (con->in_base_pos - sizeof(m->hdr)), - left); - if (ret <= 0) - return ret; - con->in_base_pos += ret; - } + size = sizeof (m->footer); + end += size; + ret = read_partial(con, end, size, &m->footer); + if (ret <= 0) + return ret; + dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", m, front_len, m->footer.front_crc, middle_len, m->footer.middle_crc, data_len, m->footer.data_crc); @@ -1835,7 +1864,6 @@ static void process_message(struct ceph_connection *con) */ static int try_write(struct ceph_connection *con) { - struct ceph_messenger *msgr = con->msgr; int ret = 1; dout("try_write start %p state %lu nref %d\n", con, con->state, @@ -1846,7 +1874,11 @@ more: /* open the socket first? */ if (con->sock == NULL) { - prepare_write_connect(msgr, con, 1); + ceph_con_out_kvec_reset(con); + prepare_write_banner(con); + ret = prepare_write_connect(con); + if (ret < 0) + goto out; prepare_read_banner(con); set_bit(CONNECTING, &con->state); clear_bit(NEGOTIATING, &con->state); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 1b0ef3c..1ffebed 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -278,7 +278,7 @@ static void osd_req_encode_op(struct ceph_osd_request *req, { dst->op = cpu_to_le16(src->op); - switch (dst->op) { + switch (src->op) { case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: dst->extent.offset = @@ -664,11 +664,11 @@ static void put_osd(struct ceph_osd *osd) { dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), atomic_read(&osd->o_ref) - 1); - if (atomic_dec_and_test(&osd->o_ref)) { + if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; - if (osd->o_authorizer) - ac->ops->destroy_authorizer(ac, osd->o_authorizer); + if (ac->ops && ac->ops->destroy_authorizer) + ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer); kfree(osd); } } @@ -841,6 +841,12 @@ static void register_request(struct ceph_osd_client *osdc, static void __unregister_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { + if (RB_EMPTY_NODE(&req->r_node)) { + dout("__unregister_request %p tid %lld not registered\n", + req, req->r_tid); + return; + } + dout("__unregister_request %p tid %lld\n", req, req->r_tid); rb_erase(&req->r_node, &osdc->requests); osdc->num_requests--; @@ -2108,37 +2114,32 @@ static void put_osd_con(struct ceph_connection *con) /* * authentication */ -static int get_authorizer(struct ceph_connection *con, - void **buf, int *len, int *proto, - void **reply_buf, int *reply_len, int force_new) +/* + * Note: returned pointer is the address of a structure that's + * managed separately. Caller must *not* attempt to free it. + */ +static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, + int *proto, int force_new) { struct ceph_osd *o = con->private; struct ceph_osd_client *osdc = o->o_osdc; struct ceph_auth_client *ac = osdc->client->monc.auth; - int ret = 0; + struct ceph_auth_handshake *auth = &o->o_auth; - if (force_new && o->o_authorizer) { - ac->ops->destroy_authorizer(ac, o->o_authorizer); - o->o_authorizer = NULL; - } - if (o->o_authorizer == NULL) { - ret = ac->ops->create_authorizer( - ac, CEPH_ENTITY_TYPE_OSD, - &o->o_authorizer, - &o->o_authorizer_buf, - &o->o_authorizer_buf_len, - &o->o_authorizer_reply_buf, - &o->o_authorizer_reply_buf_len); + if (force_new && auth->authorizer) { + if (ac->ops && ac->ops->destroy_authorizer) + ac->ops->destroy_authorizer(ac, auth->authorizer); + auth->authorizer = NULL; + } + if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { + int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, + auth); if (ret) - return ret; + return ERR_PTR(ret); } - *proto = ac->protocol; - *buf = o->o_authorizer_buf; - *len = o->o_authorizer_buf_len; - *reply_buf = o->o_authorizer_reply_buf; - *reply_len = o->o_authorizer_reply_buf_len; - return 0; + + return auth; } @@ -2148,7 +2149,11 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len) struct ceph_osd_client *osdc = o->o_osdc; struct ceph_auth_client *ac = osdc->client->monc.auth; - return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len); + /* + * XXX If ac->ops or ac->ops->verify_authorizer_reply is null, + * XXX which do we do: succeed or fail? + */ + return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len); } static int invalidate_authorizer(struct ceph_connection *con) @@ -2157,7 +2162,7 @@ static int invalidate_authorizer(struct ceph_connection *con) struct ceph_osd_client *osdc = o->o_osdc; struct ceph_auth_client *ac = osdc->client->monc.auth; - if (ac->ops->invalidate_authorizer) + if (ac->ops && ac->ops->invalidate_authorizer) ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); return ceph_monc_validate_auth(&osdc->client->monc); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 56e561a..81e3b84 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -161,13 +161,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end) c->max_rules = ceph_decode_32(p); c->max_devices = ceph_decode_32(p); - c->device_parents = kcalloc(c->max_devices, sizeof(u32), GFP_NOFS); - if (c->device_parents == NULL) - goto badmem; - c->bucket_parents = kcalloc(c->max_buckets, sizeof(u32), GFP_NOFS); - if (c->bucket_parents == NULL) - goto badmem; - c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS); if (c->buckets == NULL) goto badmem; @@ -890,8 +883,12 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, pglen = ceph_decode_32(p); if (pglen) { - /* insert */ ceph_decode_need(p, end, pglen*sizeof(u32), bad); + + /* removing existing (if any) */ + (void) __remove_pg_mapping(&map->pg_temp, pgid); + + /* insert */ pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); if (!pg) { err = -ENOMEM; @@ -1000,7 +997,6 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol, { unsigned int num, num_mask; struct ceph_pg pgid; - s32 preferred = (s32)le32_to_cpu(fl->fl_pg_preferred); int poolid = le32_to_cpu(fl->fl_pg_pool); struct ceph_pg_pool_info *pool; unsigned int ps; @@ -1011,23 +1007,13 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol, if (!pool) return -EIO; ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); - if (preferred >= 0) { - ps += preferred; - num = le32_to_cpu(pool->v.lpg_num); - num_mask = pool->lpg_num_mask; - } else { - num = le32_to_cpu(pool->v.pg_num); - num_mask = pool->pg_num_mask; - } + num = le32_to_cpu(pool->v.pg_num); + num_mask = pool->pg_num_mask; pgid.ps = cpu_to_le16(ps); - pgid.preferred = cpu_to_le16(preferred); + pgid.preferred = cpu_to_le16(-1); pgid.pool = fl->fl_pg_pool; - if (preferred >= 0) - dout("calc_object_layout '%s' pgid %d.%xp%d\n", oid, poolid, ps, - (int)preferred); - else - dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps); + dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps); ol->ol_pgid = pgid; ol->ol_stripe_unit = fl->fl_object_stripe_unit; @@ -1045,24 +1031,18 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, struct ceph_pg_mapping *pg; struct ceph_pg_pool_info *pool; int ruleno; - unsigned int poolid, ps, pps, t; - int preferred; + unsigned int poolid, ps, pps, t, r; poolid = le32_to_cpu(pgid.pool); ps = le16_to_cpu(pgid.ps); - preferred = (s16)le16_to_cpu(pgid.preferred); pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); if (!pool) return NULL; /* pg_temp? */ - if (preferred >= 0) - t = ceph_stable_mod(ps, le32_to_cpu(pool->v.lpg_num), - pool->lpgp_num_mask); - else - t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), - pool->pgp_num_mask); + t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), + pool->pgp_num_mask); pgid.ps = cpu_to_le16(t); pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { @@ -1080,23 +1060,20 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, return NULL; } - /* don't forcefeed bad device ids to crush */ - if (preferred >= osdmap->max_osd || - preferred >= osdmap->crush->max_devices) - preferred = -1; - - if (preferred >= 0) - pps = ceph_stable_mod(ps, - le32_to_cpu(pool->v.lpgp_num), - pool->lpgp_num_mask); - else - pps = ceph_stable_mod(ps, - le32_to_cpu(pool->v.pgp_num), - pool->pgp_num_mask); + pps = ceph_stable_mod(ps, + le32_to_cpu(pool->v.pgp_num), + pool->pgp_num_mask); pps += poolid; - *num = crush_do_rule(osdmap->crush, ruleno, pps, osds, - min_t(int, pool->v.size, *num), - preferred, osdmap->osd_weight); + r = crush_do_rule(osdmap->crush, ruleno, pps, osds, + min_t(int, pool->v.size, *num), + osdmap->osd_weight); + if (r < 0) { + pr_err("error %d from crush rule: pool %d ruleset %d type %d" + " size %d\n", r, poolid, pool->v.crush_ruleset, + pool->v.type, pool->v.size); + return NULL; + } + *num = r; return osds; } |