diff options
Diffstat (limited to 'sys/rpc/clnt_dg.c')
-rw-r--r-- | sys/rpc/clnt_dg.c | 218 |
1 files changed, 146 insertions, 72 deletions
diff --git a/sys/rpc/clnt_dg.c b/sys/rpc/clnt_dg.c index c66ac50..f14e1d6 100644 --- a/sys/rpc/clnt_dg.c +++ b/sys/rpc/clnt_dg.c @@ -45,6 +45,7 @@ __FBSDID("$FreeBSD$"); #include <sys/param.h> #include <sys/systm.h> +#include <sys/kernel.h> #include <sys/lock.h> #include <sys/malloc.h> #include <sys/mbuf.h> @@ -70,8 +71,8 @@ __FBSDID("$FreeBSD$"); #endif static bool_t time_not_ok(struct timeval *); -static enum clnt_stat clnt_dg_call(CLIENT *, rpcproc_t, xdrproc_t, void *, - xdrproc_t, void *, struct timeval); +static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, + rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval); static void clnt_dg_geterr(CLIENT *, struct rpc_err *); static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); static void clnt_dg_abort(CLIENT *); @@ -91,10 +92,13 @@ static struct clnt_ops clnt_dg_ops = { static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; /* - * A pending RPC request which awaits a reply. + * A pending RPC request which awaits a reply. Requests which have + * received their reply will have cr_xid set to zero and cr_mrep to + * the mbuf chain of the reply. */ struct cu_request { TAILQ_ENTRY(cu_request) cr_link; + CLIENT *cr_client; /* owner */ uint32_t cr_xid; /* XID of request */ struct mbuf *cr_mrep; /* reply received by upcall */ int cr_error; /* any error from upcall */ @@ -123,6 +127,8 @@ struct cu_socket { * Private data kept per client handle */ struct cu_data { + int cu_threads; /* # threads in clnt_vc_call */ + bool_t cu_closing; /* TRUE if we are destroying */ struct socket *cu_socket; /* connection socket */ bool_t cu_closeit; /* opened by library */ struct sockaddr_storage cu_raddr; /* remote address */ @@ -203,10 +209,12 @@ clnt_dg_create( sendsz = ((sendsz + 3) / 4) * 4; recvsz = ((recvsz + 3) / 4) * 4; cu = mem_alloc(sizeof (*cu)); + cu->cu_threads = 0; + cu->cu_closing = FALSE; (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); cu->cu_rlen = svcaddr->sa_len; /* Other values can also be set through clnt_control() */ - cu->cu_wait.tv_sec = 15; /* heuristically chosen */ + cu->cu_wait.tv_sec = 3; /* heuristically chosen */ cu->cu_wait.tv_usec = 0; cu->cu_total.tv_sec = -1; cu->cu_total.tv_usec = -1; @@ -237,6 +245,7 @@ clnt_dg_create( */ cu->cu_closeit = FALSE; cu->cu_socket = so; + soreserve(so, 256*1024, 256*1024); SOCKBUF_LOCK(&so->so_rcv); recheck_socket: @@ -274,6 +283,7 @@ recheck_socket: } SOCKBUF_UNLOCK(&so->so_rcv); + cl->cl_refs = 1; cl->cl_ops = &clnt_dg_ops; cl->cl_private = (caddr_t)(void *)cu; cl->cl_auth = authnone_create(); @@ -291,7 +301,8 @@ err2: static enum clnt_stat clnt_dg_call( - CLIENT *cl, /* client handle */ + CLIENT *cl, /* client handle */ + struct rpc_callextra *ext, /* call metadata */ rpcproc_t proc, /* procedure number */ xdrproc_t xargs, /* xdr routine for args */ void *argsp, /* pointer to args */ @@ -301,30 +312,52 @@ clnt_dg_call( { struct cu_data *cu = (struct cu_data *)cl->cl_private; struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; + AUTH *auth; XDR xdrs; struct rpc_msg reply_msg; bool_t ok; + int retrans; /* number of re-transmits so far */ int nrefreshes = 2; /* number of times to refresh cred */ - struct timeval timeout; - struct timeval retransmit_time; - struct timeval next_sendtime, starttime, time_waited, tv; + struct timeval *tvp; + int timeout; + int retransmit_time; + int next_sendtime, starttime, time_waited, tv; struct sockaddr *sa; socklen_t salen; uint32_t xid; struct mbuf *mreq = NULL; - struct cu_request cr; + struct cu_request *cr; int error; + cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); + mtx_lock(&cs->cs_lock); - cr.cr_mrep = NULL; - cr.cr_error = 0; + if (cu->cu_closing) { + mtx_unlock(&cs->cs_lock); + free(cr, M_RPC); + return (RPC_CANTSEND); + } + cu->cu_threads++; + + if (ext) + auth = ext->rc_auth; + else + auth = cl->cl_auth; + + cr->cr_client = cl; + cr->cr_mrep = NULL; + cr->cr_error = 0; if (cu->cu_total.tv_usec == -1) { - timeout = utimeout; /* use supplied timeout */ + tvp = &utimeout; /* use supplied timeout */ } else { - timeout = cu->cu_total; /* use default timeout */ + tvp = &cu->cu_total; /* use default timeout */ } + if (tvp->tv_sec || tvp->tv_usec) + timeout = tvtohz(tvp); + else + timeout = 0; if (cu->cu_connect && !cu->cu_connected) { mtx_unlock(&cs->cs_lock); @@ -345,11 +378,11 @@ clnt_dg_call( sa = (struct sockaddr *)&cu->cu_raddr; salen = cu->cu_rlen; } - time_waited.tv_sec = 0; - time_waited.tv_usec = 0; - retransmit_time = next_sendtime = cu->cu_wait; + time_waited = 0; + retrans = 0; + retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); - getmicrotime(&starttime); + starttime = ticks; call_again: mtx_assert(&cs->cs_lock, MA_OWNED); @@ -376,7 +409,7 @@ send_again: goto get_reply; if ((! XDR_PUTINT32(&xdrs, &proc)) || - (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) || + (! AUTH_MARSHALL(auth, &xdrs)) || (! (*xargs)(&xdrs, argsp))) { cu->cu_error.re_status = RPC_CANTENCODEARGS; mtx_lock(&cs->cs_lock); @@ -384,9 +417,9 @@ send_again: } m_fixhdr(mreq); - cr.cr_xid = xid; + cr->cr_xid = xid; mtx_lock(&cs->cs_lock); - TAILQ_INSERT_TAIL(&cs->cs_pending, &cr, cr_link); + TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); mtx_unlock(&cs->cs_lock); /* @@ -406,8 +439,7 @@ send_again: mtx_lock(&cs->cs_lock); if (error) { - TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); - + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); cu->cu_error.re_errno = error; cu->cu_error.re_status = RPC_CANTSEND; goto out; @@ -415,24 +447,24 @@ send_again: /* * Check to see if we got an upcall while waiting for the - * lock. In both these cases, the request has been removed - * from cs->cs_pending. + * lock. */ - if (cr.cr_error) { - cu->cu_error.re_errno = cr.cr_error; + if (cr->cr_error) { + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); + cu->cu_error.re_errno = cr->cr_error; cu->cu_error.re_status = RPC_CANTRECV; goto out; } - if (cr.cr_mrep) { + if (cr->cr_mrep) { + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); goto got_reply; } /* * Hack to provide rpc-based message passing */ - if (timeout.tv_sec == 0 && timeout.tv_usec == 0) { - if (cr.cr_xid) - TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); + if (timeout == 0) { + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); cu->cu_error.re_status = RPC_TIMEDOUT; goto out; } @@ -440,17 +472,23 @@ send_again: get_reply: for (;;) { /* Decide how long to wait. */ - if (timevalcmp(&next_sendtime, &timeout, <)) { + if (next_sendtime < timeout) tv = next_sendtime; - } else { + else tv = timeout; + tv -= time_waited; + + if (tv > 0) { + if (cu->cu_closing) + error = 0; + else + error = msleep(cr, &cs->cs_lock, + cu->cu_waitflag, cu->cu_waitchan, tv); + } else { + error = EWOULDBLOCK; } - timevalsub(&tv, &time_waited); - if (tv.tv_sec < 0 || tv.tv_usec < 0) - tv.tv_sec = tv.tv_usec = 0; - error = msleep(&cr, &cs->cs_lock, cu->cu_waitflag, - cu->cu_waitchan, tvtohz(&tv)); + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); if (!error) { /* @@ -458,8 +496,8 @@ get_reply: * upcall had a receive error, report that, * otherwise we have a reply. */ - if (cr.cr_error) { - cu->cu_error.re_errno = cr.cr_error; + if (cr->cr_error) { + cu->cu_error.re_errno = cr->cr_error; cu->cu_error.re_status = RPC_CANTRECV; goto out; } @@ -472,8 +510,6 @@ get_reply: * re-send the request. */ if (error != EWOULDBLOCK) { - if (cr.cr_xid) - TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); cu->cu_error.re_errno = error; if (error == EINTR) cu->cu_error.re_status = RPC_INTR; @@ -482,29 +518,40 @@ get_reply: goto out; } - getmicrotime(&tv); - time_waited = tv; - timevalsub(&time_waited, &starttime); + time_waited = ticks - starttime; /* Check for timeout. */ - if (timevalcmp(&time_waited, &timeout, >)) { - if (cr.cr_xid) - TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); + if (time_waited > timeout) { cu->cu_error.re_errno = EWOULDBLOCK; cu->cu_error.re_status = RPC_TIMEDOUT; goto out; } /* Retransmit if necessary. */ - if (timevalcmp(&time_waited, &next_sendtime, >)) { - if (cr.cr_xid) - TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); + if (time_waited >= next_sendtime) { + if (ext && ext->rc_feedback) { + mtx_unlock(&cs->cs_lock); + if (retrans == 0) + ext->rc_feedback(FEEDBACK_REXMIT1, + proc, ext->rc_feedback_arg); + else + ext->rc_feedback(FEEDBACK_REXMIT2, + proc, ext->rc_feedback_arg); + mtx_lock(&cs->cs_lock); + } + if (cu->cu_closing) { + cu->cu_error.re_errno = ESHUTDOWN; + cu->cu_error.re_status = RPC_CANTRECV; + goto out; + } + retrans++; /* update retransmit_time */ - if (retransmit_time.tv_sec < RPC_MAX_BACKOFF) - timevaladd(&retransmit_time, &retransmit_time); - timevaladd(&next_sendtime, &retransmit_time); + if (retransmit_time < RPC_MAX_BACKOFF * hz) + retransmit_time = 2 * retransmit_time; + next_sendtime += retransmit_time; goto send_again; } + TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); } got_reply: @@ -514,10 +561,13 @@ got_reply: */ mtx_unlock(&cs->cs_lock); - xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE); + if (ext && ext->rc_feedback) + ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); + + xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); ok = xdr_replymsg(&xdrs, &reply_msg); XDR_DESTROY(&xdrs); - cr.cr_mrep = NULL; + cr->cr_mrep = NULL; mtx_lock(&cs->cs_lock); @@ -562,10 +612,17 @@ out: if (mreq) m_freem(mreq); - if (cr.cr_mrep) - m_freem(cr.cr_mrep); + if (cr->cr_mrep) + m_freem(cr->cr_mrep); + cu->cu_threads--; + if (cu->cu_closing) + wakeup(cu); + mtx_unlock(&cs->cs_lock); + + free(cr, M_RPC); + return (cu->cu_error.re_status); } @@ -732,30 +789,44 @@ clnt_dg_destroy(CLIENT *cl) { struct cu_data *cu = (struct cu_data *)cl->cl_private; struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; + struct cu_request *cr; struct socket *so = NULL; bool_t lastsocketref; - SOCKBUF_LOCK(&cu->cu_socket->so_rcv); - mtx_lock(&cs->cs_lock); + + /* + * Abort any pending requests and wait until everyone + * has finished with clnt_vc_call. + */ + cu->cu_closing = TRUE; + TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { + if (cr->cr_client == cl) { + cr->cr_xid = 0; + cr->cr_error = ESHUTDOWN; + wakeup(cr); + } + } + + while (cu->cu_threads) + msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); + cs->cs_refs--; if (cs->cs_refs == 0) { + mtx_destroy(&cs->cs_lock); + SOCKBUF_LOCK(&cu->cu_socket->so_rcv); cu->cu_socket->so_upcallarg = NULL; cu->cu_socket->so_upcall = NULL; cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL; - mtx_destroy(&cs->cs_lock); SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); mem_free(cs, sizeof(*cs)); lastsocketref = TRUE; } else { mtx_unlock(&cs->cs_lock); - SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); lastsocketref = FALSE; } - if (cu->cu_closeit) { - KASSERT(lastsocketref, ("clnt_dg_destroy(): closing a socket " - "shared with other clients")); + if (cu->cu_closeit && lastsocketref) { so = cu->cu_socket; cu->cu_socket = NULL; } @@ -812,10 +883,10 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) if (error) { mtx_lock(&cs->cs_lock); TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { + cr->cr_xid = 0; cr->cr_error = error; wakeup(cr); } - TAILQ_INIT(&cs->cs_pending); mtx_unlock(&cs->cs_lock); break; } @@ -825,7 +896,11 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) */ m = m_pullup(m, sizeof(xid)); if (!m) - break; + /* + * Should never happen. + */ + continue; + xid = ntohl(*mtod(m, uint32_t *)); /* @@ -836,14 +911,13 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { if (cr->cr_xid == xid) { /* - * This one matches. We snip it out of - * the pending list and leave the + * This one matches. We leave the * reply mbuf in cr->cr_mrep. Set the - * XID to zero so that clnt_dg_call - * can know not to repeat the - * TAILQ_REMOVE. + * XID to zero so that we will ignore + * any duplicated replies that arrive + * before clnt_dg_call removes it from + * the queue. */ - TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); cr->cr_xid = 0; cr->cr_mrep = m; cr->cr_error = 0; |