summaryrefslogtreecommitdiffstats
path: root/fs/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/osd_client.c')
-rw-r--r--fs/ceph/osd_client.c249
1 files changed, 195 insertions, 54 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 2647daf..c5d818e 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -6,12 +6,16 @@
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
+#ifdef CONFIG_BLOCK
+#include <linux/bio.h>
+#endif
#include "super.h"
#include "osd_client.h"
#include "messenger.h"
#include "decode.h"
#include "auth.h"
+#include "pagelist.h"
#define OSD_OP_FRONT_LEN 4096
#define OSD_OPREPLY_FRONT_LEN 512
@@ -22,29 +26,50 @@ static int __kick_requests(struct ceph_osd_client *osdc,
static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static int op_needs_trail(int op)
+{
+ switch (op) {
+ case CEPH_OSD_OP_GETXATTR:
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_CMPXATTR:
+ case CEPH_OSD_OP_CALL:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+static int op_has_extent(int op)
+{
+ return (op == CEPH_OSD_OP_READ ||
+ op == CEPH_OSD_OP_WRITE);
+}
+
void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
u64 snapid,
- u64 off, u64 len, u64 *bno,
- struct ceph_osd_request *req)
+ u64 off, u64 *plen, u64 *bno,
+ struct ceph_osd_request *req,
+ struct ceph_osd_req_op *op)
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
- struct ceph_osd_op *op = (void *)(reqhead + 1);
- u64 orig_len = len;
+ u64 orig_len = *plen;
u64 objoff, objlen; /* extent in object */
reqhead->snapid = cpu_to_le64(snapid);
/* object extent? */
- ceph_calc_file_object_mapping(layout, off, &len, bno,
+ ceph_calc_file_object_mapping(layout, off, plen, bno,
&objoff, &objlen);
- if (len < orig_len)
+ if (*plen < orig_len)
dout(" skipping last %llu, final file extent %llu~%llu\n",
- orig_len - len, off, len);
+ orig_len - *plen, off, *plen);
- op->extent.offset = cpu_to_le64(objoff);
- op->extent.length = cpu_to_le64(objlen);
- req->r_num_pages = calc_pages_for(off, len);
+ if (op_has_extent(op->op)) {
+ op->extent.offset = objoff;
+ op->extent.length = objlen;
+ }
+ req->r_num_pages = calc_pages_for(off, *plen);
dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
*bno, objoff, objlen, req->r_num_pages);
@@ -80,11 +105,13 @@ static void calc_layout(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 *plen,
- struct ceph_osd_request *req)
+ struct ceph_osd_request *req,
+ struct ceph_osd_req_op *op)
{
u64 bno;
- ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
+ ceph_calc_raw_layout(osdc, layout, vino.snap, off,
+ plen, &bno, req, op);
sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
req->r_oid_len = strlen(req->r_oid);
@@ -113,35 +140,64 @@ void ceph_osdc_release_request(struct kref *kref)
if (req->r_own_pages)
ceph_release_page_vector(req->r_pages,
req->r_num_pages);
+#ifdef CONFIG_BLOCK
+ if (req->r_bio)
+ bio_put(req->r_bio);
+#endif
ceph_put_snap_context(req->r_snapc);
+ if (req->r_trail) {
+ ceph_pagelist_release(req->r_trail);
+ kfree(req->r_trail);
+ }
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
else
kfree(req);
}
+static int op_needs_trail(int op)
+{
+ switch (op) {
+ case CEPH_OSD_OP_GETXATTR:
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_CMPXATTR:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
+{
+ int i = 0;
+
+ if (needs_trail)
+ *needs_trail = 0;
+ while (ops[i].op) {
+ if (needs_trail && op_needs_trail(ops[i].op))
+ *needs_trail = 1;
+ i++;
+ }
+
+ return i;
+}
+
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
int flags,
struct ceph_snap_context *snapc,
- int do_sync,
+ struct ceph_osd_req_op *ops,
bool use_mempool,
gfp_t gfp_flags,
- struct page **pages)
+ struct page **pages,
+ struct bio *bio)
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
- int num_op = 1 + do_sync;
- size_t msg_size = sizeof(struct ceph_osd_request_head) +
- num_op*sizeof(struct ceph_osd_op);
+ int needs_trail;
+ int num_op = get_num_ops(ops, &needs_trail);
+ size_t msg_size = sizeof(struct ceph_osd_request_head);
- if (use_mempool) {
- req = mempool_alloc(osdc->req_mempool, gfp_flags);
- memset(req, 0, sizeof(*req));
- } else {
- req = kzalloc(sizeof(*req), gfp_flags);
- }
- if (!req)
- return NULL;
+ msg_size += num_op*sizeof(struct ceph_osd_op);
if (use_mempool) {
req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -154,6 +210,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
req->r_osdc = osdc;
req->r_mempool = use_mempool;
+
kref_init(&req->r_kref);
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
@@ -174,6 +231,15 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
}
req->r_reply = msg;
+ /* allocate space for the trailing data */
+ if (needs_trail) {
+ req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
+ if (!req->r_trail) {
+ ceph_osdc_put_request(req);
+ return NULL;
+ }
+ ceph_pagelist_init(req->r_trail);
+ }
/* create request message; allow space for oid */
msg_size += 40;
if (snapc)
@@ -186,38 +252,87 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
ceph_osdc_put_request(req);
return NULL;
}
+
msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
memset(msg->front.iov_base, 0, msg->front.iov_len);
req->r_request = msg;
req->r_pages = pages;
+#ifdef CONFIG_BLOCK
+ if (bio) {
+ req->r_bio = bio;
+ bio_get(req->r_bio);
+ }
+#endif
return req;
}
+static void osd_req_encode_op(struct ceph_osd_request *req,
+ struct ceph_osd_op *dst,
+ struct ceph_osd_req_op *src)
+{
+ dst->op = cpu_to_le16(src->op);
+
+ switch (dst->op) {
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_WRITE:
+ dst->extent.offset =
+ cpu_to_le64(src->extent.offset);
+ dst->extent.length =
+ cpu_to_le64(src->extent.length);
+ dst->extent.truncate_size =
+ cpu_to_le64(src->extent.truncate_size);
+ dst->extent.truncate_seq =
+ cpu_to_le32(src->extent.truncate_seq);
+ break;
+
+ case CEPH_OSD_OP_GETXATTR:
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_CMPXATTR:
+ BUG_ON(!req->r_trail);
+
+ dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
+ dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
+ dst->xattr.cmp_op = src->xattr.cmp_op;
+ dst->xattr.cmp_mode = src->xattr.cmp_mode;
+ ceph_pagelist_append(req->r_trail, src->xattr.name,
+ src->xattr.name_len);
+ ceph_pagelist_append(req->r_trail, src->xattr.val,
+ src->xattr.value_len);
+ break;
+ case CEPH_OSD_OP_STARTSYNC:
+ break;
+ default:
+ pr_err("unrecognized osd opcode %d\n", dst->op);
+ WARN_ON(1);
+ break;
+ }
+ dst->payload_len = cpu_to_le32(src->payload_len);
+}
+
/*
* build new request AND message
*
*/
void ceph_osdc_build_request(struct ceph_osd_request *req,
- u64 off, u64 *plen,
- int opcode,
- struct ceph_snap_context *snapc,
- int do_sync,
- u32 truncate_seq,
- u64 truncate_size,
- struct timespec *mtime,
- const char *oid,
- int oid_len)
+ u64 off, u64 *plen,
+ struct ceph_osd_req_op *src_ops,
+ struct ceph_snap_context *snapc,
+ struct timespec *mtime,
+ const char *oid,
+ int oid_len)
{
struct ceph_msg *msg = req->r_request;
struct ceph_osd_request_head *head;
+ struct ceph_osd_req_op *src_op;
struct ceph_osd_op *op;
void *p;
- int num_op = 1 + do_sync;
+ int num_op = get_num_ops(src_ops, NULL);
size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
- int i;
int flags = req->r_flags;
+ u64 data_len = 0;
+ int i;
head = msg->front.iov_base;
op = (void *)(head + 1);
@@ -230,25 +345,23 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
if (flags & CEPH_OSD_FLAG_WRITE)
ceph_encode_timespec(&head->mtime, mtime);
head->num_ops = cpu_to_le16(num_op);
- op->op = cpu_to_le16(opcode);
- if (flags & CEPH_OSD_FLAG_WRITE) {
- req->r_request->hdr.data_off = cpu_to_le16(off);
- req->r_request->hdr.data_len = cpu_to_le32(*plen);
- op->payload_len = cpu_to_le32(*plen);
- }
- op->extent.truncate_size = cpu_to_le64(truncate_size);
- op->extent.truncate_seq = cpu_to_le32(truncate_seq);
/* fill in oid */
head->object_len = cpu_to_le32(oid_len);
memcpy(p, oid, oid_len);
p += oid_len;
- if (do_sync) {
+ src_op = src_ops;
+ while (src_op->op) {
+ osd_req_encode_op(req, op, src_op);
+ src_op++;
op++;
- op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
}
+
+ if (req->r_trail)
+ data_len += req->r_trail->length;
+
if (snapc) {
head->snap_seq = cpu_to_le64(snapc->seq);
head->num_snaps = cpu_to_le32(snapc->num_snaps);
@@ -258,6 +371,14 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
}
}
+ if (flags & CEPH_OSD_FLAG_WRITE) {
+ req->r_request->hdr.data_off = cpu_to_le16(off);
+ req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
+ } else if (data_len) {
+ req->r_request->hdr.data_off = 0;
+ req->r_request->hdr.data_len = cpu_to_le32(data_len);
+ }
+
BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
msg_size = p - msg->front.iov_base;
msg->front.iov_len = msg_size;
@@ -288,21 +409,34 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
struct timespec *mtime,
bool use_mempool, int num_reply)
{
- struct ceph_osd_request *req =
- ceph_osdc_alloc_request(osdc, flags,
- snapc, do_sync,
+ struct ceph_osd_req_op ops[3];
+ struct ceph_osd_request *req;
+
+ ops[0].op = opcode;
+ ops[0].extent.truncate_seq = truncate_seq;
+ ops[0].extent.truncate_size = truncate_size;
+ ops[0].payload_len = 0;
+
+ if (do_sync) {
+ ops[1].op = CEPH_OSD_OP_STARTSYNC;
+ ops[1].payload_len = 0;
+ ops[2].op = 0;
+ } else
+ ops[1].op = 0;
+
+ req = ceph_osdc_alloc_request(osdc, flags,
+ snapc, ops,
use_mempool,
- GFP_NOFS, NULL);
+ GFP_NOFS, NULL, NULL);
if (IS_ERR(req))
return req;
/* calculate max write size */
- calc_layout(osdc, vino, layout, off, plen, req);
+ calc_layout(osdc, vino, layout, off, plen, req, ops);
req->r_file_layout = *layout; /* keep a copy */
- ceph_osdc_build_request(req, off, plen, opcode,
- snapc, do_sync,
- truncate_seq, truncate_size,
+ ceph_osdc_build_request(req, off, plen, ops,
+ snapc,
mtime,
req->r_oid, req->r_oid_len);
@@ -1177,6 +1311,10 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
req->r_request->pages = req->r_pages;
req->r_request->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+ req->r_request->bio = req->r_bio;
+#endif
+ req->r_request->trail = req->r_trail;
register_request(osdc, req);
@@ -1493,6 +1631,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
}
m->pages = req->r_pages;
m->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+ m->bio = req->r_bio;
+#endif
}
*skip = 0;
req->r_con_filling_msg = ceph_con_get(con);
OpenPOWER on IntegriCloud