diff options
-rw-r--r-- | include/linux/ceph/osd_client.h | 6 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 167 |
2 files changed, 164 insertions, 9 deletions
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 3170ca6..fd47e87 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -155,6 +155,8 @@ struct ceph_osd_request { struct ceph_object_locator r_base_oloc; struct ceph_object_id r_base_oid; + struct ceph_object_locator r_target_oloc; + struct ceph_object_id r_target_oid; u64 r_snapid; unsigned long r_stamp; /* send OR check time */ @@ -162,6 +164,10 @@ struct ceph_osd_request { struct ceph_snap_context *r_snapc; /* snap context for writes */ }; +struct ceph_request_redirect { + struct ceph_object_locator oloc; +}; + struct ceph_osd_event { u64 cookie; int one_shot; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 3997a87..010ff3b 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -369,6 +369,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_osd_item); req->r_base_oloc.pool = -1; + req->r_target_oloc.pool = -1; /* create reply message */ if (use_mempool) @@ -1256,23 +1257,36 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap, struct ceph_osd_request *req, struct ceph_pg *pg_out) { - if ((req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { + bool need_check_tiering; + + need_check_tiering = false; + if (req->r_target_oloc.pool == -1) { + req->r_target_oloc = req->r_base_oloc; /* struct */ + need_check_tiering = true; + } + if (req->r_target_oid.name_len == 0) { + ceph_oid_copy(&req->r_target_oid, &req->r_base_oid); + need_check_tiering = true; + } + + if (need_check_tiering && + (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { struct ceph_pg_pool_info *pi; - pi = ceph_pg_pool_by_id(osdmap, req->r_base_oloc.pool); + pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool); if (pi) { if ((req->r_flags & CEPH_OSD_FLAG_READ) && pi->read_tier >= 0) - req->r_base_oloc.pool = pi->read_tier; + req->r_target_oloc.pool = pi->read_tier; if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && pi->write_tier >= 0) - req->r_base_oloc.pool = pi->write_tier; + req->r_target_oloc.pool = pi->write_tier; } /* !pi is caught in ceph_oloc_oid_to_pg() */ } - return ceph_oloc_oid_to_pg(osdmap, &req->r_base_oloc, - &req->r_base_oid, pg_out); + return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc, + &req->r_target_oid, pg_out); } /* @@ -1382,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc, /* fill in message content that changes each time we send it */ put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); put_unaligned_le32(req->r_flags, req->r_request_flags); - put_unaligned_le64(req->r_base_oloc.pool, req->r_request_pool); + put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool); p = req->r_request_pgid; ceph_encode_64(&p, req->r_pgid.pool); ceph_encode_32(&p, req->r_pgid.seed); @@ -1483,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work) round_jiffies_relative(delay)); } +static int ceph_oloc_decode(void **p, void *end, + struct ceph_object_locator *oloc) +{ + u8 struct_v, struct_cv; + u32 len; + void *struct_end; + int ret = 0; + + ceph_decode_need(p, end, 1 + 1 + 4, e_inval); + struct_v = ceph_decode_8(p); + struct_cv = ceph_decode_8(p); + if (struct_v < 3) { + pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", + struct_v, struct_cv); + goto e_inval; + } + if (struct_cv > 6) { + pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", + struct_v, struct_cv); + goto e_inval; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, e_inval); + struct_end = *p + len; + + oloc->pool = ceph_decode_64(p); + *p += 4; /* skip preferred */ + + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_object_locator::key is set\n"); + goto e_inval; + } + + if (struct_v >= 5) { + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_object_locator::nspace is set\n"); + goto e_inval; + } + } + + if (struct_v >= 6) { + s64 hash = ceph_decode_64(p); + if (hash != -1) { + pr_warn("ceph_object_locator::hash is set\n"); + goto e_inval; + } + } + + /* skip the rest */ + *p = struct_end; +out: + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + +static int ceph_redirect_decode(void **p, void *end, + struct ceph_request_redirect *redir) +{ + u8 struct_v, struct_cv; + u32 len; + void *struct_end; + int ret; + + ceph_decode_need(p, end, 1 + 1 + 4, e_inval); + struct_v = ceph_decode_8(p); + struct_cv = ceph_decode_8(p); + if (struct_cv > 1) { + pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", + struct_v, struct_cv); + goto e_inval; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, e_inval); + struct_end = *p + len; + + ret = ceph_oloc_decode(p, end, &redir->oloc); + if (ret) + goto out; + + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_request_redirect::object_name is set\n"); + goto e_inval; + } + + len = ceph_decode_32(p); + *p += len; /* skip osd_instructions */ + + /* skip the rest */ + *p = struct_end; +out: + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + static void complete_request(struct ceph_osd_request *req) { complete_all(&req->r_safe_completion); /* fsync waiter */ @@ -1497,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, { void *p, *end; struct ceph_osd_request *req; + struct ceph_request_redirect redir; u64 tid; int object_len; unsigned int numops; @@ -1576,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, for (i = 0; i < numops; i++) req->r_reply_op_result[i] = ceph_decode_32(&p); - already_completed = req->r_got_reply; + if (le16_to_cpu(msg->hdr.version) >= 6) { + p += 8 + 4; /* skip replay_version */ + p += 8; /* skip user_version */ - if (!req->r_got_reply) { + err = ceph_redirect_decode(&p, end, &redir); + if (err) + goto bad_put; + } else { + redir.oloc.pool = -1; + } + if (redir.oloc.pool != -1) { + dout("redirect pool %lld\n", redir.oloc.pool); + + __unregister_request(osdc, req); + mutex_unlock(&osdc->request_mutex); + + req->r_target_oloc = redir.oloc; /* struct */ + + /* + * Start redirect requests with nofail=true. If + * mapping fails, request will end up on the notarget + * list, waiting for the new osdmap (which can take + * a while), even though the original request mapped + * successfully. In the future we might want to follow + * original request's nofail setting here. + */ + err = ceph_osdc_start_request(osdc, req, true); + BUG_ON(err); + + goto done; + } + + already_completed = req->r_got_reply; + if (!req->r_got_reply) { req->r_result = result; dout("handle_reply result %d bytes %d\n", req->r_result, bytes); |