diff options
Diffstat (limited to 'net/ceph')
-rw-r--r-- | net/ceph/debugfs.c | 61 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 355 |
2 files changed, 216 insertions, 200 deletions
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 0c11ab5..6d3ff71 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -145,6 +145,43 @@ static int monc_show(struct seq_file *s, void *p) return 0; } +static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t) +{ + int i; + + seq_printf(s, "osd%d\t%llu.%x\t[", t->osd, t->pgid.pool, t->pgid.seed); + for (i = 0; i < t->up.size; i++) + seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]); + seq_printf(s, "]/%d\t[", t->up.primary); + for (i = 0; i < t->acting.size; i++) + seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]); + seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary, + t->target_oid.name_len, t->target_oid.name, t->flags); + if (t->paused) + seq_puts(s, "\tP"); +} + +static void dump_request(struct seq_file *s, struct ceph_osd_request *req) +{ + int i; + + seq_printf(s, "%llu\t", req->r_tid); + dump_target(s, &req->r_t); + + seq_printf(s, "\t%d\t%u'%llu", req->r_attempts, + le32_to_cpu(req->r_replay_version.epoch), + le64_to_cpu(req->r_replay_version.version)); + + for (i = 0; i < req->r_num_ops; i++) { + struct ceph_osd_req_op *op = &req->r_ops[i]; + + seq_printf(s, "%s%s", (i == 0 ? "\t" : ","), + ceph_osd_op_name(op->op)); + } + + seq_putc(s, '\n'); +} + static int osdc_show(struct seq_file *s, void *pp) { struct ceph_client *client = s->private; @@ -154,32 +191,10 @@ static int osdc_show(struct seq_file *s, void *pp) mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { struct ceph_osd_request *req; - unsigned int i; - int opcode; req = rb_entry(p, struct ceph_osd_request, r_node); - seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid, - req->r_osd ? req->r_osd->o_osd : -1, - req->r_t.pgid.pool, req->r_t.pgid.seed); - - seq_printf(s, "%*pE", req->r_base_oid.name_len, - req->r_base_oid.name); - - if (req->r_reassert_version.epoch) - seq_printf(s, "\t%u'%llu", - (unsigned int)le32_to_cpu(req->r_reassert_version.epoch), - le64_to_cpu(req->r_reassert_version.version)); - else - seq_printf(s, "\t"); - - for (i = 0; i < req->r_num_ops; i++) { - opcode = req->r_ops[i].op; - seq_printf(s, "%s%s", (i == 0 ? "\t" : ","), - ceph_osd_op_name(opcode)); - } - - seq_printf(s, "\n"); + dump_request(s, req); } mutex_unlock(&osdc->request_mutex); return 0; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 0131015..8a008f0 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -34,8 +34,6 @@ static void __unregister_request(struct ceph_osd_client *osdc, static void __unregister_linger_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); static void __enqueue_request(struct ceph_osd_request *req); -static void __send_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req); /* * Implement client access to distributed object storage cluster. @@ -209,6 +207,8 @@ void osd_req_op_cls_request_data_pagelist( osd_data = osd_req_op_data(osd_req, which, cls, request_data); ceph_osd_data_pagelist_init(osd_data, pagelist); + osd_req->r_ops[which].cls.indata_len += pagelist->length; + osd_req->r_ops[which].indata_len += pagelist->length; } EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist); @@ -221,6 +221,8 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req, osd_data = osd_req_op_data(osd_req, which, cls, request_data); ceph_osd_data_pages_init(osd_data, pages, length, alignment, pages_from_pool, own_pages); + osd_req->r_ops[which].cls.indata_len += length; + osd_req->r_ops[which].indata_len += length; } EXPORT_SYMBOL(osd_req_op_cls_request_data_pages); @@ -610,8 +612,6 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); - op->cls.argc = 0; /* currently unused */ - op->indata_len = payload_len; } EXPORT_SYMBOL(osd_req_op_cls_init); @@ -709,16 +709,9 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg, } } -static u64 osd_req_encode_op(struct ceph_osd_request *req, - struct ceph_osd_op *dst, unsigned int which) +static u32 osd_req_encode_op(struct ceph_osd_op *dst, + const struct ceph_osd_req_op *src) { - struct ceph_osd_req_op *src; - struct ceph_osd_data *osd_data; - u64 request_data_len = 0; - u64 data_length; - - BUG_ON(which >= req->r_num_ops); - src = &req->r_ops[which]; if (WARN_ON(!osd_req_opcode_valid(src->op))) { pr_err("unrecognized osd opcode %d\n", src->op); @@ -727,49 +720,23 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, switch (src->op) { case CEPH_OSD_OP_STAT: - osd_data = &src->raw_data_in; - ceph_osdc_msg_data_add(req->r_reply, osd_data); break; case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: case CEPH_OSD_OP_ZERO: case CEPH_OSD_OP_TRUNCATE: - if (src->op == CEPH_OSD_OP_WRITE || - src->op == CEPH_OSD_OP_WRITEFULL) - request_data_len = src->extent.length; dst->extent.offset = cpu_to_le64(src->extent.offset); dst->extent.length = cpu_to_le64(src->extent.length); dst->extent.truncate_size = cpu_to_le64(src->extent.truncate_size); dst->extent.truncate_seq = cpu_to_le32(src->extent.truncate_seq); - osd_data = &src->extent.osd_data; - if (src->op == CEPH_OSD_OP_WRITE || - src->op == CEPH_OSD_OP_WRITEFULL) - ceph_osdc_msg_data_add(req->r_request, osd_data); - else - ceph_osdc_msg_data_add(req->r_reply, osd_data); break; case CEPH_OSD_OP_CALL: dst->cls.class_len = src->cls.class_len; dst->cls.method_len = src->cls.method_len; - osd_data = &src->cls.request_info; - ceph_osdc_msg_data_add(req->r_request, osd_data); - BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST); - request_data_len = osd_data->pagelist->length; - - osd_data = &src->cls.request_data; - data_length = ceph_osd_data_length(osd_data); - if (data_length) { - BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE); - dst->cls.indata_len = cpu_to_le32(data_length); - ceph_osdc_msg_data_add(req->r_request, osd_data); - src->indata_len += data_length; - request_data_len += data_length; - } - osd_data = &src->cls.response_data; - ceph_osdc_msg_data_add(req->r_reply, osd_data); + dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); break; case CEPH_OSD_OP_STARTSYNC: break; @@ -791,9 +758,6 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); dst->xattr.cmp_op = src->xattr.cmp_op; dst->xattr.cmp_mode = src->xattr.cmp_mode; - osd_data = &src->xattr.osd_data; - ceph_osdc_msg_data_add(req->r_request, osd_data); - request_data_len = osd_data->pagelist->length; break; case CEPH_OSD_OP_CREATE: case CEPH_OSD_OP_DELETE: @@ -810,7 +774,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, dst->flags = cpu_to_le32(src->flags); dst->payload_len = cpu_to_le32(src->indata_len); - return request_data_len; + return src->indata_len; } /* @@ -852,8 +816,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, goto fail; } - req->r_flags = flags; - /* calculate max write size */ r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); if (r) @@ -877,9 +839,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, truncate_size, truncate_seq); } + req->r_flags = flags; req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); + req->r_snapid = vino.snap; + if (flags & CEPH_OSD_FLAG_WRITE) + req->r_data_offset = off; + r = ceph_osdc_alloc_messages(req, GFP_NOFS); if (r) goto fail; @@ -1509,37 +1476,173 @@ out: return err; } -/* - * caller should hold map_sem (for read) and request_mutex - */ -static void __send_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +static void setup_request_data(struct ceph_osd_request *req, + struct ceph_msg *msg) { - void *p; + u32 data_len = 0; + int i; + + if (!list_empty(&msg->data)) + return; - dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", - req, req->r_tid, req->r_osd->o_osd, req->r_flags, - req->r_t.pgid.pool, req->r_t.pgid.seed); + WARN_ON(msg->data_length); + for (i = 0; i < req->r_num_ops; i++) { + struct ceph_osd_req_op *op = &req->r_ops[i]; + + switch (op->op) { + /* request */ + case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_WRITEFULL: + WARN_ON(op->indata_len != op->extent.length); + ceph_osdc_msg_data_add(msg, &op->extent.osd_data); + break; + case CEPH_OSD_OP_SETXATTR: + case CEPH_OSD_OP_CMPXATTR: + WARN_ON(op->indata_len != op->xattr.name_len + + op->xattr.value_len); + ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); + break; + + /* reply */ + case CEPH_OSD_OP_STAT: + ceph_osdc_msg_data_add(req->r_reply, + &op->raw_data_in); + break; + case CEPH_OSD_OP_READ: + ceph_osdc_msg_data_add(req->r_reply, + &op->extent.osd_data); + break; + + /* both */ + case CEPH_OSD_OP_CALL: + WARN_ON(op->indata_len != op->cls.class_len + + op->cls.method_len + + op->cls.indata_len); + ceph_osdc_msg_data_add(msg, &op->cls.request_info); + /* optional, can be NONE */ + ceph_osdc_msg_data_add(msg, &op->cls.request_data); + /* optional, can be NONE */ + ceph_osdc_msg_data_add(req->r_reply, + &op->cls.response_data); + break; + } + + data_len += op->indata_len; + } - /* fill in message content that changes each time we send it */ - put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); - put_unaligned_le32(req->r_flags, req->r_request_flags); - put_unaligned_le64(req->r_t.target_oloc.pool, req->r_request_pool); - p = req->r_request_pgid; + WARN_ON(data_len != msg->data_length); +} + +static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg) +{ + void *p = msg->front.iov_base; + void *const end = p + msg->front_alloc_len; + u32 data_len = 0; + int i; + + if (req->r_flags & CEPH_OSD_FLAG_WRITE) { + /* snapshots aren't writeable */ + WARN_ON(req->r_snapid != CEPH_NOSNAP); + } else { + WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec || + req->r_data_offset || req->r_snapc); + } + + setup_request_data(req, msg); + + ceph_encode_32(&p, 1); /* client_inc, always 1 */ + ceph_encode_32(&p, req->r_osdc->osdmap->epoch); + ceph_encode_32(&p, req->r_flags); + ceph_encode_timespec(p, &req->r_mtime); + p += sizeof(struct ceph_timespec); + /* aka reassert_version */ + memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version)); + p += sizeof(req->r_replay_version); + + /* oloc */ + ceph_encode_8(&p, 4); + ceph_encode_8(&p, 4); + ceph_encode_32(&p, 8 + 4 + 4); + ceph_encode_64(&p, req->r_t.target_oloc.pool); + ceph_encode_32(&p, -1); /* preferred */ + ceph_encode_32(&p, 0); /* key len */ + + /* pgid */ + ceph_encode_8(&p, 1); ceph_encode_64(&p, req->r_t.pgid.pool); ceph_encode_32(&p, req->r_t.pgid.seed); - put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ - memcpy(req->r_request_reassert_version, &req->r_reassert_version, - sizeof(req->r_reassert_version)); + ceph_encode_32(&p, -1); /* preferred */ - req->r_stamp = jiffies; - list_move_tail(&req->r_req_lru_item, &osdc->req_lru); + /* oid */ + ceph_encode_32(&p, req->r_t.target_oid.name_len); + memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len); + p += req->r_t.target_oid.name_len; - ceph_msg_get(req->r_request); /* send consumes a ref */ + /* ops, can imply data */ + ceph_encode_16(&p, req->r_num_ops); + for (i = 0; i < req->r_num_ops; i++) { + data_len += osd_req_encode_op(p, &req->r_ops[i]); + p += sizeof(struct ceph_osd_op); + } - req->r_sent = req->r_osd->o_incarnation; + ceph_encode_64(&p, req->r_snapid); /* snapid */ + if (req->r_snapc) { + ceph_encode_64(&p, req->r_snapc->seq); + ceph_encode_32(&p, req->r_snapc->num_snaps); + for (i = 0; i < req->r_snapc->num_snaps; i++) + ceph_encode_64(&p, req->r_snapc->snaps[i]); + } else { + ceph_encode_64(&p, 0); /* snap_seq */ + ceph_encode_32(&p, 0); /* snaps len */ + } + + ceph_encode_32(&p, req->r_attempts); /* retry_attempt */ + + BUG_ON(p > end); + msg->front.iov_len = p - msg->front.iov_base; + msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */ + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + msg->hdr.data_len = cpu_to_le32(data_len); + /* + * The header "data_off" is a hint to the receiver allowing it + * to align received data into its buffers such that there's no + * need to re-copy it before writing it to disk (direct I/O). + */ + msg->hdr.data_off = cpu_to_le16(req->r_data_offset); - ceph_con_send(&req->r_osd->o_con, req->r_request); + dout("%s req %p oid %*pE oid_len %d front %zu data %u\n", __func__, + req, req->r_t.target_oid.name_len, req->r_t.target_oid.name, + req->r_t.target_oid.name_len, msg->front.iov_len, data_len); +} + +/* + * @req has to be assigned a tid and registered. + */ +static void send_request(struct ceph_osd_request *req) +{ + struct ceph_osd *osd = req->r_osd; + + WARN_ON(osd->o_osd != req->r_t.osd); + + req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR; + if (req->r_attempts) + req->r_flags |= CEPH_OSD_FLAG_RETRY; + else + WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY); + + encode_request(req, req->r_request); + + dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n", + __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, + req->r_t.osd, req->r_flags, req->r_attempts); + + req->r_t.paused = false; + req->r_stamp = jiffies; + req->r_attempts++; + + req->r_sent = osd->o_incarnation; + req->r_request->hdr.tid = cpu_to_le64(req->r_tid); + ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request)); } /* @@ -1550,8 +1653,10 @@ static void __send_queued(struct ceph_osd_client *osdc) struct ceph_osd_request *req, *tmp; dout("__send_queued\n"); - list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) - __send_request(osdc, req); + list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) { + list_move_tail(&req->r_req_lru_item, &osdc->req_lru); + send_request(req); + } } /* @@ -1915,8 +2020,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) req->r_result = bytes; /* in case this is a write and we need to replay, */ - req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); - req->r_reassert_version.version = cpu_to_le64(reassert_version); + req->r_replay_version.epoch = cpu_to_le32(reassert_epoch); + req->r_replay_version.version = cpu_to_le64(reassert_version); req->r_got_reply = 1; } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { @@ -2433,105 +2538,6 @@ bad: } /* - * build new request AND message - * - */ -void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, - struct ceph_snap_context *snapc, u64 snap_id, - struct timespec *mtime) -{ - struct ceph_msg *msg = req->r_request; - void *p; - size_t msg_size; - int flags = req->r_flags; - u64 data_len; - unsigned int i; - - req->r_snapid = snap_id; - WARN_ON(snapc != req->r_snapc); - - /* encode request */ - msg->hdr.version = cpu_to_le16(4); - - p = msg->front.iov_base; - ceph_encode_32(&p, 1); /* client_inc is always 1 */ - req->r_request_osdmap_epoch = p; - p += 4; - req->r_request_flags = p; - p += 4; - if (req->r_flags & CEPH_OSD_FLAG_WRITE) - ceph_encode_timespec(p, mtime); - p += sizeof(struct ceph_timespec); - req->r_request_reassert_version = p; - p += sizeof(struct ceph_eversion); /* will get filled in */ - - /* oloc */ - ceph_encode_8(&p, 4); - ceph_encode_8(&p, 4); - ceph_encode_32(&p, 8 + 4 + 4); - req->r_request_pool = p; - p += 8; - ceph_encode_32(&p, -1); /* preferred */ - ceph_encode_32(&p, 0); /* key len */ - - ceph_encode_8(&p, 1); - req->r_request_pgid = p; - p += 8 + 4; - ceph_encode_32(&p, -1); /* preferred */ - - /* oid */ - ceph_encode_32(&p, req->r_base_oid.name_len); - memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); - dout("oid %*pE len %d\n", req->r_base_oid.name_len, - req->r_base_oid.name, req->r_base_oid.name_len); - p += req->r_base_oid.name_len; - - /* ops--can imply data */ - ceph_encode_16(&p, (u16)req->r_num_ops); - data_len = 0; - for (i = 0; i < req->r_num_ops; i++) { - data_len += osd_req_encode_op(req, p, i); - p += sizeof(struct ceph_osd_op); - } - - /* snaps */ - ceph_encode_64(&p, req->r_snapid); - ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); - ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); - if (req->r_snapc) { - for (i = 0; i < req->r_snapc->num_snaps; i++) { - ceph_encode_64(&p, req->r_snapc->snaps[i]); - } - } - - req->r_request_attempts = p; - p += 4; - - /* data */ - if (flags & CEPH_OSD_FLAG_WRITE) { - u16 data_off; - - /* - * The header "data_off" is a hint to the receiver - * allowing it to align received data into its - * buffers such that there's no need to re-copy - * it before writing it to disk (direct I/O). - */ - data_off = (u16) (off & 0xffff); - req->r_request->hdr.data_off = cpu_to_le16(data_off); - } - req->r_request->hdr.data_len = cpu_to_le32(data_len); - - BUG_ON(p > msg->front.iov_base + msg->front.iov_len); - msg_size = p - msg->front.iov_base; - msg->front.iov_len = msg_size; - msg->hdr.front_len = cpu_to_le32(msg_size); - - dout("build_request msg_size was %d\n", (int)msg_size); -} -EXPORT_SYMBOL(ceph_osdc_build_request); - -/* * Register request, send initial attempt. */ int ceph_osdc_start_request(struct ceph_osd_client *osdc, @@ -2749,15 +2755,12 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, return PTR_ERR(req); /* it may be a short read due to an object boundary */ - osd_req_op_extent_osd_data_pages(req, 0, pages, *plen, page_align, false, false); dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", off, *plen, *plen, page_align); - ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); - rc = ceph_osdc_start_request(osdc, req, false); if (!rc) rc = ceph_osdc_wait_request(osdc, req); @@ -2783,7 +2786,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, int rc = 0; int page_align = off & ~PAGE_MASK; - BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, @@ -2797,8 +2799,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, false, false); dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); - ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime); - + req->r_mtime = *mtime; rc = ceph_osdc_start_request(osdc, req, true); if (!rc) rc = ceph_osdc_wait_request(osdc, req); |