diff options
-rw-r--r-- | fs/ceph/mds_client.c | 2 | ||||
-rw-r--r-- | fs/ceph/messenger.c | 142 | ||||
-rw-r--r-- | fs/ceph/messenger.h | 9 | ||||
-rw-r--r-- | fs/ceph/mon_client.c | 25 | ||||
-rw-r--r-- | fs/ceph/osd_client.c | 17 |
5 files changed, 115 insertions, 80 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 623c67c..93998a0 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2953,8 +2953,6 @@ const static struct ceph_connection_operations mds_con_ops = { .get_authorizer = get_authorizer, .verify_authorizer_reply = verify_authorizer_reply, .peer_reset = peer_reset, - .alloc_msg = ceph_alloc_msg, - .alloc_middle = ceph_alloc_middle, }; diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 1360708..25de15c0 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -1279,8 +1279,34 @@ static void process_ack(struct ceph_connection *con) +static int read_partial_message_section(struct ceph_connection *con, + struct kvec *section, unsigned int sec_len, + u32 *crc) +{ + int left; + int ret; + + BUG_ON(!section); + + while (section->iov_len < sec_len) { + BUG_ON(section->iov_base == NULL); + left = sec_len - section->iov_len; + ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + + section->iov_len, left); + if (ret <= 0) + return ret; + section->iov_len += ret; + if (section->iov_len == sec_len) + *crc = crc32c(0, section->iov_base, + section->iov_len); + } + return 1; +} +static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, + struct ceph_msg_header *hdr, + int *skip); /* * read (part of) a message. */ @@ -1292,6 +1318,7 @@ static int read_partial_message(struct ceph_connection *con) int to, want, left; unsigned front_len, middle_len, data_len, data_off; int datacrc = con->msgr->nocrc; + int skip; dout("read_partial_message con %p msg %p\n", con, m); @@ -1315,7 +1342,6 @@ static int read_partial_message(struct ceph_connection *con) } } } - front_len = le32_to_cpu(con->in_hdr.front_len); if (front_len > CEPH_MSG_MAX_FRONT_LEN) return -EIO; @@ -1330,8 +1356,8 @@ static int read_partial_message(struct ceph_connection *con) if (!con->in_msg) { dout("got hdr type %d front %d data %d\n", con->in_hdr.type, con->in_hdr.front_len, con->in_hdr.data_len); - con->in_msg = con->ops->alloc_msg(con, &con->in_hdr); - if (!con->in_msg) { + con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); + if (skip) { /* skip this message */ pr_err("alloc_msg returned NULL, skipping message\n"); con->in_base_pos = -front_len - middle_len - data_len - @@ -1342,56 +1368,28 @@ static int read_partial_message(struct ceph_connection *con) if (IS_ERR(con->in_msg)) { ret = PTR_ERR(con->in_msg); con->in_msg = NULL; - con->error_msg = "out of memory for incoming message"; + con->error_msg = "error allocating memory for incoming message"; return ret; } m = con->in_msg; m->front.iov_len = 0; /* haven't read it yet */ + if (m->middle) + m->middle->vec.iov_len = 0; memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr)); } /* front */ - while (m->front.iov_len < front_len) { - BUG_ON(m->front.iov_base == NULL); - left = front_len - m->front.iov_len; - ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base + - m->front.iov_len, left); - if (ret <= 0) - return ret; - m->front.iov_len += ret; - if (m->front.iov_len == front_len) - con->in_front_crc = crc32c(0, m->front.iov_base, - m->front.iov_len); - } + ret = read_partial_message_section(con, &m->front, front_len, + &con->in_front_crc); + if (ret <= 0) + return ret; /* middle */ - while (middle_len > 0 && (!m->middle || - m->middle->vec.iov_len < middle_len)) { - if (m->middle == NULL) { - ret = -EOPNOTSUPP; - if (con->ops->alloc_middle) - ret = con->ops->alloc_middle(con, m); - if (ret < 0) { - pr_err("alloc_middle fail skipping payload\n"); - con->in_base_pos = -middle_len - data_len - - sizeof(m->footer); - ceph_msg_put(con->in_msg); - con->in_msg = NULL; - con->in_tag = CEPH_MSGR_TAG_READY; - return 0; - } - m->middle->vec.iov_len = 0; - } - left = middle_len - m->middle->vec.iov_len; - ret = ceph_tcp_recvmsg(con->sock, - (char *)m->middle->vec.iov_base + - m->middle->vec.iov_len, left); + if (m->middle) { + ret = read_partial_message_section(con, &m->middle->vec, middle_len, + &con->in_middle_crc); if (ret <= 0) return ret; - m->middle->vec.iov_len += ret; - if (m->middle->vec.iov_len == middle_len) - con->in_middle_crc = crc32c(0, m->middle->vec.iov_base, - m->middle->vec.iov_len); } /* (page) data */ @@ -2116,31 +2114,13 @@ out: } /* - * Generic message allocator, for incoming messages. - */ -struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, - struct ceph_msg_header *hdr) -{ - int type = le16_to_cpu(hdr->type); - int front_len = le32_to_cpu(hdr->front_len); - struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL); - - if (!msg) { - pr_err("unable to allocate msg type %d len %d\n", - type, front_len); - return ERR_PTR(-ENOMEM); - } - return msg; -} - -/* * Allocate "middle" portion of a message, if it is needed and wasn't * allocated by alloc_msg. This allows us to read a small fixed-size * per-type header in the front and then gracefully fail (i.e., * propagate the error to the caller based on info in the front) when * the middle is too large. */ -int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) +static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) { int type = le16_to_cpu(msg->hdr.type); int middle_len = le32_to_cpu(msg->hdr.middle_len); @@ -2156,6 +2136,48 @@ int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) return 0; } +/* + * Generic message allocator, for incoming messages. + */ +static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, + struct ceph_msg_header *hdr, + int *skip) +{ + int type = le16_to_cpu(hdr->type); + int front_len = le32_to_cpu(hdr->front_len); + int middle_len = le32_to_cpu(hdr->middle_len); + struct ceph_msg *msg = NULL; + int ret; + + if (con->ops->alloc_msg) { + msg = con->ops->alloc_msg(con, hdr, skip); + if (IS_ERR(msg)) + return msg; + + if (*skip) + return NULL; + } + if (!msg) { + *skip = 0; + msg = ceph_msg_new(type, front_len, 0, 0, NULL); + if (!msg) { + pr_err("unable to allocate msg type %d len %d\n", + type, front_len); + return ERR_PTR(-ENOMEM); + } + } + + if (middle_len) { + ret = ceph_alloc_middle(con, msg); + + if (ret < 0) { + ceph_msg_put(msg); + return msg; + } + } + return msg; +} + /* * Free a generically kmalloc'd message. diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index a7b6841..b6bec590 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h @@ -44,9 +44,8 @@ struct ceph_connection_operations { void (*peer_reset) (struct ceph_connection *con); struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, - struct ceph_msg_header *hdr); - int (*alloc_middle) (struct ceph_connection *con, - struct ceph_msg *msg); + struct ceph_msg_header *hdr, + int *skip); /* an incoming message has a data payload; tell me what pages I * should read the data into. */ int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m, @@ -242,10 +241,6 @@ extern struct ceph_msg *ceph_msg_new(int type, int front_len, struct page **pages); extern void ceph_msg_kfree(struct ceph_msg *m); -extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, - struct ceph_msg_header *hdr); -extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg); - static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) { diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c index 223e8bc..6c00b37 100644 --- a/fs/ceph/mon_client.c +++ b/fs/ceph/mon_client.c @@ -692,21 +692,33 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) * Allocate memory for incoming message */ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, - struct ceph_msg_header *hdr) + struct ceph_msg_header *hdr, + int *skip) { struct ceph_mon_client *monc = con->private; int type = le16_to_cpu(hdr->type); - int front = le32_to_cpu(hdr->front_len); + int front_len = le32_to_cpu(hdr->front_len); + struct ceph_msg *m; + *skip = 0; switch (type) { case CEPH_MSG_MON_SUBSCRIBE_ACK: - return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front); + m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len); + break; case CEPH_MSG_STATFS_REPLY: - return ceph_msgpool_get(&monc->msgpool_statfs_reply, front); + m = ceph_msgpool_get(&monc->msgpool_statfs_reply, front_len); + break; case CEPH_MSG_AUTH_REPLY: - return ceph_msgpool_get(&monc->msgpool_auth_reply, front); + m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len); + break; + default: + return NULL; } - return ceph_alloc_msg(con, hdr); + + if (!m) + *skip = 1; + + return m; } /* @@ -749,5 +761,4 @@ const static struct ceph_connection_operations mon_con_ops = { .dispatch = dispatch, .fault = mon_fault, .alloc_msg = mon_alloc_msg, - .alloc_middle = ceph_alloc_middle, }; diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index 8417e21..545e936 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c @@ -1304,18 +1304,28 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) } static struct ceph_msg *alloc_msg(struct ceph_connection *con, - struct ceph_msg_header *hdr) + struct ceph_msg_header *hdr, + int *skip) { struct ceph_osd *osd = con->private; struct ceph_osd_client *osdc = osd->o_osdc; int type = le16_to_cpu(hdr->type); int front = le32_to_cpu(hdr->front_len); + struct ceph_msg *m; + *skip = 0; switch (type) { case CEPH_MSG_OSD_OPREPLY: - return ceph_msgpool_get(&osdc->msgpool_op_reply, front); + m = ceph_msgpool_get(&osdc->msgpool_op_reply, front); + break; + default: + return NULL; } - return ceph_alloc_msg(con, hdr); + + if (!m) + *skip = 1; + + return m; } /* @@ -1390,6 +1400,5 @@ const static struct ceph_connection_operations osd_con_ops = { .verify_authorizer_reply = verify_authorizer_reply, .alloc_msg = alloc_msg, .fault = osd_reset, - .alloc_middle = ceph_alloc_middle, .prepare_pages = prepare_pages, }; |