summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sys/rpc/svc_vc.c243
1 files changed, 115 insertions, 128 deletions
diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c
index 8298c95..0adb3fd 100644
--- a/sys/rpc/svc_vc.c
+++ b/sys/rpc/svc_vc.c
@@ -381,15 +381,11 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
* We must re-test for new connections after taking
* the lock to protect us in the case where a new
* connection arrives after our call to accept fails
- * with EWOULDBLOCK. The pool lock protects us from
- * racing the upcall after our TAILQ_EMPTY() call
- * returns false.
+ * with EWOULDBLOCK.
*/
ACCEPT_LOCK();
- mtx_lock(&xprt->xp_pool->sp_lock);
if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
- xprt_inactive_locked(xprt);
- mtx_unlock(&xprt->xp_pool->sp_lock);
+ xprt_inactive(xprt);
ACCEPT_UNLOCK();
sx_xunlock(&xprt->xp_lock);
return (FALSE);
@@ -526,35 +522,14 @@ static enum xprt_stat
svc_vc_stat(SVCXPRT *xprt)
{
struct cf_conn *cd;
- struct mbuf *m;
- size_t n;
cd = (struct cf_conn *)(xprt->xp_p1);
if (cd->strm_stat == XPRT_DIED)
return (XPRT_DIED);
- /*
- * Return XPRT_MOREREQS if we have buffered data and we are
- * mid-record or if we have enough data for a record
- * marker. Since this is only a hint, we read mpending and
- * resid outside the lock. We do need to take the lock if we
- * have to traverse the mbuf chain.
- */
- if (cd->mpending) {
- if (cd->resid)
- return (XPRT_MOREREQS);
- n = 0;
- sx_xlock(&xprt->xp_lock);
- m = cd->mpending;
- while (m && n < sizeof(uint32_t)) {
- n += m->m_len;
- m = m->m_next;
- }
- sx_xunlock(&xprt->xp_lock);
- if (n >= sizeof(uint32_t))
- return (XPRT_MOREREQS);
- }
+ if (cd->mreq != NULL && cd->resid == 0 && cd->eor)
+ return (XPRT_MOREREQS);
if (soreadable(xprt->xp_socket))
return (XPRT_MOREREQS);
@@ -575,6 +550,78 @@ svc_vc_backchannel_stat(SVCXPRT *xprt)
return (XPRT_IDLE);
}
+/*
+ * If we have an mbuf chain in cd->mpending, try to parse a record from it,
+ * leaving the result in cd->mreq. If we don't have a complete record, leave
+ * the partial result in cd->mreq and try to read more from the socket.
+ */
+static void
+svc_vc_process_pending(SVCXPRT *xprt)
+{
+ struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
+ struct socket *so = xprt->xp_socket;
+ struct mbuf *m;
+
+ /*
+ * If cd->resid is non-zero, we have part of the
+ * record already, otherwise we are expecting a record
+ * marker.
+ */
+ if (!cd->resid && cd->mpending) {
+ /*
+ * See if there is enough data buffered to
+ * make up a record marker. Make sure we can
+ * handle the case where the record marker is
+ * split across more than one mbuf.
+ */
+ size_t n = 0;
+ uint32_t header;
+
+ m = cd->mpending;
+ while (n < sizeof(uint32_t) && m) {
+ n += m->m_len;
+ m = m->m_next;
+ }
+ if (n < sizeof(uint32_t)) {
+ so->so_rcv.sb_lowat = sizeof(uint32_t) - n;
+ return;
+ }
+ m_copydata(cd->mpending, 0, sizeof(header),
+ (char *)&header);
+ header = ntohl(header);
+ cd->eor = (header & 0x80000000) != 0;
+ cd->resid = header & 0x7fffffff;
+ m_adj(cd->mpending, sizeof(uint32_t));
+ }
+
+ /*
+ * Start pulling off mbufs from cd->mpending
+ * until we either have a complete record or
+ * we run out of data. We use m_split to pull
+ * data - it will pull as much as possible and
+ * split the last mbuf if necessary.
+ */
+ while (cd->mpending && cd->resid) {
+ m = cd->mpending;
+ if (cd->mpending->m_next
+ || cd->mpending->m_len > cd->resid)
+ cd->mpending = m_split(cd->mpending,
+ cd->resid, M_WAITOK);
+ else
+ cd->mpending = NULL;
+ if (cd->mreq)
+ m_last(cd->mreq)->m_next = m;
+ else
+ cd->mreq = m;
+ while (m) {
+ cd->resid -= m->m_len;
+ m = m->m_next;
+ }
+ }
+
+ so->so_rcv.sb_lowat = imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2));
+}
+
static bool_t
svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
struct sockaddr **addrp, struct mbuf **mp)
@@ -582,6 +629,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
struct uio uio;
struct mbuf *m;
+ struct socket* so = xprt->xp_socket;
XDR xdrs;
int error, rcvflag;
@@ -592,99 +640,40 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
sx_xlock(&xprt->xp_lock);
for (;;) {
- /*
- * If we have an mbuf chain in cd->mpending, try to parse a
- * record from it, leaving the result in cd->mreq. If we don't
- * have a complete record, leave the partial result in
- * cd->mreq and try to read more from the socket.
- */
- if (cd->mpending) {
- /*
- * If cd->resid is non-zero, we have part of the
- * record already, otherwise we are expecting a record
- * marker.
- */
- if (!cd->resid) {
- /*
- * See if there is enough data buffered to
- * make up a record marker. Make sure we can
- * handle the case where the record marker is
- * split across more than one mbuf.
- */
- size_t n = 0;
- uint32_t header;
-
- m = cd->mpending;
- while (n < sizeof(uint32_t) && m) {
- n += m->m_len;
- m = m->m_next;
- }
- if (n < sizeof(uint32_t))
- goto readmore;
- m_copydata(cd->mpending, 0, sizeof(header),
- (char *)&header);
- header = ntohl(header);
- cd->eor = (header & 0x80000000) != 0;
- cd->resid = header & 0x7fffffff;
- m_adj(cd->mpending, sizeof(uint32_t));
+ /* If we have no request ready, check pending queue. */
+ while (cd->mpending &&
+ (cd->mreq == NULL || cd->resid != 0 || !cd->eor))
+ svc_vc_process_pending(xprt);
+
+ /* Process and return complete request in cd->mreq. */
+ if (cd->mreq != NULL && cd->resid == 0 && cd->eor) {
+
+ xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
+ cd->mreq = NULL;
+
+ /* Check for next request in a pending queue. */
+ svc_vc_process_pending(xprt);
+ if (cd->mreq == NULL || cd->resid != 0) {
+ SOCKBUF_LOCK(&so->so_rcv);
+ if (!soreadable(so))
+ xprt_inactive(xprt);
+ SOCKBUF_UNLOCK(&so->so_rcv);
}
- /*
- * Start pulling off mbufs from cd->mpending
- * until we either have a complete record or
- * we run out of data. We use m_split to pull
- * data - it will pull as much as possible and
- * split the last mbuf if necessary.
- */
- while (cd->mpending && cd->resid) {
- m = cd->mpending;
- if (cd->mpending->m_next
- || cd->mpending->m_len > cd->resid)
- cd->mpending = m_split(cd->mpending,
- cd->resid, M_WAITOK);
- else
- cd->mpending = NULL;
- if (cd->mreq)
- m_last(cd->mreq)->m_next = m;
- else
- cd->mreq = m;
- while (m) {
- cd->resid -= m->m_len;
- m = m->m_next;
- }
- }
+ sx_xunlock(&xprt->xp_lock);
- /*
- * If cd->resid is zero now, we have managed to
- * receive a record fragment from the stream. Check
- * for the end-of-record mark to see if we need more.
- */
- if (cd->resid == 0) {
- if (!cd->eor)
- continue;
-
- /*
- * Success - we have a complete record in
- * cd->mreq.
- */
- xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
- cd->mreq = NULL;
- sx_xunlock(&xprt->xp_lock);
-
- if (! xdr_callmsg(&xdrs, msg)) {
- XDR_DESTROY(&xdrs);
- return (FALSE);
- }
-
- *addrp = NULL;
- *mp = xdrmbuf_getall(&xdrs);
+ if (! xdr_callmsg(&xdrs, msg)) {
XDR_DESTROY(&xdrs);
-
- return (TRUE);
+ return (FALSE);
}
+
+ *addrp = NULL;
+ *mp = xdrmbuf_getall(&xdrs);
+ XDR_DESTROY(&xdrs);
+
+ return (TRUE);
}
- readmore:
/*
* The socket upcall calls xprt_active() which will eventually
* cause the server to call us here. We attempt to
@@ -697,8 +686,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
uio.uio_td = curthread;
m = NULL;
rcvflag = MSG_DONTWAIT;
- error = soreceive(xprt->xp_socket, NULL, &uio, &m, NULL,
- &rcvflag);
+ error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
if (error == EWOULDBLOCK) {
/*
@@ -706,25 +694,23 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
* taking the lock to protect us in the case
* where a new packet arrives on the socket
* after our call to soreceive fails with
- * EWOULDBLOCK. The pool lock protects us from
- * racing the upcall after our soreadable()
- * call returns false.
+ * EWOULDBLOCK.
*/
- mtx_lock(&xprt->xp_pool->sp_lock);
- if (!soreadable(xprt->xp_socket))
- xprt_inactive_locked(xprt);
- mtx_unlock(&xprt->xp_pool->sp_lock);
+ SOCKBUF_LOCK(&so->so_rcv);
+ if (!soreadable(so))
+ xprt_inactive(xprt);
+ SOCKBUF_UNLOCK(&so->so_rcv);
sx_xunlock(&xprt->xp_lock);
return (FALSE);
}
if (error) {
- SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
+ SOCKBUF_LOCK(&so->so_rcv);
if (xprt->xp_upcallset) {
xprt->xp_upcallset = 0;
- soupcall_clear(xprt->xp_socket, SO_RCV);
+ soupcall_clear(so, SO_RCV);
}
- SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
+ SOCKBUF_UNLOCK(&so->so_rcv);
xprt_inactive(xprt);
cd->strm_stat = XPRT_DIED;
sx_xunlock(&xprt->xp_lock);
@@ -908,7 +894,8 @@ svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
{
SVCXPRT *xprt = (SVCXPRT *) arg;
- xprt_active(xprt);
+ if (soreadable(xprt->xp_socket))
+ xprt_active(xprt);
return (SU_OK);
}
OpenPOWER on IntegriCloud