summaryrefslogtreecommitdiffstats
path: root/sys/rpc/clnt_dg.c
diff options
context:
space:
mode:
Diffstat (limited to 'sys/rpc/clnt_dg.c')
-rw-r--r--sys/rpc/clnt_dg.c310
1 files changed, 241 insertions, 69 deletions
diff --git a/sys/rpc/clnt_dg.c b/sys/rpc/clnt_dg.c
index f14e1d6..e6d101d 100644
--- a/sys/rpc/clnt_dg.c
+++ b/sys/rpc/clnt_dg.c
@@ -72,11 +72,12 @@ __FBSDID("$FreeBSD$");
static bool_t time_not_ok(struct timeval *);
static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
- rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
+ rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_dg_abort(CLIENT *);
static bool_t clnt_dg_control(CLIENT *, u_int, void *);
+static void clnt_dg_close(CLIENT *);
static void clnt_dg_destroy(CLIENT *);
static void clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
@@ -85,6 +86,7 @@ static struct clnt_ops clnt_dg_ops = {
.cl_abort = clnt_dg_abort,
.cl_geterr = clnt_dg_geterr,
.cl_freeres = clnt_dg_freeres,
+ .cl_close = clnt_dg_close,
.cl_destroy = clnt_dg_destroy,
.cl_control = clnt_dg_control
};
@@ -102,6 +104,7 @@ struct cu_request {
uint32_t cr_xid; /* XID of request */
struct mbuf *cr_mrep; /* reply received by upcall */
int cr_error; /* any error from upcall */
+ char cr_verf[MAX_AUTH_BYTES]; /* reply verf */
};
TAILQ_HEAD(cu_request_list, cu_request);
@@ -120,7 +123,6 @@ struct cu_socket {
struct mtx cs_lock;
int cs_refs; /* Count of clients */
struct cu_request_list cs_pending; /* Requests awaiting replies */
-
};
/*
@@ -128,7 +130,8 @@ struct cu_socket {
*/
struct cu_data {
int cu_threads; /* # threads in clnt_vc_call */
- bool_t cu_closing; /* TRUE if we are destroying */
+ bool_t cu_closing; /* TRUE if we are closing */
+ bool_t cu_closed; /* TRUE if we are closed */
struct socket *cu_socket; /* connection socket */
bool_t cu_closeit; /* opened by library */
struct sockaddr_storage cu_raddr; /* remote address */
@@ -146,8 +149,14 @@ struct cu_data {
int cu_connected; /* Have done connect(). */
const char *cu_waitchan;
int cu_waitflag;
+ int cu_cwnd; /* congestion window */
+ int cu_sent; /* number of in-flight RPCs */
+ bool_t cu_cwnd_wait;
};
+#define CWNDSCALE 256
+#define MAXCWND (32 * CWNDSCALE)
+
/*
* Connection less client creation returns with client handle parameters.
* Default options are set, which the user can change using clnt_control().
@@ -211,6 +220,7 @@ clnt_dg_create(
cu = mem_alloc(sizeof (*cu));
cu->cu_threads = 0;
cu->cu_closing = FALSE;
+ cu->cu_closed = FALSE;
(void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
cu->cu_rlen = svcaddr->sa_len;
/* Other values can also be set through clnt_control() */
@@ -225,6 +235,9 @@ clnt_dg_create(
cu->cu_connected = FALSE;
cu->cu_waitchan = "rpcrecv";
cu->cu_waitflag = 0;
+ cu->cu_cwnd = MAXCWND / 2;
+ cu->cu_sent = 0;
+ cu->cu_cwnd_wait = FALSE;
(void) getmicrotime(&now);
cu->cu_xid = __RPC_GETXID(&now);
call_msg.rm_xid = cu->cu_xid;
@@ -304,15 +317,16 @@ clnt_dg_call(
CLIENT *cl, /* client handle */
struct rpc_callextra *ext, /* call metadata */
rpcproc_t proc, /* procedure number */
- xdrproc_t xargs, /* xdr routine for args */
- void *argsp, /* pointer to args */
- xdrproc_t xresults, /* xdr routine for results */
- void *resultsp, /* pointer to results */
+ struct mbuf *args, /* pointer to args */
+ struct mbuf **resultsp, /* pointer to results */
struct timeval utimeout) /* seconds to wait before giving up */
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ struct rpc_timers *rt;
AUTH *auth;
+ struct rpc_err *errp;
+ enum clnt_stat stat;
XDR xdrs;
struct rpc_msg reply_msg;
bool_t ok;
@@ -321,11 +335,11 @@ clnt_dg_call(
struct timeval *tvp;
int timeout;
int retransmit_time;
- int next_sendtime, starttime, time_waited, tv;
+ int next_sendtime, starttime, rtt, time_waited, tv = 0;
struct sockaddr *sa;
socklen_t salen;
- uint32_t xid;
- struct mbuf *mreq = NULL;
+ uint32_t xid = 0;
+ struct mbuf *mreq = NULL, *results;
struct cu_request *cr;
int error;
@@ -333,17 +347,20 @@ clnt_dg_call(
mtx_lock(&cs->cs_lock);
- if (cu->cu_closing) {
+ if (cu->cu_closing || cu->cu_closed) {
mtx_unlock(&cs->cs_lock);
free(cr, M_RPC);
return (RPC_CANTSEND);
}
cu->cu_threads++;
- if (ext)
+ if (ext) {
auth = ext->rc_auth;
- else
+ errp = &ext->rc_err;
+ } else {
auth = cl->cl_auth;
+ errp = &cu->cu_error;
+ }
cr->cr_client = cl;
cr->cr_mrep = NULL;
@@ -365,8 +382,8 @@ clnt_dg_call(
(struct sockaddr *)&cu->cu_raddr, curthread);
mtx_lock(&cs->cs_lock);
if (error) {
- cu->cu_error.re_errno = error;
- cu->cu_error.re_status = RPC_CANTSEND;
+ errp->re_errno = error;
+ errp->re_status = stat = RPC_CANTSEND;
goto out;
}
cu->cu_connected = 1;
@@ -380,7 +397,15 @@ clnt_dg_call(
}
time_waited = 0;
retrans = 0;
- retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
+ if (ext && ext->rc_timers) {
+ rt = ext->rc_timers;
+ if (!rt->rt_rtxcur)
+ rt->rt_rtxcur = tvtohz(&cu->cu_wait);
+ retransmit_time = next_sendtime = rt->rt_rtxcur;
+ } else {
+ rt = NULL;
+ retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
+ }
starttime = ticks;
@@ -394,9 +419,9 @@ send_again:
mtx_unlock(&cs->cs_lock);
MGETHDR(mreq, M_WAIT, MT_DATA);
- MCLGET(mreq, M_WAIT);
- mreq->m_len = 0;
- m_append(mreq, cu->cu_mcalllen, cu->cu_mcallc);
+ KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
+ bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
+ mreq->m_len = cu->cu_mcalllen;
/*
* The XID is the first thing in the request.
@@ -405,20 +430,36 @@ send_again:
xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
- if (cu->cu_async == TRUE && xargs == NULL)
+ if (cu->cu_async == TRUE && args == NULL)
goto get_reply;
if ((! XDR_PUTINT32(&xdrs, &proc)) ||
- (! AUTH_MARSHALL(auth, &xdrs)) ||
- (! (*xargs)(&xdrs, argsp))) {
- cu->cu_error.re_status = RPC_CANTENCODEARGS;
+ (! AUTH_MARSHALL(auth, xid, &xdrs,
+ m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
+ errp->re_status = stat = RPC_CANTENCODEARGS;
mtx_lock(&cs->cs_lock);
goto out;
}
- m_fixhdr(mreq);
+ mreq->m_pkthdr.len = m_length(mreq, NULL);
cr->cr_xid = xid;
mtx_lock(&cs->cs_lock);
+
+ /*
+ * Try to get a place in the congestion window.
+ */
+ while (cu->cu_sent >= cu->cu_cwnd) {
+ cu->cu_cwnd_wait = TRUE;
+ error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
+ cu->cu_waitflag, "rpccwnd", 0);
+ if (error) {
+ errp->re_errno = error;
+ errp->re_status = stat = RPC_CANTSEND;
+ goto out;
+ }
+ }
+ cu->cu_sent += CWNDSCALE;
+
TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
mtx_unlock(&cs->cs_lock);
@@ -433,15 +474,22 @@ send_again:
* some clock time to spare while the packets are in flight.
* (We assume that this is actually only executed once.)
*/
- reply_msg.acpted_rply.ar_verf = _null_auth;
- reply_msg.acpted_rply.ar_results.where = resultsp;
- reply_msg.acpted_rply.ar_results.proc = xresults;
+ reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
+ reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
+ reply_msg.acpted_rply.ar_verf.oa_length = 0;
+ reply_msg.acpted_rply.ar_results.where = NULL;
+ reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
mtx_lock(&cs->cs_lock);
if (error) {
TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
- cu->cu_error.re_errno = error;
- cu->cu_error.re_status = RPC_CANTSEND;
+ errp->re_errno = error;
+ errp->re_status = stat = RPC_CANTSEND;
+ cu->cu_sent -= CWNDSCALE;
+ if (cu->cu_cwnd_wait) {
+ cu->cu_cwnd_wait = FALSE;
+ wakeup(&cu->cu_cwnd_wait);
+ }
goto out;
}
@@ -451,12 +499,22 @@ send_again:
*/
if (cr->cr_error) {
TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
- cu->cu_error.re_errno = cr->cr_error;
- cu->cu_error.re_status = RPC_CANTRECV;
+ errp->re_errno = cr->cr_error;
+ errp->re_status = stat = RPC_CANTRECV;
+ cu->cu_sent -= CWNDSCALE;
+ if (cu->cu_cwnd_wait) {
+ cu->cu_cwnd_wait = FALSE;
+ wakeup(&cu->cu_cwnd_wait);
+ }
goto out;
}
if (cr->cr_mrep) {
TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
+ cu->cu_sent -= CWNDSCALE;
+ if (cu->cu_cwnd_wait) {
+ cu->cu_cwnd_wait = FALSE;
+ wakeup(&cu->cu_cwnd_wait);
+ }
goto got_reply;
}
@@ -465,7 +523,12 @@ send_again:
*/
if (timeout == 0) {
TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
- cu->cu_error.re_status = RPC_TIMEDOUT;
+ errp->re_status = stat = RPC_TIMEDOUT;
+ cu->cu_sent -= CWNDSCALE;
+ if (cu->cu_cwnd_wait) {
+ cu->cu_cwnd_wait = FALSE;
+ wakeup(&cu->cu_cwnd_wait);
+ }
goto out;
}
@@ -479,7 +542,7 @@ get_reply:
tv -= time_waited;
if (tv > 0) {
- if (cu->cu_closing)
+ if (cu->cu_closing || cu->cu_closed)
error = 0;
else
error = msleep(cr, &cs->cs_lock,
@@ -489,6 +552,11 @@ get_reply:
}
TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
+ cu->cu_sent -= CWNDSCALE;
+ if (cu->cu_cwnd_wait) {
+ cu->cu_cwnd_wait = FALSE;
+ wakeup(&cu->cu_cwnd_wait);
+ }
if (!error) {
/*
@@ -497,10 +565,52 @@ get_reply:
* otherwise we have a reply.
*/
if (cr->cr_error) {
- cu->cu_error.re_errno = cr->cr_error;
- cu->cu_error.re_status = RPC_CANTRECV;
+ errp->re_errno = cr->cr_error;
+ errp->re_status = stat = RPC_CANTRECV;
goto out;
}
+
+ cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
+ + cu->cu_cwnd / 2) / cu->cu_cwnd;
+ if (cu->cu_cwnd > MAXCWND)
+ cu->cu_cwnd = MAXCWND;
+
+ if (rt) {
+ /*
+ * Add one to the time since a tick
+ * count of N means that the actual
+ * time taken was somewhere between N
+ * and N+1.
+ */
+ rtt = ticks - starttime + 1;
+
+ /*
+ * Update our estimate of the round
+ * trip time using roughly the
+ * algorithm described in RFC
+ * 2988. Given an RTT sample R:
+ *
+ * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
+ * SRTT = (1-alpha) * SRTT + alpha * R
+ *
+ * where alpha = 0.125 and beta = 0.25.
+ *
+ * The initial retransmit timeout is
+ * SRTT + 4*RTTVAR and doubles on each
+ * retransmision.
+ */
+ if (rt->rt_srtt == 0) {
+ rt->rt_srtt = rtt;
+ rt->rt_deviate = rtt / 2;
+ } else {
+ int32_t error = rtt - rt->rt_srtt;
+ rt->rt_srtt += error / 8;
+ error = abs(error) - rt->rt_deviate;
+ rt->rt_deviate += error / 4;
+ }
+ rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
+ }
+
break;
}
@@ -510,11 +620,11 @@ get_reply:
* re-send the request.
*/
if (error != EWOULDBLOCK) {
- cu->cu_error.re_errno = error;
+ errp->re_errno = error;
if (error == EINTR)
- cu->cu_error.re_status = RPC_INTR;
+ errp->re_status = stat = RPC_INTR;
else
- cu->cu_error.re_status = RPC_CANTRECV;
+ errp->re_status = stat = RPC_CANTRECV;
goto out;
}
@@ -522,13 +632,16 @@ get_reply:
/* Check for timeout. */
if (time_waited > timeout) {
- cu->cu_error.re_errno = EWOULDBLOCK;
- cu->cu_error.re_status = RPC_TIMEDOUT;
+ errp->re_errno = EWOULDBLOCK;
+ errp->re_status = stat = RPC_TIMEDOUT;
goto out;
}
/* Retransmit if necessary. */
if (time_waited >= next_sendtime) {
+ cu->cu_cwnd /= 2;
+ if (cu->cu_cwnd < CWNDSCALE)
+ cu->cu_cwnd = CWNDSCALE;
if (ext && ext->rc_feedback) {
mtx_unlock(&cs->cs_lock);
if (retrans == 0)
@@ -539,9 +652,9 @@ get_reply:
proc, ext->rc_feedback_arg);
mtx_lock(&cs->cs_lock);
}
- if (cu->cu_closing) {
- cu->cu_error.re_errno = ESHUTDOWN;
- cu->cu_error.re_status = RPC_CANTRECV;
+ if (cu->cu_closing || cu->cu_closed) {
+ errp->re_errno = ESHUTDOWN;
+ errp->re_status = stat = RPC_CANTRECV;
goto out;
}
retrans++;
@@ -566,47 +679,72 @@ got_reply:
xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
ok = xdr_replymsg(&xdrs, &reply_msg);
- XDR_DESTROY(&xdrs);
cr->cr_mrep = NULL;
- mtx_lock(&cs->cs_lock);
-
if (ok) {
if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
- (reply_msg.acpted_rply.ar_stat == SUCCESS))
- cu->cu_error.re_status = RPC_SUCCESS;
+ (reply_msg.acpted_rply.ar_stat == SUCCESS))
+ errp->re_status = stat = RPC_SUCCESS;
else
- _seterr_reply(&reply_msg, &(cu->cu_error));
-
- if (cu->cu_error.re_status == RPC_SUCCESS) {
- if (! AUTH_VALIDATE(cl->cl_auth,
- &reply_msg.acpted_rply.ar_verf)) {
- cu->cu_error.re_status = RPC_AUTHERROR;
- cu->cu_error.re_why = AUTH_INVALIDRESP;
- }
- if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
- xdrs.x_op = XDR_FREE;
- (void) xdr_opaque_auth(&xdrs,
- &(reply_msg.acpted_rply.ar_verf));
+ stat = _seterr_reply(&reply_msg, &(cu->cu_error));
+
+ if (errp->re_status == RPC_SUCCESS) {
+ results = xdrmbuf_getall(&xdrs);
+ if (! AUTH_VALIDATE(auth, xid,
+ &reply_msg.acpted_rply.ar_verf,
+ &results)) {
+ errp->re_status = stat = RPC_AUTHERROR;
+ errp->re_why = AUTH_INVALIDRESP;
+ if (retrans &&
+ auth->ah_cred.oa_flavor == RPCSEC_GSS) {
+ /*
+ * If we retransmitted, its
+ * possible that we will
+ * receive a reply for one of
+ * the earlier transmissions
+ * (which will use an older
+ * RPCSEC_GSS sequence
+ * number). In this case, just
+ * go back and listen for a
+ * new reply. We could keep a
+ * record of all the seq
+ * numbers we have transmitted
+ * so far so that we could
+ * accept a reply for any of
+ * them here.
+ */
+ XDR_DESTROY(&xdrs);
+ mtx_lock(&cs->cs_lock);
+ TAILQ_INSERT_TAIL(&cs->cs_pending,
+ cr, cr_link);
+ cr->cr_mrep = NULL;
+ goto get_reply;
+ }
+ } else {
+ *resultsp = results;
}
} /* end successful completion */
/*
* If unsuccesful AND error is an authentication error
* then refresh credentials and try again, else break
*/
- else if (cu->cu_error.re_status == RPC_AUTHERROR)
+ else if (stat == RPC_AUTHERROR)
/* maybe our credentials need to be refreshed ... */
if (nrefreshes > 0 &&
- AUTH_REFRESH(cl->cl_auth, &reply_msg)) {
+ AUTH_REFRESH(auth, &reply_msg)) {
nrefreshes--;
+ XDR_DESTROY(&xdrs);
+ mtx_lock(&cs->cs_lock);
goto call_again;
}
/* end of unsuccessful completion */
} /* end of valid reply message */
else {
- cu->cu_error.re_status = RPC_CANTDECODERES;
+ errp->re_status = stat = RPC_CANTDECODERES;
}
+ XDR_DESTROY(&xdrs);
+ mtx_lock(&cs->cs_lock);
out:
mtx_assert(&cs->cs_lock, MA_OWNED);
@@ -621,9 +759,12 @@ out:
mtx_unlock(&cs->cs_lock);
+ if (auth && stat != RPC_SUCCESS)
+ AUTH_VALIDATE(auth, xid, NULL, NULL);
+
free(cr, M_RPC);
- return (cu->cu_error.re_status);
+ return (stat);
}
static void
@@ -759,7 +900,7 @@ clnt_dg_control(CLIENT *cl, u_int request, void *info)
cu->cu_connect = *(int *)info;
break;
case CLSET_WAITCHAN:
- cu->cu_waitchan = *(const char **)info;
+ cu->cu_waitchan = (const char *)info;
break;
case CLGET_WAITCHAN:
*(const char **) info = cu->cu_waitchan;
@@ -785,16 +926,27 @@ clnt_dg_control(CLIENT *cl, u_int request, void *info)
}
static void
-clnt_dg_destroy(CLIENT *cl)
+clnt_dg_close(CLIENT *cl)
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
struct cu_request *cr;
- struct socket *so = NULL;
- bool_t lastsocketref;
mtx_lock(&cs->cs_lock);
+ if (cu->cu_closed) {
+ mtx_unlock(&cs->cs_lock);
+ return;
+ }
+
+ if (cu->cu_closing) {
+ while (cu->cu_closing)
+ msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
+ KASSERT(cu->cu_closed, ("client should be closed"));
+ mtx_unlock(&cs->cs_lock);
+ return;
+ }
+
/*
* Abort any pending requests and wait until everyone
* has finished with clnt_vc_call.
@@ -811,6 +963,25 @@ clnt_dg_destroy(CLIENT *cl)
while (cu->cu_threads)
msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
+ cu->cu_closing = FALSE;
+ cu->cu_closed = TRUE;
+
+ mtx_unlock(&cs->cs_lock);
+ wakeup(cu);
+}
+
+static void
+clnt_dg_destroy(CLIENT *cl)
+{
+ struct cu_data *cu = (struct cu_data *)cl->cl_private;
+ struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ struct socket *so = NULL;
+ bool_t lastsocketref;
+
+ clnt_dg_close(cl);
+
+ mtx_lock(&cs->cs_lock);
+
cs->cs_refs--;
if (cs->cs_refs == 0) {
mtx_destroy(&cs->cs_lock);
@@ -894,7 +1065,8 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
/*
* The XID is in the first uint32_t of the reply.
*/
- m = m_pullup(m, sizeof(xid));
+ if (m->m_len < sizeof(xid))
+ m = m_pullup(m, sizeof(xid));
if (!m)
/*
* Should never happen.
OpenPOWER on IntegriCloud