diff options
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 219 |
1 files changed, 131 insertions, 88 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index d2667e5..a00c74f 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -584,8 +584,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags); } else { BUG_ON(num_ops > CEPH_OSD_MAX_OPS); - req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]), - gfp_flags); + req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags); } if (unlikely(!req)) return NULL; @@ -767,7 +766,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_extent_dup_last); -void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, +int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method) { struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, @@ -779,7 +778,9 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, BUG_ON(opcode != CEPH_OSD_OP_CALL); pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); - BUG_ON(!pagelist); + if (!pagelist) + return -ENOMEM; + ceph_pagelist_init(pagelist); op->cls.class_name = class; @@ -799,6 +800,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); op->indata_len = payload_len; + return 0; } EXPORT_SYMBOL(osd_req_op_cls_init); @@ -1027,7 +1029,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, truncate_size, truncate_seq); } - req->r_abort_on_full = true; req->r_flags = flags; req->r_base_oloc.pool = layout->pool_id; req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns); @@ -1055,6 +1056,38 @@ EXPORT_SYMBOL(ceph_osdc_new_request); DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node) DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node) +/* + * Call @fn on each OSD request as long as @fn returns 0. + */ +static void for_each_request(struct ceph_osd_client *osdc, + int (*fn)(struct ceph_osd_request *req, void *arg), + void *arg) +{ + struct rb_node *n, *p; + + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + + for (p = rb_first(&osd->o_requests); p; ) { + struct ceph_osd_request *req = + rb_entry(p, struct ceph_osd_request, r_node); + + p = rb_next(p); + if (fn(req, arg)) + return; + } + } + + for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) { + struct ceph_osd_request *req = + rb_entry(p, struct ceph_osd_request, r_node); + + p = rb_next(p); + if (fn(req, arg)) + return; + } +} + static bool osd_homeless(struct ceph_osd *osd) { return osd->o_osd == CEPH_HOMELESS_OSD; @@ -1396,7 +1429,6 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc, bool recovery_deletes = ceph_osdmap_flag(osdc, CEPH_OSDMAP_RECOVERY_DELETES); enum calc_target_result ct_res; - int ret; t->epoch = osdc->osdmap->epoch; pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool); @@ -1432,14 +1464,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc, } } - ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, - &pgid); - if (ret) { - WARN_ON(ret != -ENOENT); - t->osd = CEPH_HOMELESS_OSD; - ct_res = CALC_TARGET_POOL_DNE; - goto out; - } + __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid); last_pgid.pool = pgid.pool; last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask); @@ -2162,9 +2187,9 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked) struct ceph_osd_client *osdc = req->r_osdc; struct ceph_osd *osd; enum calc_target_result ct_res; + int err = 0; bool need_send = false; bool promoted = false; - bool need_abort = false; WARN_ON(req->r_tid); dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); @@ -2180,7 +2205,10 @@ again: goto promote; } - if (osdc->osdmap->epoch < osdc->epoch_barrier) { + if (osdc->abort_err) { + dout("req %p abort_err %d\n", req, osdc->abort_err); + err = osdc->abort_err; + } else if (osdc->osdmap->epoch < osdc->epoch_barrier) { dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, osdc->epoch_barrier); req->r_t.paused = true; @@ -2201,11 +2229,13 @@ again: (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || pool_full(osdc, req->r_t.base_oloc.pool))) { dout("req %p full/pool_full\n", req); - pr_warn_ratelimited("FULL or reached pool quota\n"); - req->r_t.paused = true; - maybe_request_map(osdc); - if (req->r_abort_on_full) - need_abort = true; + if (osdc->abort_on_full) { + err = -ENOSPC; + } else { + pr_warn_ratelimited("FULL or reached pool quota\n"); + req->r_t.paused = true; + maybe_request_map(osdc); + } } else if (!osd_homeless(osd)) { need_send = true; } else { @@ -2222,11 +2252,11 @@ again: link_request(osd, req); if (need_send) send_request(req); - else if (need_abort) - complete_request(req, -ENOSPC); + else if (err) + complete_request(req, err); mutex_unlock(&osd->lock); - if (ct_res == CALC_TARGET_POOL_DNE) + if (!err && ct_res == CALC_TARGET_POOL_DNE) send_map_check(req); if (promoted) @@ -2282,11 +2312,21 @@ static void finish_request(struct ceph_osd_request *req) static void __complete_request(struct ceph_osd_request *req) { - if (req->r_callback) { - dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, - req->r_tid, req->r_callback, req->r_result); + dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, + req->r_tid, req->r_callback, req->r_result); + + if (req->r_callback) req->r_callback(req); - } + complete_all(&req->r_completion); + ceph_osdc_put_request(req); +} + +static void complete_request_workfn(struct work_struct *work) +{ + struct ceph_osd_request *req = + container_of(work, struct ceph_osd_request, r_complete_work); + + __complete_request(req); } /* @@ -2298,9 +2338,9 @@ static void complete_request(struct ceph_osd_request *req, int err) req->r_result = err; finish_request(req); - __complete_request(req); - complete_all(&req->r_completion); - ceph_osdc_put_request(req); + + INIT_WORK(&req->r_complete_work, complete_request_workfn); + queue_work(req->r_osdc->completion_wq, &req->r_complete_work); } static void cancel_map_check(struct ceph_osd_request *req) @@ -2337,6 +2377,28 @@ static void abort_request(struct ceph_osd_request *req, int err) complete_request(req, err); } +static int abort_fn(struct ceph_osd_request *req, void *arg) +{ + int err = *(int *)arg; + + abort_request(req, err); + return 0; /* continue iteration */ +} + +/* + * Abort all in-flight requests with @err and arrange for all future + * requests to be failed immediately. + */ +void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err) +{ + dout("%s osdc %p err %d\n", __func__, osdc, err); + down_write(&osdc->lock); + for_each_request(osdc, abort_fn, &err); + osdc->abort_err = err; + up_write(&osdc->lock); +} +EXPORT_SYMBOL(ceph_osdc_abort_requests); + static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) { if (likely(eb > osdc->epoch_barrier)) { @@ -2364,6 +2426,30 @@ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); /* + * We can end up releasing caps as a result of abort_request(). + * In that case, we probably want to ensure that the cap release message + * has an updated epoch barrier in it, so set the epoch barrier prior to + * aborting the first request. + */ +static int abort_on_full_fn(struct ceph_osd_request *req, void *arg) +{ + struct ceph_osd_client *osdc = req->r_osdc; + bool *victims = arg; + + if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && + (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || + pool_full(osdc, req->r_t.base_oloc.pool))) { + if (!*victims) { + update_epoch_barrier(osdc, osdc->osdmap->epoch); + *victims = true; + } + abort_request(req, -ENOSPC); + } + + return 0; /* continue iteration */ +} + +/* * Drop all pending requests that are stalled waiting on a full condition to * clear, and complete them with ENOSPC as the return code. Set the * osdc->epoch_barrier to the latest map epoch that we've seen if any were @@ -2371,61 +2457,11 @@ EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); */ static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) { - struct rb_node *n; bool victims = false; - dout("enter abort_on_full\n"); - - if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc)) - goto out; - - /* Scan list and see if there is anything to abort */ - for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { - struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); - struct rb_node *m; - - m = rb_first(&osd->o_requests); - while (m) { - struct ceph_osd_request *req = rb_entry(m, - struct ceph_osd_request, r_node); - m = rb_next(m); - - if (req->r_abort_on_full) { - victims = true; - break; - } - } - if (victims) - break; - } - - if (!victims) - goto out; - - /* - * Update the barrier to current epoch if it's behind that point, - * since we know we have some calls to be aborted in the tree. - */ - update_epoch_barrier(osdc, osdc->osdmap->epoch); - - for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { - struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); - struct rb_node *m; - - m = rb_first(&osd->o_requests); - while (m) { - struct ceph_osd_request *req = rb_entry(m, - struct ceph_osd_request, r_node); - m = rb_next(m); - - if (req->r_abort_on_full && - (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || - pool_full(osdc, req->r_t.target_oloc.pool))) - abort_request(req, -ENOSPC); - } - } -out: - dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier); + if (osdc->abort_on_full && + (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc))) + for_each_request(osdc, abort_on_full_fn, &victims); } static void check_pool_dne(struct ceph_osd_request *req) @@ -3542,8 +3578,6 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) up_read(&osdc->lock); __complete_request(req); - complete_all(&req->r_completion); - ceph_osdc_put_request(req); return; fail_request: @@ -4928,7 +4962,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, if (ret) goto out_put_req; - osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + if (ret) + goto out_put_req; + if (req_page) osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 0, false, false); @@ -4997,6 +5034,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (!osdc->notify_wq) goto out_msgpool_reply; + osdc->completion_wq = create_singlethread_workqueue("ceph-completion"); + if (!osdc->completion_wq) + goto out_notify_wq; + schedule_delayed_work(&osdc->timeout_work, osdc->client->options->osd_keepalive_timeout); schedule_delayed_work(&osdc->osds_timeout_work, @@ -5004,6 +5045,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) return 0; +out_notify_wq: + destroy_workqueue(osdc->notify_wq); out_msgpool_reply: ceph_msgpool_destroy(&osdc->msgpool_op_reply); out_msgpool: @@ -5018,7 +5061,7 @@ out: void ceph_osdc_stop(struct ceph_osd_client *osdc) { - flush_workqueue(osdc->notify_wq); + destroy_workqueue(osdc->completion_wq); destroy_workqueue(osdc->notify_wq); cancel_delayed_work_sync(&osdc->timeout_work); cancel_delayed_work_sync(&osdc->osds_timeout_work); |