Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--	net/ceph/osd_client.c	127
1 file changed, 88 insertions(+), 39 deletions(-)
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 010ff3b..b0dfce7 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -436,6 +436,7 @@ static bool osd_req_opcode_valid(u16 opcode)
case CEPH_OSD_OP_OMAPCLEAR:
case CEPH_OSD_OP_OMAPRMKEYS:
case CEPH_OSD_OP_OMAP_CMP:
+ case CEPH_OSD_OP_SETALLOCHINT:
case CEPH_OSD_OP_CLONERANGE:
case CEPH_OSD_OP_ASSERT_SRC_VERSION:
case CEPH_OSD_OP_SRC_CMPXATTR:
@@ -591,6 +592,26 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_watch_init);
+void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
+ unsigned int which,
+ u64 expected_object_size,
+ u64 expected_write_size)
+{
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
+ CEPH_OSD_OP_SETALLOCHINT);
+
+ op->alloc_hint.expected_object_size = expected_object_size;
+ op->alloc_hint.expected_write_size = expected_write_size;
+
+ /*
+ * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
+ * not worth a feature bit. Set FAILOK per-op flag to make
+ * sure older osds don't trip over an unsupported opcode.
+ */
+ op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
+}
+EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
+
static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
struct ceph_osd_data *osd_data)
{
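
The new osd_req_op_alloc_hint_init() helper simply records the two hint values on the op and marks it FAILOK. A hedged usage sketch follows; the caller, the op index and the 4 MiB sizes are illustrative assumptions (an rbd-style client would pick its own), not part of this patch.

/*
 * Illustrative only: attach an allocation hint as an extra op on an
 * already-built request.  The op index and sizes are assumptions
 * made for this sketch.
 */
static void example_set_alloc_hint(struct ceph_osd_request *osd_req)
{
	u64 object_size = 4 * 1024 * 1024;	/* expected object size */
	u64 write_size = 4 * 1024 * 1024;	/* expected per-write size */

	/* hint op placed after the data op assumed at index 0 */
	osd_req_op_alloc_hint_init(osd_req, 1, object_size, write_size);
}
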
@@ -681,6 +702,12 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
dst->watch.ver = cpu_to_le64(src->watch.ver);
dst->watch.flag = src->watch.flag;
break;
+ case CEPH_OSD_OP_SETALLOCHINT:
+ dst->alloc_hint.expected_object_size =
+ cpu_to_le64(src->alloc_hint.expected_object_size);
+ dst->alloc_hint.expected_write_size =
+ cpu_to_le64(src->alloc_hint.expected_write_size);
+ break;
default:
pr_err("unsupported osd opcode %s\n",
ceph_osd_op_name(src->op));
@@ -688,7 +715,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
return 0;
}
+
dst->op = cpu_to_le16(src->op);
+ dst->flags = cpu_to_le32(src->flags);
dst->payload_len = cpu_to_le32(src->payload_len);
return request_data_len;
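
The byte-order conversion above presumes a matching wire-format member; the counterpart addition to the union in struct ceph_osd_op (in the rados headers) would look roughly like the sketch below. This is stated as an assumption, since that header change is outside this diff.

	/* presumed counterpart member in the rados header (not in this diff) */
	struct {
		__le64 expected_object_size;
		__le64 expected_write_size;
	} __attribute__ ((packed)) alloc_hint;
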
@@ -1304,7 +1333,7 @@ static int __map_request(struct ceph_osd_client *osdc,
{
struct ceph_pg pgid;
int acting[CEPH_PG_MAX_SIZE];
- int o = -1, num = 0;
+ int num, o;
int err;
bool was_paused;
@@ -1317,11 +1346,9 @@ static int __map_request(struct ceph_osd_client *osdc,
}
req->r_pgid = pgid;
- err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
- if (err > 0) {
- o = acting[0];
- num = err;
- }
+ num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o);
+ if (num < 0)
+ num = 0;
was_paused = req->r_paused;
req->r_paused = __req_should_be_paused(osdc, req);
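
The converted call reflects a changed ceph_calc_pg_acting() interface: the acting-set size (or a negative error) is now the return value and the primary comes back through an out parameter. A sketch of the assumed prototype, which lives in net/ceph/osdmap.c rather than in this file:

/* assumed prototype of the reworked helper (defined outside this diff) */
int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
			int *osds, int *primary);
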
@@ -1427,6 +1454,40 @@ static void __send_queued(struct ceph_osd_client *osdc)
}
/*
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req,
+ bool nofail)
+{
+ int rc;
+
+ __register_request(osdc, req);
+ req->r_sent = 0;
+ req->r_got_reply = 0;
+ rc = __map_request(osdc, req, 0);
+ if (rc < 0) {
+ if (nofail) {
+ dout("osdc_start_request failed map, "
+ " will retry %lld\n", req->r_tid);
+ rc = 0;
+ } else {
+ __unregister_request(osdc, req);
+ }
+ return rc;
+ }
+
+ if (req->r_osd == NULL) {
+ dout("send_request %p no up osds in pg\n", req);
+ ceph_monc_request_next_osdmap(&osdc->client->monc);
+ } else {
+ __send_queued(osdc);
+ }
+
+ return 0;
+}
+
+/*
* Timeout callback, called every N seconds when 1 or more osd
* requests has been active for more than N seconds. When this
* happens, we ping all OSDs with requests who have timed out to
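
__ceph_osdc_start_request() carries the actual work and relies on the caller to hold map_sem for read and request_mutex, as its comment states. A minimal sketch of that calling convention, matching the refactored ceph_osdc_start_request() later in this patch:

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	rc = __ceph_osdc_start_request(osdc, req, nofail);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
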
@@ -1653,6 +1714,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
osdmap_epoch = ceph_decode_32(&p);
/* lookup */
+ down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid);
if (req == NULL) {
@@ -1709,7 +1771,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
dout("redirect pool %lld\n", redir.oloc.pool);
__unregister_request(osdc, req);
- mutex_unlock(&osdc->request_mutex);
req->r_target_oloc = redir.oloc; /* struct */
@@ -1721,10 +1782,10 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
* successfully. In the future we might want to follow
* original request's nofail setting here.
*/
- err = ceph_osdc_start_request(osdc, req, true);
+ err = __ceph_osdc_start_request(osdc, req, true);
BUG_ON(err);
- goto done;
+ goto out_unlock;
}
already_completed = req->r_got_reply;
@@ -1742,8 +1803,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
req->r_got_reply = 1;
} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
dout("handle_reply tid %llu dup ack\n", tid);
- mutex_unlock(&osdc->request_mutex);
- goto done;
+ goto out_unlock;
}
dout("handle_reply tid %llu flags %d\n", tid, flags);
@@ -1758,6 +1818,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
__unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
+ up_read(&osdc->map_sem);
if (!already_completed) {
if (req->r_unsafe_callback &&
@@ -1775,10 +1836,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
complete_request(req);
}
-done:
+out:
dout("req=%p req->r_linger=%d\n", req, req->r_linger);
ceph_osdc_put_request(req);
return;
+out_unlock:
+ mutex_unlock(&osdc->request_mutex);
+ up_read(&osdc->map_sem);
+ goto out;
bad_put:
req->r_result = -EIO;
@@ -1791,6 +1856,7 @@ bad_put:
ceph_osdc_put_request(req);
bad_mutex:
mutex_unlock(&osdc->request_mutex);
+ up_read(&osdc->map_sem);
bad:
pr_err("corrupt osd_op_reply got %d %d\n",
(int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
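
Taken together, the handle_reply() changes hold map_sem for read across the whole lookup/redirect path, so the redirect case can call __ceph_osdc_start_request() with its locking requirements already satisfied, and every exit (normal, dup ack, or error) now drops both locks. A structural sketch of the resulting flow, with the decode and completion work elided:

/*
 * Structural sketch only: lock scope and exit paths of handle_reply()
 * after this change.  Reply decoding and request completion are elided.
 */
static void handle_reply_sketch(struct ceph_osd_client *osdc, u64 tid)
{
	struct ceph_osd_request *req;

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req)
		goto out_unlock;

	ceph_osdc_get_request(req);

	/* ... decode the reply; a redirect may resubmit the request via
	 *     __ceph_osdc_start_request(), which needs map_sem held ... */

	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	ceph_osdc_put_request(req);
	return;

out_unlock:
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}
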
@@ -1994,7 +2060,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
int skipped_map = 0;
dout("taking full map %u len %d\n", epoch, maplen);
- newmap = osdmap_decode(&p, p+maplen);
+ newmap = ceph_osdmap_decode(&p, p+maplen);
if (IS_ERR(newmap)) {
err = PTR_ERR(newmap);
goto bad;
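
The full-map path now uses ceph_osdmap_decode(), the renamed and exported decoder; its assumed prototype (declared in the osdmap header, outside this diff):

/* assumed prototype of the renamed decoder (not part of this diff) */
struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end);
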
@@ -2043,7 +2109,6 @@ bad:
pr_err("osdc handle_map corrupt msg\n");
ceph_msg_dump(msg);
up_write(&osdc->map_sem);
- return;
}
/*
@@ -2242,7 +2307,6 @@ done_err:
bad:
pr_err("osdc handle_watch_notify corrupt msg\n");
- return;
}
/*
@@ -2351,34 +2415,16 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req,
bool nofail)
{
- int rc = 0;
+ int rc;
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
- __register_request(osdc, req);
- req->r_sent = 0;
- req->r_got_reply = 0;
- rc = __map_request(osdc, req, 0);
- if (rc < 0) {
- if (nofail) {
- dout("osdc_start_request failed map, "
- " will retry %lld\n", req->r_tid);
- rc = 0;
- } else {
- __unregister_request(osdc, req);
- }
- goto out_unlock;
- }
- if (req->r_osd == NULL) {
- dout("send_request %p no up osds in pg\n", req);
- ceph_monc_request_next_osdmap(&osdc->client->monc);
- } else {
- __send_queued(osdc);
- }
- rc = 0;
-out_unlock:
+
+ rc = __ceph_osdc_start_request(osdc, req, nofail);
+
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
+
return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);
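
With the body delegated to __ceph_osdc_start_request(), the exported wrapper only manages the locks. For reference, a hedged sketch of a typical synchronous caller; the request construction is elided and the wrapper name is an illustrative assumption:

/*
 * Illustrative caller pattern: submit a prepared request, wait for the
 * result, then drop the caller's reference.
 */
static int example_submit_and_wait(struct ceph_osd_client *osdc,
				   struct ceph_osd_request *req)
{
	int ret;

	ret = ceph_osdc_start_request(osdc, req, false);
	if (!ret)
		ret = ceph_osdc_wait_request(osdc, req);  /* returns r_result */
	ceph_osdc_put_request(req);
	return ret;
}
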
@@ -2504,9 +2550,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
err = -ENOMEM;
osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
if (!osdc->notify_wq)
- goto out_msgpool;
+ goto out_msgpool_reply;
+
return 0;
+out_msgpool_reply:
+ ceph_msgpool_destroy(&osdc->msgpool_op_reply);
out_msgpool:
ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool: