summaryrefslogtreecommitdiffstats
path: root/sys/rpc/clnt_vc.c
diff options
context:
space:
mode:
Diffstat (limited to 'sys/rpc/clnt_vc.c')
-rw-r--r--sys/rpc/clnt_vc.c138
1 files changed, 96 insertions, 42 deletions
diff --git a/sys/rpc/clnt_vc.c b/sys/rpc/clnt_vc.c
index 5731e1e..cb09352 100644
--- a/sys/rpc/clnt_vc.c
+++ b/sys/rpc/clnt_vc.c
@@ -80,8 +80,8 @@ struct cmessage {
struct cmsgcred cmcred;
};
-static enum clnt_stat clnt_vc_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
- xdrproc_t, void *, struct timeval);
+static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
+ rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_vc_abort(CLIENT *);
@@ -100,7 +100,9 @@ static struct clnt_ops clnt_vc_ops = {
};
/*
- * A pending RPC request which awaits a reply.
+ * A pending RPC request which awaits a reply. Requests which have
+ * received their reply will have cr_xid set to zero and cr_mrep to
+ * the mbuf chain of the reply.
*/
struct ct_request {
TAILQ_ENTRY(ct_request) cr_link;
@@ -113,6 +115,8 @@ TAILQ_HEAD(ct_request_list, ct_request);
struct ct_data {
struct mtx ct_lock;
+ int ct_threads; /* number of threads in clnt_vc_call */
+ bool_t ct_closing; /* TRUE if we are destroying client */
struct socket *ct_socket; /* connection socket */
bool_t ct_closeit; /* close it on destroy */
struct timeval ct_wait; /* wait interval in milliseconds */
@@ -161,7 +165,7 @@ clnt_vc_create(
static uint32_t disrupt;
struct __rpc_sockinfo si;
XDR xdrs;
- int error;
+ int error, interrupted;
if (disrupt == 0)
disrupt = (uint32_t)(long)raddr;
@@ -170,10 +174,31 @@ clnt_vc_create(
ct = (struct ct_data *)mem_alloc(sizeof (*ct));
mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
+ ct->ct_threads = 0;
+ ct->ct_closing = FALSE;
if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
error = soconnect(so, raddr, curthread);
+ SOCK_LOCK(so);
+ interrupted = 0;
+ while ((so->so_state & SS_ISCONNECTING)
+ && so->so_error == 0) {
+ error = msleep(&so->so_timeo, SOCK_MTX(so),
+ PSOCK | PCATCH, "connec", 0);
+ if (error) {
+ if (error == EINTR || error == ERESTART)
+ interrupted = 1;
+ break;
+ }
+ }
+ if (error == 0) {
+ error = so->so_error;
+ so->so_error = 0;
+ }
+ SOCK_UNLOCK(so);
if (error) {
+ if (!interrupted)
+ so->so_state &= ~SS_ISCONNECTING;
rpc_createerr.cf_stat = RPC_SYSTEMERROR;
rpc_createerr.cf_error.re_errno = error;
goto err;
@@ -224,6 +249,7 @@ clnt_vc_create(
* Create a client handle which uses xdrrec for serialization
* and authnone for authentication.
*/
+ cl->cl_refs = 1;
cl->cl_ops = &clnt_vc_ops;
cl->cl_private = ct;
cl->cl_auth = authnone_create();
@@ -255,6 +281,7 @@ err:
static enum clnt_stat
clnt_vc_call(
CLIENT *cl,
+ struct rpc_callextra *ext,
rpcproc_t proc,
xdrproc_t xdr_args,
void *args_ptr,
@@ -263,6 +290,7 @@ clnt_vc_call(
struct timeval utimeout)
{
struct ct_data *ct = (struct ct_data *) cl->cl_private;
+ AUTH *auth;
XDR xdrs;
struct rpc_msg reply_msg;
bool_t ok;
@@ -270,13 +298,27 @@ clnt_vc_call(
struct timeval timeout;
uint32_t xid;
struct mbuf *mreq = NULL;
- struct ct_request cr;
+ struct ct_request *cr;
int error;
+ cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
+
mtx_lock(&ct->ct_lock);
- cr.cr_mrep = NULL;
- cr.cr_error = 0;
+ if (ct->ct_closing) {
+ mtx_unlock(&ct->ct_lock);
+ free(cr, M_RPC);
+ return (RPC_CANTSEND);
+ }
+ ct->ct_threads++;
+
+ if (ext)
+ auth = ext->rc_auth;
+ else
+ auth = cl->cl_auth;
+
+ cr->cr_mrep = NULL;
+ cr->cr_error = 0;
if (ct->ct_wait.tv_usec == -1) {
timeout = utimeout; /* use supplied timeout */
@@ -311,12 +353,12 @@ call_again:
ct->ct_error.re_status = RPC_SUCCESS;
if ((! XDR_PUTINT32(&xdrs, &proc)) ||
- (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) ||
+ (! AUTH_MARSHALL(auth, &xdrs)) ||
(! (*xdr_args)(&xdrs, args_ptr))) {
if (ct->ct_error.re_status == RPC_SUCCESS)
ct->ct_error.re_status = RPC_CANTENCODEARGS;
- m_freem(mreq);
- return (ct->ct_error.re_status);
+ mtx_lock(&ct->ct_lock);
+ goto out;
}
m_fixhdr(mreq);
@@ -327,9 +369,9 @@ call_again:
*mtod(mreq, uint32_t *) =
htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
- cr.cr_xid = xid;
+ cr->cr_xid = xid;
mtx_lock(&ct->ct_lock);
- TAILQ_INSERT_TAIL(&ct->ct_pending, &cr, cr_link);
+ TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
mtx_unlock(&ct->ct_lock);
/*
@@ -343,10 +385,8 @@ call_again:
reply_msg.acpted_rply.ar_results.proc = xdr_results;
mtx_lock(&ct->ct_lock);
-
if (error) {
- TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
-
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
ct->ct_error.re_errno = error;
ct->ct_error.re_status = RPC_CANTSEND;
goto out;
@@ -357,12 +397,14 @@ call_again:
* lock. In both these cases, the request has been removed
* from ct->ct_pending.
*/
- if (cr.cr_error) {
- ct->ct_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
+ ct->ct_error.re_errno = cr->cr_error;
ct->ct_error.re_status = RPC_CANTRECV;
goto out;
}
- if (cr.cr_mrep) {
+ if (cr->cr_mrep) {
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
goto got_reply;
}
@@ -370,23 +412,22 @@ call_again:
* Hack to provide rpc-based message passing
*/
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
ct->ct_error.re_status = RPC_TIMEDOUT;
goto out;
}
- error = msleep(&cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
+ error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
tvtohz(&timeout));
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
+
if (error) {
/*
* The sleep returned an error so our request is still
* on the list. Turn the error code into an
* appropriate client status.
*/
- if (cr.cr_xid)
- TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
ct->ct_error.re_errno = error;
switch (error) {
case EINTR:
@@ -405,8 +446,8 @@ call_again:
* upcall had a receive error, report that,
* otherwise we have a reply.
*/
- if (cr.cr_error) {
- ct->ct_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ ct->ct_error.re_errno = cr->cr_error;
ct->ct_error.re_status = RPC_CANTRECV;
goto out;
}
@@ -419,10 +460,10 @@ got_reply:
*/
mtx_unlock(&ct->ct_lock);
- xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE);
+ xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
ok = xdr_replymsg(&xdrs, &reply_msg);
XDR_DESTROY(&xdrs);
- cr.cr_mrep = NULL;
+ cr->cr_mrep = NULL;
mtx_lock(&ct->ct_lock);
@@ -466,10 +507,17 @@ out:
if (mreq)
m_freem(mreq);
- if (cr.cr_mrep)
- m_freem(cr.cr_mrep);
+ if (cr->cr_mrep)
+ m_freem(cr->cr_mrep);
+ ct->ct_threads--;
+ if (ct->ct_closing)
+ wakeup(ct);
+
mtx_unlock(&ct->ct_lock);
+
+ free(cr, M_RPC);
+
return (ct->ct_error.re_status);
}
@@ -628,6 +676,7 @@ static void
clnt_vc_destroy(CLIENT *cl)
{
struct ct_data *ct = (struct ct_data *) cl->cl_private;
+ struct ct_request *cr;
struct socket *so = NULL;
mtx_lock(&ct->ct_lock);
@@ -639,8 +688,19 @@ clnt_vc_destroy(CLIENT *cl)
ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
- KASSERT(!TAILQ_FIRST(&ct->ct_pending),
- ("Destroying RPC client with pending RPC requests"));
+ /*
+ * Abort any pending requests and wait until everyone
+ * has finished with clnt_vc_call.
+ */
+ ct->ct_closing = TRUE;
+ TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
+ cr->cr_xid = 0;
+ cr->cr_error = ESHUTDOWN;
+ wakeup(cr);
+ }
+
+ while (ct->ct_threads)
+ msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
if (ct->ct_closeit) {
so = ct->ct_socket;
@@ -732,7 +792,6 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
cr->cr_error = error;
wakeup(cr);
}
- TAILQ_INIT(&ct->ct_pending);
mtx_unlock(&ct->ct_lock);
break;
}
@@ -795,19 +854,14 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
if (cr->cr_xid == xid) {
/*
* This one
- * matches. We snip it
- * out of the pending
- * list and leave the
- * reply mbuf in
+ * matches. We leave
+ * the reply mbuf in
* cr->cr_mrep. Set
* the XID to zero so
- * that clnt_vc_call
- * can know not to
- * repeat the
- * TAILQ_REMOVE.
+ * that we will ignore
+ * any duplicaed
+ * replies.
*/
- TAILQ_REMOVE(&ct->ct_pending,
- cr, cr_link);
cr->cr_xid = 0;
cr->cr_mrep = ct->ct_record;
cr->cr_error = 0;
OpenPOWER on IntegriCloud