summaryrefslogtreecommitdiffstats
path: root/sys/rpc/clnt_dg.c
diff options
context:
space:
mode:
Diffstat (limited to 'sys/rpc/clnt_dg.c')
-rw-r--r--sys/rpc/clnt_dg.c218
1 files changed, 146 insertions, 72 deletions
diff --git a/sys/rpc/clnt_dg.c b/sys/rpc/clnt_dg.c
index c66ac50..f14e1d6 100644
--- a/sys/rpc/clnt_dg.c
+++ b/sys/rpc/clnt_dg.c
@@ -45,6 +45,7 @@ __FBSDID("$FreeBSD$");
#include <sys/param.h>
#include <sys/systm.h>
+#include <sys/kernel.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
@@ -70,8 +71,8 @@ __FBSDID("$FreeBSD$");
#endif
static bool_t time_not_ok(struct timeval *);
-static enum clnt_stat clnt_dg_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
- xdrproc_t, void *, struct timeval);
+static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
+ rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_dg_abort(CLIENT *);
@@ -91,10 +92,13 @@ static struct clnt_ops clnt_dg_ops = {
static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
/*
- * A pending RPC request which awaits a reply.
+ * A pending RPC request which awaits a reply. Requests which have
+ * received their reply will have cr_xid set to zero and cr_mrep to
+ * the mbuf chain of the reply.
*/
struct cu_request {
TAILQ_ENTRY(cu_request) cr_link;
+ CLIENT *cr_client; /* owner */
uint32_t cr_xid; /* XID of request */
struct mbuf *cr_mrep; /* reply received by upcall */
int cr_error; /* any error from upcall */
@@ -123,6 +127,8 @@ struct cu_socket {
* Private data kept per client handle
*/
struct cu_data {
+ int cu_threads; /* # threads in clnt_vc_call */
+ bool_t cu_closing; /* TRUE if we are destroying */
struct socket *cu_socket; /* connection socket */
bool_t cu_closeit; /* opened by library */
struct sockaddr_storage cu_raddr; /* remote address */
@@ -203,10 +209,12 @@ clnt_dg_create(
sendsz = ((sendsz + 3) / 4) * 4;
recvsz = ((recvsz + 3) / 4) * 4;
cu = mem_alloc(sizeof (*cu));
+ cu->cu_threads = 0;
+ cu->cu_closing = FALSE;
(void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
cu->cu_rlen = svcaddr->sa_len;
/* Other values can also be set through clnt_control() */
- cu->cu_wait.tv_sec = 15; /* heuristically chosen */
+ cu->cu_wait.tv_sec = 3; /* heuristically chosen */
cu->cu_wait.tv_usec = 0;
cu->cu_total.tv_sec = -1;
cu->cu_total.tv_usec = -1;
@@ -237,6 +245,7 @@ clnt_dg_create(
*/
cu->cu_closeit = FALSE;
cu->cu_socket = so;
+ soreserve(so, 256*1024, 256*1024);
SOCKBUF_LOCK(&so->so_rcv);
recheck_socket:
@@ -274,6 +283,7 @@ recheck_socket:
}
SOCKBUF_UNLOCK(&so->so_rcv);
+ cl->cl_refs = 1;
cl->cl_ops = &clnt_dg_ops;
cl->cl_private = (caddr_t)(void *)cu;
cl->cl_auth = authnone_create();
@@ -291,7 +301,8 @@ err2:
static enum clnt_stat
clnt_dg_call(
- CLIENT *cl, /* client handle */
+ CLIENT *cl, /* client handle */
+ struct rpc_callextra *ext, /* call metadata */
rpcproc_t proc, /* procedure number */
xdrproc_t xargs, /* xdr routine for args */
void *argsp, /* pointer to args */
@@ -301,30 +312,52 @@ clnt_dg_call(
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ AUTH *auth;
XDR xdrs;
struct rpc_msg reply_msg;
bool_t ok;
+ int retrans; /* number of re-transmits so far */
int nrefreshes = 2; /* number of times to refresh cred */
- struct timeval timeout;
- struct timeval retransmit_time;
- struct timeval next_sendtime, starttime, time_waited, tv;
+ struct timeval *tvp;
+ int timeout;
+ int retransmit_time;
+ int next_sendtime, starttime, time_waited, tv;
struct sockaddr *sa;
socklen_t salen;
uint32_t xid;
struct mbuf *mreq = NULL;
- struct cu_request cr;
+ struct cu_request *cr;
int error;
+ cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
+
mtx_lock(&cs->cs_lock);
- cr.cr_mrep = NULL;
- cr.cr_error = 0;
+ if (cu->cu_closing) {
+ mtx_unlock(&cs->cs_lock);
+ free(cr, M_RPC);
+ return (RPC_CANTSEND);
+ }
+ cu->cu_threads++;
+
+ if (ext)
+ auth = ext->rc_auth;
+ else
+ auth = cl->cl_auth;
+
+ cr->cr_client = cl;
+ cr->cr_mrep = NULL;
+ cr->cr_error = 0;
if (cu->cu_total.tv_usec == -1) {
- timeout = utimeout; /* use supplied timeout */
+ tvp = &utimeout; /* use supplied timeout */
} else {
- timeout = cu->cu_total; /* use default timeout */
+ tvp = &cu->cu_total; /* use default timeout */
}
+ if (tvp->tv_sec || tvp->tv_usec)
+ timeout = tvtohz(tvp);
+ else
+ timeout = 0;
if (cu->cu_connect && !cu->cu_connected) {
mtx_unlock(&cs->cs_lock);
@@ -345,11 +378,11 @@ clnt_dg_call(
sa = (struct sockaddr *)&cu->cu_raddr;
salen = cu->cu_rlen;
}
- time_waited.tv_sec = 0;
- time_waited.tv_usec = 0;
- retransmit_time = next_sendtime = cu->cu_wait;
+ time_waited = 0;
+ retrans = 0;
+ retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
- getmicrotime(&starttime);
+ starttime = ticks;
call_again:
mtx_assert(&cs->cs_lock, MA_OWNED);
@@ -376,7 +409,7 @@ send_again:
goto get_reply;
if ((! XDR_PUTINT32(&xdrs, &proc)) ||
- (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) ||
+ (! AUTH_MARSHALL(auth, &xdrs)) ||
(! (*xargs)(&xdrs, argsp))) {
cu->cu_error.re_status = RPC_CANTENCODEARGS;
mtx_lock(&cs->cs_lock);
@@ -384,9 +417,9 @@ send_again:
}
m_fixhdr(mreq);
- cr.cr_xid = xid;
+ cr->cr_xid = xid;
mtx_lock(&cs->cs_lock);
- TAILQ_INSERT_TAIL(&cs->cs_pending, &cr, cr_link);
+ TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
mtx_unlock(&cs->cs_lock);
/*
@@ -406,8 +439,7 @@ send_again:
mtx_lock(&cs->cs_lock);
if (error) {
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
-
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
cu->cu_error.re_errno = error;
cu->cu_error.re_status = RPC_CANTSEND;
goto out;
@@ -415,24 +447,24 @@ send_again:
/*
* Check to see if we got an upcall while waiting for the
- * lock. In both these cases, the request has been removed
- * from cs->cs_pending.
+ * lock.
*/
- if (cr.cr_error) {
- cu->cu_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
+ cu->cu_error.re_errno = cr->cr_error;
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
- if (cr.cr_mrep) {
+ if (cr->cr_mrep) {
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
goto got_reply;
}
/*
* Hack to provide rpc-based message passing
*/
- if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
+ if (timeout == 0) {
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
cu->cu_error.re_status = RPC_TIMEDOUT;
goto out;
}
@@ -440,17 +472,23 @@ send_again:
get_reply:
for (;;) {
/* Decide how long to wait. */
- if (timevalcmp(&next_sendtime, &timeout, <)) {
+ if (next_sendtime < timeout)
tv = next_sendtime;
- } else {
+ else
tv = timeout;
+ tv -= time_waited;
+
+ if (tv > 0) {
+ if (cu->cu_closing)
+ error = 0;
+ else
+ error = msleep(cr, &cs->cs_lock,
+ cu->cu_waitflag, cu->cu_waitchan, tv);
+ } else {
+ error = EWOULDBLOCK;
}
- timevalsub(&tv, &time_waited);
- if (tv.tv_sec < 0 || tv.tv_usec < 0)
- tv.tv_sec = tv.tv_usec = 0;
- error = msleep(&cr, &cs->cs_lock, cu->cu_waitflag,
- cu->cu_waitchan, tvtohz(&tv));
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
if (!error) {
/*
@@ -458,8 +496,8 @@ get_reply:
* upcall had a receive error, report that,
* otherwise we have a reply.
*/
- if (cr.cr_error) {
- cu->cu_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ cu->cu_error.re_errno = cr->cr_error;
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
@@ -472,8 +510,6 @@ get_reply:
* re-send the request.
*/
if (error != EWOULDBLOCK) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
cu->cu_error.re_errno = error;
if (error == EINTR)
cu->cu_error.re_status = RPC_INTR;
@@ -482,29 +518,40 @@ get_reply:
goto out;
}
- getmicrotime(&tv);
- time_waited = tv;
- timevalsub(&time_waited, &starttime);
+ time_waited = ticks - starttime;
/* Check for timeout. */
- if (timevalcmp(&time_waited, &timeout, >)) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
+ if (time_waited > timeout) {
cu->cu_error.re_errno = EWOULDBLOCK;
cu->cu_error.re_status = RPC_TIMEDOUT;
goto out;
}
/* Retransmit if necessary. */
- if (timevalcmp(&time_waited, &next_sendtime, >)) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
+ if (time_waited >= next_sendtime) {
+ if (ext && ext->rc_feedback) {
+ mtx_unlock(&cs->cs_lock);
+ if (retrans == 0)
+ ext->rc_feedback(FEEDBACK_REXMIT1,
+ proc, ext->rc_feedback_arg);
+ else
+ ext->rc_feedback(FEEDBACK_REXMIT2,
+ proc, ext->rc_feedback_arg);
+ mtx_lock(&cs->cs_lock);
+ }
+ if (cu->cu_closing) {
+ cu->cu_error.re_errno = ESHUTDOWN;
+ cu->cu_error.re_status = RPC_CANTRECV;
+ goto out;
+ }
+ retrans++;
/* update retransmit_time */
- if (retransmit_time.tv_sec < RPC_MAX_BACKOFF)
- timevaladd(&retransmit_time, &retransmit_time);
- timevaladd(&next_sendtime, &retransmit_time);
+ if (retransmit_time < RPC_MAX_BACKOFF * hz)
+ retransmit_time = 2 * retransmit_time;
+ next_sendtime += retransmit_time;
goto send_again;
}
+ TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
}
got_reply:
@@ -514,10 +561,13 @@ got_reply:
*/
mtx_unlock(&cs->cs_lock);
- xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE);
+ if (ext && ext->rc_feedback)
+ ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
+
+ xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
ok = xdr_replymsg(&xdrs, &reply_msg);
XDR_DESTROY(&xdrs);
- cr.cr_mrep = NULL;
+ cr->cr_mrep = NULL;
mtx_lock(&cs->cs_lock);
@@ -562,10 +612,17 @@ out:
if (mreq)
m_freem(mreq);
- if (cr.cr_mrep)
- m_freem(cr.cr_mrep);
+ if (cr->cr_mrep)
+ m_freem(cr->cr_mrep);
+ cu->cu_threads--;
+ if (cu->cu_closing)
+ wakeup(cu);
+
mtx_unlock(&cs->cs_lock);
+
+ free(cr, M_RPC);
+
return (cu->cu_error.re_status);
}
@@ -732,30 +789,44 @@ clnt_dg_destroy(CLIENT *cl)
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ struct cu_request *cr;
struct socket *so = NULL;
bool_t lastsocketref;
- SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
-
mtx_lock(&cs->cs_lock);
+
+ /*
+ * Abort any pending requests and wait until everyone
+ * has finished with clnt_vc_call.
+ */
+ cu->cu_closing = TRUE;
+ TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
+ if (cr->cr_client == cl) {
+ cr->cr_xid = 0;
+ cr->cr_error = ESHUTDOWN;
+ wakeup(cr);
+ }
+ }
+
+ while (cu->cu_threads)
+ msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
+
cs->cs_refs--;
if (cs->cs_refs == 0) {
+ mtx_destroy(&cs->cs_lock);
+ SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
cu->cu_socket->so_upcallarg = NULL;
cu->cu_socket->so_upcall = NULL;
cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL;
- mtx_destroy(&cs->cs_lock);
SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
mem_free(cs, sizeof(*cs));
lastsocketref = TRUE;
} else {
mtx_unlock(&cs->cs_lock);
- SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
lastsocketref = FALSE;
}
- if (cu->cu_closeit) {
- KASSERT(lastsocketref, ("clnt_dg_destroy(): closing a socket "
- "shared with other clients"));
+ if (cu->cu_closeit && lastsocketref) {
so = cu->cu_socket;
cu->cu_socket = NULL;
}
@@ -812,10 +883,10 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
if (error) {
mtx_lock(&cs->cs_lock);
TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
+ cr->cr_xid = 0;
cr->cr_error = error;
wakeup(cr);
}
- TAILQ_INIT(&cs->cs_pending);
mtx_unlock(&cs->cs_lock);
break;
}
@@ -825,7 +896,11 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
*/
m = m_pullup(m, sizeof(xid));
if (!m)
- break;
+ /*
+ * Should never happen.
+ */
+ continue;
+
xid = ntohl(*mtod(m, uint32_t *));
/*
@@ -836,14 +911,13 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
if (cr->cr_xid == xid) {
/*
- * This one matches. We snip it out of
- * the pending list and leave the
+ * This one matches. We leave the
* reply mbuf in cr->cr_mrep. Set the
- * XID to zero so that clnt_dg_call
- * can know not to repeat the
- * TAILQ_REMOVE.
+ * XID to zero so that we will ignore
+ * any duplicated replies that arrive
+ * before clnt_dg_call removes it from
+ * the queue.
*/
- TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
cr->cr_xid = 0;
cr->cr_mrep = m;
cr->cr_error = 0;
OpenPOWER on IntegriCloud