diff options
Diffstat (limited to 'sys/rpc')
-rw-r--r-- | sys/rpc/svc_vc.c | 243 |
1 files changed, 115 insertions, 128 deletions
diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c index 8298c95..0adb3fd 100644 --- a/sys/rpc/svc_vc.c +++ b/sys/rpc/svc_vc.c @@ -381,15 +381,11 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg, * We must re-test for new connections after taking * the lock to protect us in the case where a new * connection arrives after our call to accept fails - * with EWOULDBLOCK. The pool lock protects us from - * racing the upcall after our TAILQ_EMPTY() call - * returns false. + * with EWOULDBLOCK. */ ACCEPT_LOCK(); - mtx_lock(&xprt->xp_pool->sp_lock); if (TAILQ_EMPTY(&xprt->xp_socket->so_comp)) - xprt_inactive_locked(xprt); - mtx_unlock(&xprt->xp_pool->sp_lock); + xprt_inactive(xprt); ACCEPT_UNLOCK(); sx_xunlock(&xprt->xp_lock); return (FALSE); @@ -526,35 +522,14 @@ static enum xprt_stat svc_vc_stat(SVCXPRT *xprt) { struct cf_conn *cd; - struct mbuf *m; - size_t n; cd = (struct cf_conn *)(xprt->xp_p1); if (cd->strm_stat == XPRT_DIED) return (XPRT_DIED); - /* - * Return XPRT_MOREREQS if we have buffered data and we are - * mid-record or if we have enough data for a record - * marker. Since this is only a hint, we read mpending and - * resid outside the lock. We do need to take the lock if we - * have to traverse the mbuf chain. - */ - if (cd->mpending) { - if (cd->resid) - return (XPRT_MOREREQS); - n = 0; - sx_xlock(&xprt->xp_lock); - m = cd->mpending; - while (m && n < sizeof(uint32_t)) { - n += m->m_len; - m = m->m_next; - } - sx_xunlock(&xprt->xp_lock); - if (n >= sizeof(uint32_t)) - return (XPRT_MOREREQS); - } + if (cd->mreq != NULL && cd->resid == 0 && cd->eor) + return (XPRT_MOREREQS); if (soreadable(xprt->xp_socket)) return (XPRT_MOREREQS); @@ -575,6 +550,78 @@ svc_vc_backchannel_stat(SVCXPRT *xprt) return (XPRT_IDLE); } +/* + * If we have an mbuf chain in cd->mpending, try to parse a record from it, + * leaving the result in cd->mreq. If we don't have a complete record, leave + * the partial result in cd->mreq and try to read more from the socket. + */ +static void +svc_vc_process_pending(SVCXPRT *xprt) +{ + struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; + struct socket *so = xprt->xp_socket; + struct mbuf *m; + + /* + * If cd->resid is non-zero, we have part of the + * record already, otherwise we are expecting a record + * marker. + */ + if (!cd->resid && cd->mpending) { + /* + * See if there is enough data buffered to + * make up a record marker. Make sure we can + * handle the case where the record marker is + * split across more than one mbuf. + */ + size_t n = 0; + uint32_t header; + + m = cd->mpending; + while (n < sizeof(uint32_t) && m) { + n += m->m_len; + m = m->m_next; + } + if (n < sizeof(uint32_t)) { + so->so_rcv.sb_lowat = sizeof(uint32_t) - n; + return; + } + m_copydata(cd->mpending, 0, sizeof(header), + (char *)&header); + header = ntohl(header); + cd->eor = (header & 0x80000000) != 0; + cd->resid = header & 0x7fffffff; + m_adj(cd->mpending, sizeof(uint32_t)); + } + + /* + * Start pulling off mbufs from cd->mpending + * until we either have a complete record or + * we run out of data. We use m_split to pull + * data - it will pull as much as possible and + * split the last mbuf if necessary. + */ + while (cd->mpending && cd->resid) { + m = cd->mpending; + if (cd->mpending->m_next + || cd->mpending->m_len > cd->resid) + cd->mpending = m_split(cd->mpending, + cd->resid, M_WAITOK); + else + cd->mpending = NULL; + if (cd->mreq) + m_last(cd->mreq)->m_next = m; + else + cd->mreq = m; + while (m) { + cd->resid -= m->m_len; + m = m->m_next; + } + } + + so->so_rcv.sb_lowat = imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2)); +} + static bool_t svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, struct sockaddr **addrp, struct mbuf **mp) @@ -582,6 +629,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; struct uio uio; struct mbuf *m; + struct socket* so = xprt->xp_socket; XDR xdrs; int error, rcvflag; @@ -592,99 +640,40 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, sx_xlock(&xprt->xp_lock); for (;;) { - /* - * If we have an mbuf chain in cd->mpending, try to parse a - * record from it, leaving the result in cd->mreq. If we don't - * have a complete record, leave the partial result in - * cd->mreq and try to read more from the socket. - */ - if (cd->mpending) { - /* - * If cd->resid is non-zero, we have part of the - * record already, otherwise we are expecting a record - * marker. - */ - if (!cd->resid) { - /* - * See if there is enough data buffered to - * make up a record marker. Make sure we can - * handle the case where the record marker is - * split across more than one mbuf. - */ - size_t n = 0; - uint32_t header; - - m = cd->mpending; - while (n < sizeof(uint32_t) && m) { - n += m->m_len; - m = m->m_next; - } - if (n < sizeof(uint32_t)) - goto readmore; - m_copydata(cd->mpending, 0, sizeof(header), - (char *)&header); - header = ntohl(header); - cd->eor = (header & 0x80000000) != 0; - cd->resid = header & 0x7fffffff; - m_adj(cd->mpending, sizeof(uint32_t)); + /* If we have no request ready, check pending queue. */ + while (cd->mpending && + (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) + svc_vc_process_pending(xprt); + + /* Process and return complete request in cd->mreq. */ + if (cd->mreq != NULL && cd->resid == 0 && cd->eor) { + + xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); + cd->mreq = NULL; + + /* Check for next request in a pending queue. */ + svc_vc_process_pending(xprt); + if (cd->mreq == NULL || cd->resid != 0) { + SOCKBUF_LOCK(&so->so_rcv); + if (!soreadable(so)) + xprt_inactive(xprt); + SOCKBUF_UNLOCK(&so->so_rcv); } - /* - * Start pulling off mbufs from cd->mpending - * until we either have a complete record or - * we run out of data. We use m_split to pull - * data - it will pull as much as possible and - * split the last mbuf if necessary. - */ - while (cd->mpending && cd->resid) { - m = cd->mpending; - if (cd->mpending->m_next - || cd->mpending->m_len > cd->resid) - cd->mpending = m_split(cd->mpending, - cd->resid, M_WAITOK); - else - cd->mpending = NULL; - if (cd->mreq) - m_last(cd->mreq)->m_next = m; - else - cd->mreq = m; - while (m) { - cd->resid -= m->m_len; - m = m->m_next; - } - } + sx_xunlock(&xprt->xp_lock); - /* - * If cd->resid is zero now, we have managed to - * receive a record fragment from the stream. Check - * for the end-of-record mark to see if we need more. - */ - if (cd->resid == 0) { - if (!cd->eor) - continue; - - /* - * Success - we have a complete record in - * cd->mreq. - */ - xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); - cd->mreq = NULL; - sx_xunlock(&xprt->xp_lock); - - if (! xdr_callmsg(&xdrs, msg)) { - XDR_DESTROY(&xdrs); - return (FALSE); - } - - *addrp = NULL; - *mp = xdrmbuf_getall(&xdrs); + if (! xdr_callmsg(&xdrs, msg)) { XDR_DESTROY(&xdrs); - - return (TRUE); + return (FALSE); } + + *addrp = NULL; + *mp = xdrmbuf_getall(&xdrs); + XDR_DESTROY(&xdrs); + + return (TRUE); } - readmore: /* * The socket upcall calls xprt_active() which will eventually * cause the server to call us here. We attempt to @@ -697,8 +686,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, uio.uio_td = curthread; m = NULL; rcvflag = MSG_DONTWAIT; - error = soreceive(xprt->xp_socket, NULL, &uio, &m, NULL, - &rcvflag); + error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); if (error == EWOULDBLOCK) { /* @@ -706,25 +694,23 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, * taking the lock to protect us in the case * where a new packet arrives on the socket * after our call to soreceive fails with - * EWOULDBLOCK. The pool lock protects us from - * racing the upcall after our soreadable() - * call returns false. + * EWOULDBLOCK. */ - mtx_lock(&xprt->xp_pool->sp_lock); - if (!soreadable(xprt->xp_socket)) - xprt_inactive_locked(xprt); - mtx_unlock(&xprt->xp_pool->sp_lock); + SOCKBUF_LOCK(&so->so_rcv); + if (!soreadable(so)) + xprt_inactive(xprt); + SOCKBUF_UNLOCK(&so->so_rcv); sx_xunlock(&xprt->xp_lock); return (FALSE); } if (error) { - SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); + SOCKBUF_LOCK(&so->so_rcv); if (xprt->xp_upcallset) { xprt->xp_upcallset = 0; - soupcall_clear(xprt->xp_socket, SO_RCV); + soupcall_clear(so, SO_RCV); } - SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); + SOCKBUF_UNLOCK(&so->so_rcv); xprt_inactive(xprt); cd->strm_stat = XPRT_DIED; sx_xunlock(&xprt->xp_lock); @@ -908,7 +894,8 @@ svc_vc_soupcall(struct socket *so, void *arg, int waitflag) { SVCXPRT *xprt = (SVCXPRT *) arg; - xprt_active(xprt); + if (soreadable(xprt->xp_socket)) + xprt_active(xprt); return (SU_OK); } |