diff options
Diffstat (limited to 'sys/rpc/clnt_dg.c')
-rw-r--r-- | sys/rpc/clnt_dg.c | 310 |
1 files changed, 241 insertions, 69 deletions
diff --git a/sys/rpc/clnt_dg.c b/sys/rpc/clnt_dg.c index f14e1d6..e6d101d 100644 --- a/sys/rpc/clnt_dg.c +++ b/sys/rpc/clnt_dg.c @@ -72,11 +72,12 @@ __FBSDID("$FreeBSD$"); static bool_t time_not_ok(struct timeval *); static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, - rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval); + rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); static void clnt_dg_geterr(CLIENT *, struct rpc_err *); static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); static void clnt_dg_abort(CLIENT *); static bool_t clnt_dg_control(CLIENT *, u_int, void *); +static void clnt_dg_close(CLIENT *); static void clnt_dg_destroy(CLIENT *); static void clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); @@ -85,6 +86,7 @@ static struct clnt_ops clnt_dg_ops = { .cl_abort = clnt_dg_abort, .cl_geterr = clnt_dg_geterr, .cl_freeres = clnt_dg_freeres, + .cl_close = clnt_dg_close, .cl_destroy = clnt_dg_destroy, .cl_control = clnt_dg_control }; @@ -102,6 +104,7 @@ struct cu_request { uint32_t cr_xid; /* XID of request */ struct mbuf *cr_mrep; /* reply received by upcall */ int cr_error; /* any error from upcall */ + char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ }; TAILQ_HEAD(cu_request_list, cu_request); @@ -120,7 +123,6 @@ struct cu_socket { struct mtx cs_lock; int cs_refs; /* Count of clients */ struct cu_request_list cs_pending; /* Requests awaiting replies */ - }; /* @@ -128,7 +130,8 @@ struct cu_socket { */ struct cu_data { int cu_threads; /* # threads in clnt_vc_call */ - bool_t cu_closing; /* TRUE if we are destroying */ + bool_t cu_closing; /* TRUE if we are closing */ + bool_t cu_closed; /* TRUE if we are closed */ struct socket *cu_socket; /* connection socket */ bool_t cu_closeit; /* opened by library */ struct sockaddr_storage cu_raddr; /* remote address */ @@ -146,8 +149,14 @@ struct cu_data { int cu_connected; /* Have done connect(). */ const char *cu_waitchan; int cu_waitflag; + int cu_cwnd; /* congestion window */ + int cu_sent; /* number of in-flight RPCs */ + bool_t cu_cwnd_wait; }; +#define CWNDSCALE 256 +#define MAXCWND (32 * CWNDSCALE) + /* * Connection less client creation returns with client handle parameters. * Default options are set, which the user can change using clnt_control(). @@ -211,6 +220,7 @@ clnt_dg_create( cu = mem_alloc(sizeof (*cu)); cu->cu_threads = 0; cu->cu_closing = FALSE; + cu->cu_closed = FALSE; (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); cu->cu_rlen = svcaddr->sa_len; /* Other values can also be set through clnt_control() */ @@ -225,6 +235,9 @@ clnt_dg_create( cu->cu_connected = FALSE; cu->cu_waitchan = "rpcrecv"; cu->cu_waitflag = 0; + cu->cu_cwnd = MAXCWND / 2; + cu->cu_sent = 0; + cu->cu_cwnd_wait = FALSE; (void) getmicrotime(&now); cu->cu_xid = __RPC_GETXID(&now); call_msg.rm_xid = cu->cu_xid; @@ -304,15 +317,16 @@ clnt_dg_call( CLIENT *cl, /* client handle */ struct rpc_callextra *ext, /* call metadata */ rpcproc_t proc, /* procedure number */ - xdrproc_t xargs, /* xdr routine for args */ - void *argsp, /* pointer to args */ - xdrproc_t xresults, /* xdr routine for results */ - void *resultsp, /* pointer to results */ + struct mbuf *args, /* pointer to args */ + struct mbuf **resultsp, /* pointer to results */ struct timeval utimeout) /* seconds to wait before giving up */ { struct cu_data *cu = (struct cu_data *)cl->cl_private; struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; + struct rpc_timers *rt; AUTH *auth; + struct rpc_err *errp; + enum clnt_stat stat; XDR xdrs; struct rpc_msg reply_msg; bool_t ok; @@ -321,11 +335,11 @@ clnt_dg_call( struct timeval *tvp; int timeout; int retransmit_time; - int next_sendtime, starttime, time_waited, tv; + int next_sendtime, starttime, rtt, time_waited, tv = 0; struct sockaddr *sa; socklen_t salen; - uint32_t xid; - struct mbuf *mreq = NULL; + uint32_t xid = 0; + struct mbuf *mreq = NULL, *results; struct cu_request *cr; int error; @@ -333,17 +347,20 @@ clnt_dg_call( mtx_lock(&cs->cs_lock); - if (cu->cu_closing) { + if (cu->cu_closing || cu->cu_closed) { mtx_unlock(&cs->cs_lock); free(cr, M_RPC); return (RPC_CANTSEND); } cu->cu_threads++; - if (ext) + if (ext) { auth = ext->rc_auth; - else + errp = &ext->rc_err; + } else { auth = cl->cl_auth; + errp = &cu->cu_error; + } cr->cr_client = cl; cr->cr_mrep = NULL; @@ -365,8 +382,8 @@ clnt_dg_call( (struct sockaddr *)&cu->cu_raddr, curthread); mtx_lock(&cs->cs_lock); if (error) { - cu->cu_error.re_errno = error; - cu->cu_error.re_status = RPC_CANTSEND; + errp->re_errno = error; + errp->re_status = stat = RPC_CANTSEND; goto out; } cu->cu_connected = 1; @@ -380,7 +397,15 @@ clnt_dg_call( } time_waited = 0; retrans = 0; - retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); + if (ext && ext->rc_timers) { + rt = ext->rc_timers; + if (!rt->rt_rtxcur) + rt->rt_rtxcur = tvtohz(&cu->cu_wait); + retransmit_time = next_sendtime = rt->rt_rtxcur; + } else { + rt = NULL; + retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); + } starttime = ticks; @@ -394,9 +419,9 @@ send_again: mtx_unlock(&cs->cs_lock); MGETHDR(mreq, M_WAIT, MT_DATA); - MCLGET(mreq, M_WAIT); - mreq->m_len = 0; - m_append(mreq, cu->cu_mcalllen, cu->cu_mcallc); + KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); + bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); + mreq->m_len = cu->cu_mcalllen; /* * The XID is the first thing in the request. @@ -405,20 +430,36 @@ send_again: xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); - if (cu->cu_async == TRUE && xargs == NULL) + if (cu->cu_async == TRUE && args == NULL) goto get_reply; if ((! XDR_PUTINT32(&xdrs, &proc)) || - (! AUTH_MARSHALL(auth, &xdrs)) || - (! (*xargs)(&xdrs, argsp))) { - cu->cu_error.re_status = RPC_CANTENCODEARGS; + (! AUTH_MARSHALL(auth, xid, &xdrs, + m_copym(args, 0, M_COPYALL, M_WAITOK)))) { + errp->re_status = stat = RPC_CANTENCODEARGS; mtx_lock(&cs->cs_lock); goto out; } - m_fixhdr(mreq); + mreq->m_pkthdr.len = m_length(mreq, NULL); cr->cr_xid = xid; mtx_lock(&cs->cs_lock); + + /* + * Try to get a place in the congestion window. + */ + while (cu->cu_sent >= cu->cu_cwnd) { + cu->cu_cwnd_wait = TRUE; + error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, + cu->cu_waitflag, "rpccwnd", 0); + if (error) { + errp->re_errno = error; + errp->re_status = stat = RPC_CANTSEND; + goto out; + } + } + cu->cu_sent += CWNDSCALE; + TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); mtx_unlock(&cs->cs_lock); @@ -433,15 +474,22 @@ send_again: * some clock time to spare while the packets are in flight. * (We assume that this is actually only executed once.) */ - reply_msg.acpted_rply.ar_verf = _null_auth; - reply_msg.acpted_rply.ar_results.where = resultsp; - reply_msg.acpted_rply.ar_results.proc = xresults; + reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; + reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; + reply_msg.acpted_rply.ar_verf.oa_length = 0; + reply_msg.acpted_rply.ar_results.where = NULL; + reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; mtx_lock(&cs->cs_lock); if (error) { TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); - cu->cu_error.re_errno = error; - cu->cu_error.re_status = RPC_CANTSEND; + errp->re_errno = error; + errp->re_status = stat = RPC_CANTSEND; + cu->cu_sent -= CWNDSCALE; + if (cu->cu_cwnd_wait) { + cu->cu_cwnd_wait = FALSE; + wakeup(&cu->cu_cwnd_wait); + } goto out; } @@ -451,12 +499,22 @@ send_again: */ if (cr->cr_error) { TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); - cu->cu_error.re_errno = cr->cr_error; - cu->cu_error.re_status = RPC_CANTRECV; + errp->re_errno = cr->cr_error; + errp->re_status = stat = RPC_CANTRECV; + cu->cu_sent -= CWNDSCALE; + if (cu->cu_cwnd_wait) { + cu->cu_cwnd_wait = FALSE; + wakeup(&cu->cu_cwnd_wait); + } goto out; } if (cr->cr_mrep) { TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); + cu->cu_sent -= CWNDSCALE; + if (cu->cu_cwnd_wait) { + cu->cu_cwnd_wait = FALSE; + wakeup(&cu->cu_cwnd_wait); + } goto got_reply; } @@ -465,7 +523,12 @@ send_again: */ if (timeout == 0) { TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); - cu->cu_error.re_status = RPC_TIMEDOUT; + errp->re_status = stat = RPC_TIMEDOUT; + cu->cu_sent -= CWNDSCALE; + if (cu->cu_cwnd_wait) { + cu->cu_cwnd_wait = FALSE; + wakeup(&cu->cu_cwnd_wait); + } goto out; } @@ -479,7 +542,7 @@ get_reply: tv -= time_waited; if (tv > 0) { - if (cu->cu_closing) + if (cu->cu_closing || cu->cu_closed) error = 0; else error = msleep(cr, &cs->cs_lock, @@ -489,6 +552,11 @@ get_reply: } TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); + cu->cu_sent -= CWNDSCALE; + if (cu->cu_cwnd_wait) { + cu->cu_cwnd_wait = FALSE; + wakeup(&cu->cu_cwnd_wait); + } if (!error) { /* @@ -497,10 +565,52 @@ get_reply: * otherwise we have a reply. */ if (cr->cr_error) { - cu->cu_error.re_errno = cr->cr_error; - cu->cu_error.re_status = RPC_CANTRECV; + errp->re_errno = cr->cr_error; + errp->re_status = stat = RPC_CANTRECV; goto out; } + + cu->cu_cwnd += (CWNDSCALE * CWNDSCALE + + cu->cu_cwnd / 2) / cu->cu_cwnd; + if (cu->cu_cwnd > MAXCWND) + cu->cu_cwnd = MAXCWND; + + if (rt) { + /* + * Add one to the time since a tick + * count of N means that the actual + * time taken was somewhere between N + * and N+1. + */ + rtt = ticks - starttime + 1; + + /* + * Update our estimate of the round + * trip time using roughly the + * algorithm described in RFC + * 2988. Given an RTT sample R: + * + * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| + * SRTT = (1-alpha) * SRTT + alpha * R + * + * where alpha = 0.125 and beta = 0.25. + * + * The initial retransmit timeout is + * SRTT + 4*RTTVAR and doubles on each + * retransmision. + */ + if (rt->rt_srtt == 0) { + rt->rt_srtt = rtt; + rt->rt_deviate = rtt / 2; + } else { + int32_t error = rtt - rt->rt_srtt; + rt->rt_srtt += error / 8; + error = abs(error) - rt->rt_deviate; + rt->rt_deviate += error / 4; + } + rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; + } + break; } @@ -510,11 +620,11 @@ get_reply: * re-send the request. */ if (error != EWOULDBLOCK) { - cu->cu_error.re_errno = error; + errp->re_errno = error; if (error == EINTR) - cu->cu_error.re_status = RPC_INTR; + errp->re_status = stat = RPC_INTR; else - cu->cu_error.re_status = RPC_CANTRECV; + errp->re_status = stat = RPC_CANTRECV; goto out; } @@ -522,13 +632,16 @@ get_reply: /* Check for timeout. */ if (time_waited > timeout) { - cu->cu_error.re_errno = EWOULDBLOCK; - cu->cu_error.re_status = RPC_TIMEDOUT; + errp->re_errno = EWOULDBLOCK; + errp->re_status = stat = RPC_TIMEDOUT; goto out; } /* Retransmit if necessary. */ if (time_waited >= next_sendtime) { + cu->cu_cwnd /= 2; + if (cu->cu_cwnd < CWNDSCALE) + cu->cu_cwnd = CWNDSCALE; if (ext && ext->rc_feedback) { mtx_unlock(&cs->cs_lock); if (retrans == 0) @@ -539,9 +652,9 @@ get_reply: proc, ext->rc_feedback_arg); mtx_lock(&cs->cs_lock); } - if (cu->cu_closing) { - cu->cu_error.re_errno = ESHUTDOWN; - cu->cu_error.re_status = RPC_CANTRECV; + if (cu->cu_closing || cu->cu_closed) { + errp->re_errno = ESHUTDOWN; + errp->re_status = stat = RPC_CANTRECV; goto out; } retrans++; @@ -566,47 +679,72 @@ got_reply: xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); ok = xdr_replymsg(&xdrs, &reply_msg); - XDR_DESTROY(&xdrs); cr->cr_mrep = NULL; - mtx_lock(&cs->cs_lock); - if (ok) { if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && - (reply_msg.acpted_rply.ar_stat == SUCCESS)) - cu->cu_error.re_status = RPC_SUCCESS; + (reply_msg.acpted_rply.ar_stat == SUCCESS)) + errp->re_status = stat = RPC_SUCCESS; else - _seterr_reply(&reply_msg, &(cu->cu_error)); - - if (cu->cu_error.re_status == RPC_SUCCESS) { - if (! AUTH_VALIDATE(cl->cl_auth, - &reply_msg.acpted_rply.ar_verf)) { - cu->cu_error.re_status = RPC_AUTHERROR; - cu->cu_error.re_why = AUTH_INVALIDRESP; - } - if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) { - xdrs.x_op = XDR_FREE; - (void) xdr_opaque_auth(&xdrs, - &(reply_msg.acpted_rply.ar_verf)); + stat = _seterr_reply(&reply_msg, &(cu->cu_error)); + + if (errp->re_status == RPC_SUCCESS) { + results = xdrmbuf_getall(&xdrs); + if (! AUTH_VALIDATE(auth, xid, + &reply_msg.acpted_rply.ar_verf, + &results)) { + errp->re_status = stat = RPC_AUTHERROR; + errp->re_why = AUTH_INVALIDRESP; + if (retrans && + auth->ah_cred.oa_flavor == RPCSEC_GSS) { + /* + * If we retransmitted, its + * possible that we will + * receive a reply for one of + * the earlier transmissions + * (which will use an older + * RPCSEC_GSS sequence + * number). In this case, just + * go back and listen for a + * new reply. We could keep a + * record of all the seq + * numbers we have transmitted + * so far so that we could + * accept a reply for any of + * them here. + */ + XDR_DESTROY(&xdrs); + mtx_lock(&cs->cs_lock); + TAILQ_INSERT_TAIL(&cs->cs_pending, + cr, cr_link); + cr->cr_mrep = NULL; + goto get_reply; + } + } else { + *resultsp = results; } } /* end successful completion */ /* * If unsuccesful AND error is an authentication error * then refresh credentials and try again, else break */ - else if (cu->cu_error.re_status == RPC_AUTHERROR) + else if (stat == RPC_AUTHERROR) /* maybe our credentials need to be refreshed ... */ if (nrefreshes > 0 && - AUTH_REFRESH(cl->cl_auth, &reply_msg)) { + AUTH_REFRESH(auth, &reply_msg)) { nrefreshes--; + XDR_DESTROY(&xdrs); + mtx_lock(&cs->cs_lock); goto call_again; } /* end of unsuccessful completion */ } /* end of valid reply message */ else { - cu->cu_error.re_status = RPC_CANTDECODERES; + errp->re_status = stat = RPC_CANTDECODERES; } + XDR_DESTROY(&xdrs); + mtx_lock(&cs->cs_lock); out: mtx_assert(&cs->cs_lock, MA_OWNED); @@ -621,9 +759,12 @@ out: mtx_unlock(&cs->cs_lock); + if (auth && stat != RPC_SUCCESS) + AUTH_VALIDATE(auth, xid, NULL, NULL); + free(cr, M_RPC); - return (cu->cu_error.re_status); + return (stat); } static void @@ -759,7 +900,7 @@ clnt_dg_control(CLIENT *cl, u_int request, void *info) cu->cu_connect = *(int *)info; break; case CLSET_WAITCHAN: - cu->cu_waitchan = *(const char **)info; + cu->cu_waitchan = (const char *)info; break; case CLGET_WAITCHAN: *(const char **) info = cu->cu_waitchan; @@ -785,16 +926,27 @@ clnt_dg_control(CLIENT *cl, u_int request, void *info) } static void -clnt_dg_destroy(CLIENT *cl) +clnt_dg_close(CLIENT *cl) { struct cu_data *cu = (struct cu_data *)cl->cl_private; struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; struct cu_request *cr; - struct socket *so = NULL; - bool_t lastsocketref; mtx_lock(&cs->cs_lock); + if (cu->cu_closed) { + mtx_unlock(&cs->cs_lock); + return; + } + + if (cu->cu_closing) { + while (cu->cu_closing) + msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); + KASSERT(cu->cu_closed, ("client should be closed")); + mtx_unlock(&cs->cs_lock); + return; + } + /* * Abort any pending requests and wait until everyone * has finished with clnt_vc_call. @@ -811,6 +963,25 @@ clnt_dg_destroy(CLIENT *cl) while (cu->cu_threads) msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); + cu->cu_closing = FALSE; + cu->cu_closed = TRUE; + + mtx_unlock(&cs->cs_lock); + wakeup(cu); +} + +static void +clnt_dg_destroy(CLIENT *cl) +{ + struct cu_data *cu = (struct cu_data *)cl->cl_private; + struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; + struct socket *so = NULL; + bool_t lastsocketref; + + clnt_dg_close(cl); + + mtx_lock(&cs->cs_lock); + cs->cs_refs--; if (cs->cs_refs == 0) { mtx_destroy(&cs->cs_lock); @@ -894,7 +1065,8 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) /* * The XID is in the first uint32_t of the reply. */ - m = m_pullup(m, sizeof(xid)); + if (m->m_len < sizeof(xid)) + m = m_pullup(m, sizeof(xid)); if (!m) /* * Should never happen. |