diff options
author | mav <mav@FreeBSD.org> | 2013-12-19 21:31:28 +0000 |
---|---|---|
committer | mav <mav@FreeBSD.org> | 2013-12-19 21:31:28 +0000 |
commit | 17e956838c06da00cf97085488d43b4f72b9b34d (patch) | |
tree | f0f7499bfbce904cd24014b8181f12114c92bae3 /sys/rpc | |
parent | d687ec321f16395e1d889bce6214a49e676f630a (diff) | |
download | FreeBSD-src-17e956838c06da00cf97085488d43b4f72b9b34d.zip FreeBSD-src-17e956838c06da00cf97085488d43b4f72b9b34d.tar.gz |
Rework flow control for connection-oriented (TCP) RPC server.
When processing receive buffer, write the amount of data, expected
in present request record, into socket's so_rcv.sb_lowat to make stack
aware about our needs. When processing following upcalls, ignore them
until socket collect enough data to be read and processed in one turn.
This change reduces number of context switches and other operations
in RPC stack during large NFS writes (especially via non-Jumbo networks)
by order of magnitude.
After precessing current packet, take another look into the pending
buffer to find out whether the next packet had been already received.
If not, deactivate this port right there without making RPC code to
push this port to another thread just to find that there is nothing.
If the next packet is received partially, also deactivate the port, but
also update socket's so_rcv.sb_lowat to not be woken up prematurely.
This change additionally reduces number of context switches per NFS
request about in half.
Diffstat (limited to 'sys/rpc')
-rw-r--r-- | sys/rpc/svc_vc.c | 243 |
1 files changed, 115 insertions, 128 deletions
diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c index 8298c95..0adb3fd 100644 --- a/sys/rpc/svc_vc.c +++ b/sys/rpc/svc_vc.c @@ -381,15 +381,11 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg, * We must re-test for new connections after taking * the lock to protect us in the case where a new * connection arrives after our call to accept fails - * with EWOULDBLOCK. The pool lock protects us from - * racing the upcall after our TAILQ_EMPTY() call - * returns false. + * with EWOULDBLOCK. */ ACCEPT_LOCK(); - mtx_lock(&xprt->xp_pool->sp_lock); if (TAILQ_EMPTY(&xprt->xp_socket->so_comp)) - xprt_inactive_locked(xprt); - mtx_unlock(&xprt->xp_pool->sp_lock); + xprt_inactive(xprt); ACCEPT_UNLOCK(); sx_xunlock(&xprt->xp_lock); return (FALSE); @@ -526,35 +522,14 @@ static enum xprt_stat svc_vc_stat(SVCXPRT *xprt) { struct cf_conn *cd; - struct mbuf *m; - size_t n; cd = (struct cf_conn *)(xprt->xp_p1); if (cd->strm_stat == XPRT_DIED) return (XPRT_DIED); - /* - * Return XPRT_MOREREQS if we have buffered data and we are - * mid-record or if we have enough data for a record - * marker. Since this is only a hint, we read mpending and - * resid outside the lock. We do need to take the lock if we - * have to traverse the mbuf chain. - */ - if (cd->mpending) { - if (cd->resid) - return (XPRT_MOREREQS); - n = 0; - sx_xlock(&xprt->xp_lock); - m = cd->mpending; - while (m && n < sizeof(uint32_t)) { - n += m->m_len; - m = m->m_next; - } - sx_xunlock(&xprt->xp_lock); - if (n >= sizeof(uint32_t)) - return (XPRT_MOREREQS); - } + if (cd->mreq != NULL && cd->resid == 0 && cd->eor) + return (XPRT_MOREREQS); if (soreadable(xprt->xp_socket)) return (XPRT_MOREREQS); @@ -575,6 +550,78 @@ svc_vc_backchannel_stat(SVCXPRT *xprt) return (XPRT_IDLE); } +/* + * If we have an mbuf chain in cd->mpending, try to parse a record from it, + * leaving the result in cd->mreq. If we don't have a complete record, leave + * the partial result in cd->mreq and try to read more from the socket. + */ +static void +svc_vc_process_pending(SVCXPRT *xprt) +{ + struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; + struct socket *so = xprt->xp_socket; + struct mbuf *m; + + /* + * If cd->resid is non-zero, we have part of the + * record already, otherwise we are expecting a record + * marker. + */ + if (!cd->resid && cd->mpending) { + /* + * See if there is enough data buffered to + * make up a record marker. Make sure we can + * handle the case where the record marker is + * split across more than one mbuf. + */ + size_t n = 0; + uint32_t header; + + m = cd->mpending; + while (n < sizeof(uint32_t) && m) { + n += m->m_len; + m = m->m_next; + } + if (n < sizeof(uint32_t)) { + so->so_rcv.sb_lowat = sizeof(uint32_t) - n; + return; + } + m_copydata(cd->mpending, 0, sizeof(header), + (char *)&header); + header = ntohl(header); + cd->eor = (header & 0x80000000) != 0; + cd->resid = header & 0x7fffffff; + m_adj(cd->mpending, sizeof(uint32_t)); + } + + /* + * Start pulling off mbufs from cd->mpending + * until we either have a complete record or + * we run out of data. We use m_split to pull + * data - it will pull as much as possible and + * split the last mbuf if necessary. + */ + while (cd->mpending && cd->resid) { + m = cd->mpending; + if (cd->mpending->m_next + || cd->mpending->m_len > cd->resid) + cd->mpending = m_split(cd->mpending, + cd->resid, M_WAITOK); + else + cd->mpending = NULL; + if (cd->mreq) + m_last(cd->mreq)->m_next = m; + else + cd->mreq = m; + while (m) { + cd->resid -= m->m_len; + m = m->m_next; + } + } + + so->so_rcv.sb_lowat = imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2)); +} + static bool_t svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, struct sockaddr **addrp, struct mbuf **mp) @@ -582,6 +629,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; struct uio uio; struct mbuf *m; + struct socket* so = xprt->xp_socket; XDR xdrs; int error, rcvflag; @@ -592,99 +640,40 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, sx_xlock(&xprt->xp_lock); for (;;) { - /* - * If we have an mbuf chain in cd->mpending, try to parse a - * record from it, leaving the result in cd->mreq. If we don't - * have a complete record, leave the partial result in - * cd->mreq and try to read more from the socket. - */ - if (cd->mpending) { - /* - * If cd->resid is non-zero, we have part of the - * record already, otherwise we are expecting a record - * marker. - */ - if (!cd->resid) { - /* - * See if there is enough data buffered to - * make up a record marker. Make sure we can - * handle the case where the record marker is - * split across more than one mbuf. - */ - size_t n = 0; - uint32_t header; - - m = cd->mpending; - while (n < sizeof(uint32_t) && m) { - n += m->m_len; - m = m->m_next; - } - if (n < sizeof(uint32_t)) - goto readmore; - m_copydata(cd->mpending, 0, sizeof(header), - (char *)&header); - header = ntohl(header); - cd->eor = (header & 0x80000000) != 0; - cd->resid = header & 0x7fffffff; - m_adj(cd->mpending, sizeof(uint32_t)); + /* If we have no request ready, check pending queue. */ + while (cd->mpending && + (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) + svc_vc_process_pending(xprt); + + /* Process and return complete request in cd->mreq. */ + if (cd->mreq != NULL && cd->resid == 0 && cd->eor) { + + xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); + cd->mreq = NULL; + + /* Check for next request in a pending queue. */ + svc_vc_process_pending(xprt); + if (cd->mreq == NULL || cd->resid != 0) { + SOCKBUF_LOCK(&so->so_rcv); + if (!soreadable(so)) + xprt_inactive(xprt); + SOCKBUF_UNLOCK(&so->so_rcv); } - /* - * Start pulling off mbufs from cd->mpending - * until we either have a complete record or - * we run out of data. We use m_split to pull - * data - it will pull as much as possible and - * split the last mbuf if necessary. - */ - while (cd->mpending && cd->resid) { - m = cd->mpending; - if (cd->mpending->m_next - || cd->mpending->m_len > cd->resid) - cd->mpending = m_split(cd->mpending, - cd->resid, M_WAITOK); - else - cd->mpending = NULL; - if (cd->mreq) - m_last(cd->mreq)->m_next = m; - else - cd->mreq = m; - while (m) { - cd->resid -= m->m_len; - m = m->m_next; - } - } + sx_xunlock(&xprt->xp_lock); - /* - * If cd->resid is zero now, we have managed to - * receive a record fragment from the stream. Check - * for the end-of-record mark to see if we need more. - */ - if (cd->resid == 0) { - if (!cd->eor) - continue; - - /* - * Success - we have a complete record in - * cd->mreq. - */ - xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); - cd->mreq = NULL; - sx_xunlock(&xprt->xp_lock); - - if (! xdr_callmsg(&xdrs, msg)) { - XDR_DESTROY(&xdrs); - return (FALSE); - } - - *addrp = NULL; - *mp = xdrmbuf_getall(&xdrs); + if (! xdr_callmsg(&xdrs, msg)) { XDR_DESTROY(&xdrs); - - return (TRUE); + return (FALSE); } + + *addrp = NULL; + *mp = xdrmbuf_getall(&xdrs); + XDR_DESTROY(&xdrs); + + return (TRUE); } - readmore: /* * The socket upcall calls xprt_active() which will eventually * cause the server to call us here. We attempt to @@ -697,8 +686,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, uio.uio_td = curthread; m = NULL; rcvflag = MSG_DONTWAIT; - error = soreceive(xprt->xp_socket, NULL, &uio, &m, NULL, - &rcvflag); + error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); if (error == EWOULDBLOCK) { /* @@ -706,25 +694,23 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, * taking the lock to protect us in the case * where a new packet arrives on the socket * after our call to soreceive fails with - * EWOULDBLOCK. The pool lock protects us from - * racing the upcall after our soreadable() - * call returns false. + * EWOULDBLOCK. */ - mtx_lock(&xprt->xp_pool->sp_lock); - if (!soreadable(xprt->xp_socket)) - xprt_inactive_locked(xprt); - mtx_unlock(&xprt->xp_pool->sp_lock); + SOCKBUF_LOCK(&so->so_rcv); + if (!soreadable(so)) + xprt_inactive(xprt); + SOCKBUF_UNLOCK(&so->so_rcv); sx_xunlock(&xprt->xp_lock); return (FALSE); } if (error) { - SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); + SOCKBUF_LOCK(&so->so_rcv); if (xprt->xp_upcallset) { xprt->xp_upcallset = 0; - soupcall_clear(xprt->xp_socket, SO_RCV); + soupcall_clear(so, SO_RCV); } - SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); + SOCKBUF_UNLOCK(&so->so_rcv); xprt_inactive(xprt); cd->strm_stat = XPRT_DIED; sx_xunlock(&xprt->xp_lock); @@ -908,7 +894,8 @@ svc_vc_soupcall(struct socket *so, void *arg, int waitflag) { SVCXPRT *xprt = (SVCXPRT *) arg; - xprt_active(xprt); + if (soreadable(xprt->xp_socket)) + xprt_active(xprt); return (SU_OK); } |