summaryrefslogtreecommitdiffstats
path: root/sys/rpc
diff options
context:
space:
mode:
authordfr <dfr@FreeBSD.org>2008-06-26 10:21:54 +0000
committerdfr <dfr@FreeBSD.org>2008-06-26 10:21:54 +0000
commit41cea6d5ca71b8cf057f9face8055b218b30e18e (patch)
tree994a214037913bc4e44eaee5070c65aeadf53485 /sys/rpc
parentca3c788812715a263f83dcec4bdabaf6c10eb922 (diff)
downloadFreeBSD-src-41cea6d5ca71b8cf057f9face8055b218b30e18e.zip
FreeBSD-src-41cea6d5ca71b8cf057f9face8055b218b30e18e.tar.gz
Re-implement the client side of rpc.lockd in the kernel. This implementation
provides the correct semantics for flock(2) style locks which are used by the lockf(1) command line tool and the pidfile(3) library. It also implements recovery from server restarts and ensures that dirty cache blocks are written to the server before obtaining locks (allowing multiple clients to use file locking to safely share data). Sponsored by: Isilon Systems PR: 94256 MFC after: 2 weeks
Diffstat (limited to 'sys/rpc')
-rw-r--r--sys/rpc/auth_unix.c127
-rw-r--r--sys/rpc/authunix_prot.c5
-rw-r--r--sys/rpc/clnt.h98
-rw-r--r--sys/rpc/clnt_dg.c218
-rw-r--r--sys/rpc/clnt_rc.c103
-rw-r--r--sys/rpc/clnt_vc.c138
-rw-r--r--sys/rpc/svc_vc.c49
7 files changed, 556 insertions, 182 deletions
diff --git a/sys/rpc/auth_unix.c b/sys/rpc/auth_unix.c
index a2f5fd2..757d2dd 100644
--- a/sys/rpc/auth_unix.c
+++ b/sys/rpc/auth_unix.c
@@ -50,9 +50,11 @@ __FBSDID("$FreeBSD$");
#include <sys/param.h>
#include <sys/systm.h>
+#include <sys/hash.h>
+#include <sys/kernel.h>
#include <sys/lock.h>
#include <sys/malloc.h>
-#include <sys/mutex.h>
+#include <sys/sx.h>
#include <sys/ucred.h>
#include <rpc/types.h>
@@ -81,14 +83,39 @@ static struct auth_ops authunix_ops = {
* This struct is pointed to by the ah_private field of an auth_handle.
*/
struct audata {
+ TAILQ_ENTRY(audata) au_link;
+ TAILQ_ENTRY(audata) au_alllink;
+ int au_refs;
+ struct xucred au_xcred;
struct opaque_auth au_origcred; /* original credentials */
struct opaque_auth au_shcred; /* short hand cred */
u_long au_shfaults; /* short hand cache faults */
char au_marshed[MAX_AUTH_BYTES];
u_int au_mpos; /* xdr pos at end of marshed */
+ AUTH *au_auth; /* link back to AUTH */
};
+TAILQ_HEAD(audata_list, audata);
#define AUTH_PRIVATE(auth) ((struct audata *)auth->ah_private)
+#define AUTH_UNIX_HASH_SIZE 16
+#define AUTH_UNIX_MAX 256
+static struct audata_list auth_unix_cache[AUTH_UNIX_HASH_SIZE];
+static struct audata_list auth_unix_all;
+static struct sx auth_unix_lock;
+static int auth_unix_count;
+
+static void
+authunix_init(void *dummy)
+{
+ int i;
+
+ for (i = 0; i < AUTH_UNIX_HASH_SIZE; i++)
+ TAILQ_INIT(&auth_unix_cache[i]);
+ TAILQ_INIT(&auth_unix_all);
+ sx_init(&auth_unix_lock, "auth_unix_lock");
+}
+SYSINIT(authunix_init, SI_SUB_KMEM, SI_ORDER_ANY, authunix_init, NULL);
+
/*
* Create a unix style authenticator.
* Returns an auth handle with the given stuff in it.
@@ -96,38 +123,70 @@ struct audata {
AUTH *
authunix_create(struct ucred *cred)
{
+ uint32_t h, th;
struct xucred xcr;
char mymem[MAX_AUTH_BYTES];
XDR xdrs;
AUTH *auth;
- struct audata *au;
+ struct audata *au, *tau;
struct timeval now;
uint32_t time;
int len;
+ if (auth_unix_count > AUTH_UNIX_MAX) {
+ while (auth_unix_count > AUTH_UNIX_MAX) {
+ sx_xlock(&auth_unix_lock);
+ tau = TAILQ_FIRST(&auth_unix_all);
+ th = HASHSTEP(HASHINIT, tau->au_xcred.cr_uid)
+ % AUTH_UNIX_HASH_SIZE;
+ TAILQ_REMOVE(&auth_unix_cache[th], tau, au_link);
+ TAILQ_REMOVE(&auth_unix_all, tau, au_alllink);
+ auth_unix_count--;
+ sx_xunlock(&auth_unix_lock);
+ AUTH_DESTROY(tau->au_auth);
+ }
+ }
+
+ /*
+ * Hash the uid to see if we already have an AUTH with this cred.
+ */
+ h = HASHSTEP(HASHINIT, cred->cr_uid) % AUTH_UNIX_HASH_SIZE;
+ cru2x(cred, &xcr);
+again:
+ sx_slock(&auth_unix_lock);
+ TAILQ_FOREACH(au, &auth_unix_cache[h], au_link) {
+ if (!memcmp(&xcr, &au->au_xcred, sizeof(xcr))) {
+ if (sx_try_upgrade(&auth_unix_lock)) {
+ /*
+ * Keep auth_unix_all LRU sorted.
+ */
+ TAILQ_REMOVE(&auth_unix_all, au, au_alllink);
+ TAILQ_INSERT_TAIL(&auth_unix_all, au,
+ au_alllink);
+ au->au_refs++;
+ sx_xunlock(&auth_unix_lock);
+ return (au->au_auth);
+ } else {
+ sx_sunlock(&auth_unix_lock);
+ goto again;
+ }
+ }
+ }
+
/*
* Allocate and set up auth handle
*/
au = NULL;
auth = mem_alloc(sizeof(*auth));
-#ifndef _KERNEL
- if (auth == NULL) {
- printf("authunix_create: out of memory");
- goto cleanup_authunix_create;
- }
-#endif
au = mem_alloc(sizeof(*au));
-#ifndef _KERNEL
- if (au == NULL) {
- printf("authunix_create: out of memory");
- goto cleanup_authunix_create;
- }
-#endif
auth->ah_ops = &authunix_ops;
auth->ah_private = (caddr_t)au;
auth->ah_verf = au->au_shcred = _null_auth;
+ au->au_refs = 1;
+ au->au_xcred = xcr;
au->au_shfaults = 0;
au->au_origcred.oa_base = NULL;
+ au->au_auth = auth;
getmicrotime(&now);
time = now.tv_sec;
@@ -141,14 +200,7 @@ authunix_create(struct ucred *cred)
panic("authunix_create: failed to encode creds");
au->au_origcred.oa_length = len = XDR_GETPOS(&xdrs);
au->au_origcred.oa_flavor = AUTH_UNIX;
-#ifdef _KERNEL
au->au_origcred.oa_base = mem_alloc((u_int) len);
-#else
- if ((au->au_origcred.oa_base = mem_alloc((u_int) len)) == NULL) {
- printf("authunix_create: out of memory");
- goto cleanup_authunix_create;
- }
-#endif
memcpy(au->au_origcred.oa_base, mymem, (size_t)len);
/*
@@ -156,18 +208,19 @@ authunix_create(struct ucred *cred)
*/
auth->ah_cred = au->au_origcred;
marshal_new_auth(auth);
- return (auth);
-#ifndef _KERNEL
- cleanup_authunix_create:
- if (auth)
- mem_free(auth, sizeof(*auth));
- if (au) {
- if (au->au_origcred.oa_base)
- mem_free(au->au_origcred.oa_base, (u_int)len);
- mem_free(au, sizeof(*au));
+
+ if (sx_try_upgrade(&auth_unix_lock)) {
+ auth_unix_count++;
+ TAILQ_INSERT_TAIL(&auth_unix_cache[h], au, au_link);
+ TAILQ_INSERT_TAIL(&auth_unix_all, au, au_alllink);
+ au->au_refs++; /* one for the cache, one for user */
+ sx_xunlock(&auth_unix_lock);
+ return (auth);
+ } else {
+ sx_sunlock(&auth_unix_lock);
+ AUTH_DESTROY(auth);
+ goto again;
}
- return (NULL);
-#endif
}
/*
@@ -262,8 +315,18 @@ static void
authunix_destroy(AUTH *auth)
{
struct audata *au;
+ int refs;
au = AUTH_PRIVATE(auth);
+
+ sx_xlock(&auth_unix_lock);
+ au->au_refs--;
+ refs = au->au_refs;
+ sx_xunlock(&auth_unix_lock);
+
+ if (refs > 0)
+ return;
+
mem_free(au->au_origcred.oa_base, au->au_origcred.oa_length);
if (au->au_shcred.oa_base != NULL)
diff --git a/sys/rpc/authunix_prot.c b/sys/rpc/authunix_prot.c
index 3092b1f..141f594 100644
--- a/sys/rpc/authunix_prot.c
+++ b/sys/rpc/authunix_prot.c
@@ -68,7 +68,12 @@ xdr_authunix_parms(XDR *xdrs, uint32_t *time, struct xucred *cred)
uint32_t junk;
if (xdrs->x_op == XDR_ENCODE) {
+ /*
+ * Restrict name length to 255 according to RFC 1057.
+ */
namelen = strlen(hostname);
+ if (namelen > 255)
+ namelen = 255;
} else {
namelen = 0;
}
diff --git a/sys/rpc/clnt.h b/sys/rpc/clnt.h
index 4d6a778..03e3112 100644
--- a/sys/rpc/clnt.h
+++ b/sys/rpc/clnt.h
@@ -62,6 +62,7 @@
#include <rpc/clnt_stat.h>
#include <sys/cdefs.h>
#ifdef _KERNEL
+#include <sys/refcount.h>
#include <rpc/netconfig.h>
#else
#include <netconfig.h>
@@ -109,6 +110,23 @@ struct rpc_err {
#define re_lb ru.RE_lb
};
+#ifdef _KERNEL
+/*
+ * Functions of this type may be used to receive notification when RPC
+ * calls have to be re-transmitted etc.
+ */
+typedef void rpc_feedback(int cmd, int procnum, void *);
+
+/*
+ * A structure used with CLNT_CALL_EXT to pass extra information used
+ * while processing an RPC call.
+ */
+struct rpc_callextra {
+ AUTH *rc_auth; /* auth handle to use for this call */
+ rpc_feedback *rc_feedback; /* callback for retransmits etc. */
+ void *rc_feedback_arg; /* argument for callback */
+};
+#endif
/*
* Client rpc handle.
@@ -116,12 +134,35 @@ struct rpc_err {
* Client is responsible for initializing auth, see e.g. auth_none.c.
*/
typedef struct __rpc_client {
+#ifdef _KERNEL
+ volatile u_int cl_refs; /* reference count */
+ AUTH *cl_auth; /* authenticator */
+ struct clnt_ops {
+ /* call remote procedure */
+ enum clnt_stat (*cl_call)(struct __rpc_client *,
+ struct rpc_callextra *, rpcproc_t, xdrproc_t, void *,
+ xdrproc_t, void *, struct timeval);
+ /* abort a call */
+ void (*cl_abort)(struct __rpc_client *);
+ /* get specific error code */
+ void (*cl_geterr)(struct __rpc_client *,
+ struct rpc_err *);
+ /* frees results */
+ bool_t (*cl_freeres)(struct __rpc_client *,
+ xdrproc_t, void *);
+ /* destroy this structure */
+ void (*cl_destroy)(struct __rpc_client *);
+ /* the ioctl() of rpc */
+ bool_t (*cl_control)(struct __rpc_client *, u_int,
+ void *);
+ } *cl_ops;
+#else
AUTH *cl_auth; /* authenticator */
struct clnt_ops {
/* call remote procedure */
enum clnt_stat (*cl_call)(struct __rpc_client *,
- rpcproc_t, xdrproc_t, void *, xdrproc_t,
- void *, struct timeval);
+ rpcproc_t, xdrproc_t, void *, xdrproc_t,
+ void *, struct timeval);
/* abort a call */
void (*cl_abort)(struct __rpc_client *);
/* get specific error code */
@@ -136,12 +177,12 @@ typedef struct __rpc_client {
bool_t (*cl_control)(struct __rpc_client *, u_int,
void *);
} *cl_ops;
+#endif
void *cl_private; /* private stuff */
char *cl_netid; /* network token */
char *cl_tp; /* device name */
} CLIENT;
-
/*
* Timers used for the pseudo-transport protocol when using datagrams
*/
@@ -154,8 +195,10 @@ struct rpc_timers {
/*
* Feedback values used for possible congestion and rate control
*/
-#define FEEDBACK_REXMIT1 1 /* first retransmit */
-#define FEEDBACK_OK 2 /* no retransmits */
+#define FEEDBACK_OK 1 /* no retransmits */
+#define FEEDBACK_REXMIT1 2 /* first retransmit */
+#define FEEDBACK_REXMIT2 3 /* second and subsequent retransmit */
+#define FEEDBACK_RECONNECT 4 /* client reconnect */
/* Used to set version of portmapper used in broadcast */
@@ -171,6 +214,30 @@ struct rpc_timers {
*
*/
+#ifdef _KERNEL
+#define CLNT_ACQUIRE(rh) \
+ refcount_acquire(&(rh)->cl_refs)
+#define CLNT_RELEASE(rh) \
+ if (refcount_release(&(rh)->cl_refs)) \
+ CLNT_DESTROY(rh)
+
+/*
+ * enum clnt_stat
+ * CLNT_CALL_EXT(rh, ext, proc, xargs, argsp, xres, resp, timeout)
+ * CLIENT *rh;
+ * struct rpc_callextra *ext;
+ * rpcproc_t proc;
+ * xdrproc_t xargs;
+ * void *argsp;
+ * xdrproc_t xres;
+ * void *resp;
+ * struct timeval timeout;
+ */
+#define CLNT_CALL_EXT(rh, ext, proc, xargs, argsp, xres, resp, secs) \
+ ((*(rh)->cl_ops->cl_call)(rh, ext, proc, xargs, \
+ argsp, xres, resp, secs))
+#endif
+
/*
* enum clnt_stat
* CLNT_CALL(rh, proc, xargs, argsp, xres, resp, timeout)
@@ -182,12 +249,21 @@ struct rpc_timers {
* void *resp;
* struct timeval timeout;
*/
-#define CLNT_CALL(rh, proc, xargs, argsp, xres, resp, secs) \
- ((*(rh)->cl_ops->cl_call)(rh, proc, xargs, \
+#ifdef _KERNEL
+#define CLNT_CALL(rh, proc, xargs, argsp, xres, resp, secs) \
+ ((*(rh)->cl_ops->cl_call)(rh, NULL, proc, xargs, \
argsp, xres, resp, secs))
-#define clnt_call(rh, proc, xargs, argsp, xres, resp, secs) \
- ((*(rh)->cl_ops->cl_call)(rh, proc, xargs, \
+#define clnt_call(rh, proc, xargs, argsp, xres, resp, secs) \
+ ((*(rh)->cl_ops->cl_call)(rh, NULL, proc, xargs, \
argsp, xres, resp, secs))
+#else
+#define CLNT_CALL(rh, proc, xargs, argsp, xres, resp, secs) \
+ ((*(rh)->cl_ops->cl_call)(rh, proc, xargs, \
+ argsp, xres, resp, secs))
+#define clnt_call(rh, proc, xargs, argsp, xres, resp, secs) \
+ ((*(rh)->cl_ops->cl_call)(rh, proc, xargs, \
+ argsp, xres, resp, secs))
+#endif
/*
* void
@@ -262,6 +338,8 @@ struct rpc_timers {
#define CLGET_WAITCHAN 22 /* get string used in msleep call */
#define CLSET_INTERRUPTIBLE 23 /* set interruptible flag */
#define CLGET_INTERRUPTIBLE 24 /* set interruptible flag */
+#define CLSET_RETRIES 25 /* set retry count for reconnect */
+#define CLGET_RETRIES 26 /* get retry count for reconnect */
#endif
@@ -534,6 +612,7 @@ __END_DECLS
#define rpc_createerr (*(__rpc_createerr()))
#endif
+#ifndef _KERNEL
/*
* The simplified interface:
* enum clnt_stat
@@ -612,7 +691,6 @@ extern enum clnt_stat rpc_broadcast_exp(const rpcprog_t, const rpcvers_t,
const int, const char *);
__END_DECLS
-#ifndef _KERNEL
/* For backward compatibility */
#include <rpc/clnt_soc.h>
#endif
diff --git a/sys/rpc/clnt_dg.c b/sys/rpc/clnt_dg.c
index c66ac50..f14e1d6 100644
--- a/sys/rpc/clnt_dg.c
+++ b/sys/rpc/clnt_dg.c
@@ -45,6 +45,7 @@ __FBSDID("$FreeBSD$");
#include <sys/param.h>
#include <sys/systm.h>
+#include <sys/kernel.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
@@ -70,8 +71,8 @@ __FBSDID("$FreeBSD$");
#endif
static bool_t time_not_ok(struct timeval *);
-static enum clnt_stat clnt_dg_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
- xdrproc_t, void *, struct timeval);
+static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
+ rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_dg_abort(CLIENT *);
@@ -91,10 +92,13 @@ static struct clnt_ops clnt_dg_ops = {
static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
/*
- * A pending RPC request which awaits a reply.
+ * A pending RPC request which awaits a reply. Requests which have
+ * received their reply will have cr_xid set to zero and cr_mrep to
+ * the mbuf chain of the reply.
*/
struct cu_request {
TAILQ_ENTRY(cu_request) cr_link;
+ CLIENT *cr_client; /* owner */
uint32_t cr_xid; /* XID of request */
struct mbuf *cr_mrep; /* reply received by upcall */
int cr_error; /* any error from upcall */
@@ -123,6 +127,8 @@ struct cu_socket {
* Private data kept per client handle
*/
struct cu_data {
+ int cu_threads; /* # threads in clnt_vc_call */
+ bool_t cu_closing; /* TRUE if we are destroying */
struct socket *cu_socket; /* connection socket */
bool_t cu_closeit; /* opened by library */
struct sockaddr_storage cu_raddr; /* remote address */
@@ -203,10 +209,12 @@ clnt_dg_create(
sendsz = ((sendsz + 3) / 4) * 4;
recvsz = ((recvsz + 3) / 4) * 4;
cu = mem_alloc(sizeof (*cu));
+ cu->cu_threads = 0;
+ cu->cu_closing = FALSE;
(void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
cu->cu_rlen = svcaddr->sa_len;
/* Other values can also be set through clnt_control() */
- cu->cu_wait.tv_sec = 15; /* heuristically chosen */
+ cu->cu_wait.tv_sec = 3; /* heuristically chosen */
cu->cu_wait.tv_usec = 0;
cu->cu_total.tv_sec = -1;
cu->cu_total.tv_usec = -1;
@@ -237,6 +245,7 @@ clnt_dg_create(
*/
cu->cu_closeit = FALSE;
cu->cu_socket = so;
+ soreserve(so, 256*1024, 256*1024);
SOCKBUF_LOCK(&so->so_rcv);
recheck_socket:
@@ -274,6 +283,7 @@ recheck_socket:
}
SOCKBUF_UNLOCK(&so->so_rcv);
+ cl->cl_refs = 1;
cl->cl_ops = &clnt_dg_ops;
cl->cl_private = (caddr_t)(void *)cu;
cl->cl_auth = authnone_create();
@@ -291,7 +301,8 @@ err2:
static enum clnt_stat
clnt_dg_call(
- CLIENT *cl, /* client handle */
+ CLIENT *cl, /* client handle */
+ struct rpc_callextra *ext, /* call metadata */
rpcproc_t proc, /* procedure number */
xdrproc_t xargs, /* xdr routine for args */
void *argsp, /* pointer to args */
@@ -301,30 +312,52 @@ clnt_dg_call(
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ AUTH *auth;
XDR xdrs;
struct rpc_msg reply_msg;
bool_t ok;
+ int retrans; /* number of re-transmits so far */
int nrefreshes = 2; /* number of times to refresh cred */
- struct timeval timeout;
- struct timeval retransmit_time;
- struct timeval next_sendtime, starttime, time_waited, tv;
+ struct timeval *tvp;
+ int timeout;
+ int retransmit_time;
+ int next_sendtime, starttime, time_waited, tv;
struct sockaddr *sa;
socklen_t salen;
uint32_t xid;
struct mbuf *mreq = NULL;
- struct cu_request cr;
+ struct cu_request *cr;
int error;
+ cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
+
mtx_lock(&cs->cs_lock);
- cr.cr_mrep = NULL;
- cr.cr_error = 0;
+ if (cu->cu_closing) {
+ mtx_unlock(&cs->cs_lock);
+ free(cr, M_RPC);
+ return (RPC_CANTSEND);
+ }
+ cu->cu_threads++;
+
+ if (ext)
+ auth = ext->rc_auth;
+ else
+ auth = cl->cl_auth;
+
+ cr->cr_client = cl;
+ cr->cr_mrep = NULL;
+ cr->cr_error = 0;
if (cu->cu_total.tv_usec == -1) {
- timeout = utimeout; /* use supplied timeout */
+ tvp = &utimeout; /* use supplied timeout */
} else {
- timeout = cu->cu_total; /* use default timeout */
+ tvp = &cu->cu_total; /* use default timeout */
}
+ if (tvp->tv_sec || tvp->tv_usec)
+ timeout = tvtohz(tvp);
+ else
+ timeout = 0;
if (cu->cu_connect && !cu->cu_connected) {
mtx_unlock(&cs->cs_lock);
@@ -345,11 +378,11 @@ clnt_dg_call(
sa = (struct sockaddr *)&cu->cu_raddr;
salen = cu->cu_rlen;
}
- time_waited.tv_sec = 0;
- time_waited.tv_usec = 0;
- retransmit_time = next_sendtime = cu->cu_wait;
+ time_waited = 0;
+ retrans = 0;
+ retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
- getmicrotime(&starttime);
+ starttime = ticks;
call_again:
mtx_assert(&cs->cs_lock, MA_OWNED);
@@ -376,7 +409,7 @@ send_again:
goto get_reply;
if ((! XDR_PUTINT32(&xdrs, &proc)) ||
- (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) ||
+ (! AUTH_MARSHALL(auth, &xdrs)) ||
(! (*xargs)(&xdrs, argsp))) {
cu->cu_error.re_status = RPC_CANTENCODEARGS;
mtx_lock(&cs->cs_lock);
@@ -384,9 +417,9 @@ send_again:
}
m_fixhdr(mreq);
- cr.cr_xid = xid;
+ cr->cr_xid = xid;
mtx_lock(&cs->cs_lock);
- TAILQ_INSERT_TAIL(&cs->cs_pending, &cr, cr_link);
+ TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
mtx_unlock(&cs->cs_lock);
/*
@@ -406,8 +439,7 @@ send_again:
mtx_lock(&cs->cs_lock);
if (error) {
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
-
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
cu->cu_error.re_errno = error;
cu->cu_error.re_status = RPC_CANTSEND;
goto out;
@@ -415,24 +447,24 @@ send_again:
/*
* Check to see if we got an upcall while waiting for the
- * lock. In both these cases, the request has been removed
- * from cs->cs_pending.
+ * lock.
*/
- if (cr.cr_error) {
- cu->cu_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
+ cu->cu_error.re_errno = cr->cr_error;
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
- if (cr.cr_mrep) {
+ if (cr->cr_mrep) {
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
goto got_reply;
}
/*
* Hack to provide rpc-based message passing
*/
- if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
+ if (timeout == 0) {
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
cu->cu_error.re_status = RPC_TIMEDOUT;
goto out;
}
@@ -440,17 +472,23 @@ send_again:
get_reply:
for (;;) {
/* Decide how long to wait. */
- if (timevalcmp(&next_sendtime, &timeout, <)) {
+ if (next_sendtime < timeout)
tv = next_sendtime;
- } else {
+ else
tv = timeout;
+ tv -= time_waited;
+
+ if (tv > 0) {
+ if (cu->cu_closing)
+ error = 0;
+ else
+ error = msleep(cr, &cs->cs_lock,
+ cu->cu_waitflag, cu->cu_waitchan, tv);
+ } else {
+ error = EWOULDBLOCK;
}
- timevalsub(&tv, &time_waited);
- if (tv.tv_sec < 0 || tv.tv_usec < 0)
- tv.tv_sec = tv.tv_usec = 0;
- error = msleep(&cr, &cs->cs_lock, cu->cu_waitflag,
- cu->cu_waitchan, tvtohz(&tv));
+ TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
if (!error) {
/*
@@ -458,8 +496,8 @@ get_reply:
* upcall had a receive error, report that,
* otherwise we have a reply.
*/
- if (cr.cr_error) {
- cu->cu_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ cu->cu_error.re_errno = cr->cr_error;
cu->cu_error.re_status = RPC_CANTRECV;
goto out;
}
@@ -472,8 +510,6 @@ get_reply:
* re-send the request.
*/
if (error != EWOULDBLOCK) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
cu->cu_error.re_errno = error;
if (error == EINTR)
cu->cu_error.re_status = RPC_INTR;
@@ -482,29 +518,40 @@ get_reply:
goto out;
}
- getmicrotime(&tv);
- time_waited = tv;
- timevalsub(&time_waited, &starttime);
+ time_waited = ticks - starttime;
/* Check for timeout. */
- if (timevalcmp(&time_waited, &timeout, >)) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
+ if (time_waited > timeout) {
cu->cu_error.re_errno = EWOULDBLOCK;
cu->cu_error.re_status = RPC_TIMEDOUT;
goto out;
}
/* Retransmit if necessary. */
- if (timevalcmp(&time_waited, &next_sendtime, >)) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link);
+ if (time_waited >= next_sendtime) {
+ if (ext && ext->rc_feedback) {
+ mtx_unlock(&cs->cs_lock);
+ if (retrans == 0)
+ ext->rc_feedback(FEEDBACK_REXMIT1,
+ proc, ext->rc_feedback_arg);
+ else
+ ext->rc_feedback(FEEDBACK_REXMIT2,
+ proc, ext->rc_feedback_arg);
+ mtx_lock(&cs->cs_lock);
+ }
+ if (cu->cu_closing) {
+ cu->cu_error.re_errno = ESHUTDOWN;
+ cu->cu_error.re_status = RPC_CANTRECV;
+ goto out;
+ }
+ retrans++;
/* update retransmit_time */
- if (retransmit_time.tv_sec < RPC_MAX_BACKOFF)
- timevaladd(&retransmit_time, &retransmit_time);
- timevaladd(&next_sendtime, &retransmit_time);
+ if (retransmit_time < RPC_MAX_BACKOFF * hz)
+ retransmit_time = 2 * retransmit_time;
+ next_sendtime += retransmit_time;
goto send_again;
}
+ TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
}
got_reply:
@@ -514,10 +561,13 @@ got_reply:
*/
mtx_unlock(&cs->cs_lock);
- xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE);
+ if (ext && ext->rc_feedback)
+ ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
+
+ xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
ok = xdr_replymsg(&xdrs, &reply_msg);
XDR_DESTROY(&xdrs);
- cr.cr_mrep = NULL;
+ cr->cr_mrep = NULL;
mtx_lock(&cs->cs_lock);
@@ -562,10 +612,17 @@ out:
if (mreq)
m_freem(mreq);
- if (cr.cr_mrep)
- m_freem(cr.cr_mrep);
+ if (cr->cr_mrep)
+ m_freem(cr->cr_mrep);
+ cu->cu_threads--;
+ if (cu->cu_closing)
+ wakeup(cu);
+
mtx_unlock(&cs->cs_lock);
+
+ free(cr, M_RPC);
+
return (cu->cu_error.re_status);
}
@@ -732,30 +789,44 @@ clnt_dg_destroy(CLIENT *cl)
{
struct cu_data *cu = (struct cu_data *)cl->cl_private;
struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
+ struct cu_request *cr;
struct socket *so = NULL;
bool_t lastsocketref;
- SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
-
mtx_lock(&cs->cs_lock);
+
+ /*
+ * Abort any pending requests and wait until everyone
+ * has finished with clnt_vc_call.
+ */
+ cu->cu_closing = TRUE;
+ TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
+ if (cr->cr_client == cl) {
+ cr->cr_xid = 0;
+ cr->cr_error = ESHUTDOWN;
+ wakeup(cr);
+ }
+ }
+
+ while (cu->cu_threads)
+ msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
+
cs->cs_refs--;
if (cs->cs_refs == 0) {
+ mtx_destroy(&cs->cs_lock);
+ SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
cu->cu_socket->so_upcallarg = NULL;
cu->cu_socket->so_upcall = NULL;
cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL;
- mtx_destroy(&cs->cs_lock);
SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
mem_free(cs, sizeof(*cs));
lastsocketref = TRUE;
} else {
mtx_unlock(&cs->cs_lock);
- SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
lastsocketref = FALSE;
}
- if (cu->cu_closeit) {
- KASSERT(lastsocketref, ("clnt_dg_destroy(): closing a socket "
- "shared with other clients"));
+ if (cu->cu_closeit && lastsocketref) {
so = cu->cu_socket;
cu->cu_socket = NULL;
}
@@ -812,10 +883,10 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
if (error) {
mtx_lock(&cs->cs_lock);
TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
+ cr->cr_xid = 0;
cr->cr_error = error;
wakeup(cr);
}
- TAILQ_INIT(&cs->cs_pending);
mtx_unlock(&cs->cs_lock);
break;
}
@@ -825,7 +896,11 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
*/
m = m_pullup(m, sizeof(xid));
if (!m)
- break;
+ /*
+ * Should never happen.
+ */
+ continue;
+
xid = ntohl(*mtod(m, uint32_t *));
/*
@@ -836,14 +911,13 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
if (cr->cr_xid == xid) {
/*
- * This one matches. We snip it out of
- * the pending list and leave the
+ * This one matches. We leave the
* reply mbuf in cr->cr_mrep. Set the
- * XID to zero so that clnt_dg_call
- * can know not to repeat the
- * TAILQ_REMOVE.
+ * XID to zero so that we will ignore
+ * any duplicated replies that arrive
+ * before clnt_dg_call removes it from
+ * the queue.
*/
- TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
cr->cr_xid = 0;
cr->cr_mrep = m;
cr->cr_error = 0;
diff --git a/sys/rpc/clnt_rc.c b/sys/rpc/clnt_rc.c
index 8b5fc26..a6b2dfd 100644
--- a/sys/rpc/clnt_rc.c
+++ b/sys/rpc/clnt_rc.c
@@ -30,6 +30,7 @@ __FBSDID("$FreeBSD$");
#include <sys/param.h>
#include <sys/systm.h>
+#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
@@ -44,8 +45,8 @@ __FBSDID("$FreeBSD$");
#include <rpc/rpc.h>
#include <rpc/rpc_com.h>
-static enum clnt_stat clnt_reconnect_call(CLIENT *, rpcproc_t,
- xdrproc_t, void *, xdrproc_t, void *, struct timeval);
+static enum clnt_stat clnt_reconnect_call(CLIENT *, struct rpc_callextra *,
+ rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
static void clnt_reconnect_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_reconnect_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_reconnect_abort(CLIENT *);
@@ -62,6 +63,7 @@ static struct clnt_ops clnt_reconnect_ops = {
};
struct rc_data {
+ struct mtx rc_lock;
struct sockaddr_storage rc_addr; /* server address */
struct netconfig* rc_nconf; /* network type */
rpcprog_t rc_prog; /* program number */
@@ -70,8 +72,10 @@ struct rc_data {
size_t rc_recvsz;
struct timeval rc_timeout;
struct timeval rc_retry;
+ int rc_retries;
const char *rc_waitchan;
int rc_intr;
+ int rc_connecting;
CLIENT* rc_client; /* underlying RPC client */
};
@@ -94,6 +98,7 @@ clnt_reconnect_create(
cl = mem_alloc(sizeof (CLIENT));
rc = mem_alloc(sizeof (*rc));
+ mtx_init(&rc->rc_lock, "rc->rc_lock", NULL, MTX_DEF);
(void) memcpy(&rc->rc_addr, svcaddr, (size_t)svcaddr->sa_len);
rc->rc_nconf = nconf;
rc->rc_prog = program;
@@ -102,12 +107,15 @@ clnt_reconnect_create(
rc->rc_recvsz = recvsz;
rc->rc_timeout.tv_sec = -1;
rc->rc_timeout.tv_usec = -1;
- rc->rc_retry.tv_sec = 15;
+ rc->rc_retry.tv_sec = 3;
rc->rc_retry.tv_usec = 0;
+ rc->rc_retries = INT_MAX;
rc->rc_waitchan = "rpcrecv";
rc->rc_intr = 0;
+ rc->rc_connecting = FALSE;
rc->rc_client = NULL;
+ cl->cl_refs = 1;
cl->cl_ops = &clnt_reconnect_ops;
cl->cl_private = (caddr_t)(void *)rc;
cl->cl_auth = authnone_create();
@@ -121,13 +129,39 @@ clnt_reconnect_connect(CLIENT *cl)
{
struct rc_data *rc = (struct rc_data *)cl->cl_private;
struct socket *so;
+ enum clnt_stat stat;
+ int error;
int one = 1;
+ mtx_lock(&rc->rc_lock);
+again:
+ if (rc->rc_connecting) {
+ while (!rc->rc_client) {
+ error = msleep(rc, &rc->rc_lock,
+ rc->rc_intr ? PCATCH : 0, "rpcrecon", 0);
+ if (error) {
+ mtx_unlock(&rc->rc_lock);
+ return (RPC_INTR);
+ }
+ }
+ /*
+ * If the other guy failed to connect, we might as
+ * well have another go.
+ */
+ if (!rc->rc_client && !rc->rc_connecting)
+ goto again;
+ mtx_unlock(&rc->rc_lock);
+ return (RPC_SUCCESS);
+ } else {
+ rc->rc_connecting = TRUE;
+ }
+ mtx_unlock(&rc->rc_lock);
+
so = __rpc_nconf2socket(rc->rc_nconf);
if (!so) {
- rpc_createerr.cf_stat = RPC_TLIERROR;
+ stat = rpc_createerr.cf_stat = RPC_TLIERROR;
rpc_createerr.cf_error.re_errno = 0;
- return (RPC_TLIERROR);
+ goto out;
}
if (rc->rc_nconf->nc_semantics == NC_TPI_CLTS)
@@ -139,8 +173,10 @@ clnt_reconnect_connect(CLIENT *cl)
(struct sockaddr *) &rc->rc_addr, rc->rc_prog, rc->rc_vers,
rc->rc_sendsz, rc->rc_recvsz);
- if (!rc->rc_client)
- return (rpc_createerr.cf_stat);
+ if (!rc->rc_client) {
+ stat = rpc_createerr.cf_stat;
+ goto out;
+ }
CLNT_CONTROL(rc->rc_client, CLSET_FD_CLOSE, 0);
CLNT_CONTROL(rc->rc_client, CLSET_CONNECT, &one);
@@ -148,13 +184,21 @@ clnt_reconnect_connect(CLIENT *cl)
CLNT_CONTROL(rc->rc_client, CLSET_RETRY_TIMEOUT, &rc->rc_retry);
CLNT_CONTROL(rc->rc_client, CLSET_WAITCHAN, &rc->rc_waitchan);
CLNT_CONTROL(rc->rc_client, CLSET_INTERRUPTIBLE, &rc->rc_intr);
+ stat = RPC_SUCCESS;
+
+out:
+ mtx_lock(&rc->rc_lock);
+ rc->rc_connecting = FALSE;
+ wakeup(rc);
+ mtx_unlock(&rc->rc_lock);
- return (RPC_SUCCESS);
+ return (stat);
}
static enum clnt_stat
clnt_reconnect_call(
- CLIENT *cl, /* client handle */
+ CLIENT *cl, /* client handle */
+ struct rpc_callextra *ext, /* call metadata */
rpcproc_t proc, /* procedure number */
xdrproc_t xargs, /* xdr routine for args */
void *argsp, /* pointer to args */
@@ -163,8 +207,11 @@ clnt_reconnect_call(
struct timeval utimeout) /* seconds to wait before giving up */
{
struct rc_data *rc = (struct rc_data *)cl->cl_private;
+ CLIENT *client;
enum clnt_stat stat;
+ int tries;
+ tries = 0;
do {
if (!rc->rc_client) {
stat = clnt_reconnect_connect(cl);
@@ -172,9 +219,14 @@ clnt_reconnect_call(
return (stat);
}
- stat = CLNT_CALL(rc->rc_client, proc, xargs, argsp,
+ mtx_lock(&rc->rc_lock);
+ CLNT_ACQUIRE(rc->rc_client);
+ client = rc->rc_client;
+ mtx_unlock(&rc->rc_lock);
+ stat = CLNT_CALL_EXT(client, ext, proc, xargs, argsp,
xresults, resultsp, utimeout);
+ CLNT_RELEASE(client);
if (stat == RPC_TIMEDOUT) {
/*
* Check for async send misfeature for NLM
@@ -184,16 +236,33 @@ clnt_reconnect_call(
&& rc->rc_timeout.tv_usec == 0)
|| (rc->rc_timeout.tv_sec == -1
&& utimeout.tv_sec == 0
- && utimeout.tv_usec == 0))
+ && utimeout.tv_usec == 0)) {
break;
+ }
}
if (stat == RPC_INTR)
break;
if (stat != RPC_SUCCESS) {
- CLNT_DESTROY(rc->rc_client);
- rc->rc_client = NULL;
+ tries++;
+ if (tries >= rc->rc_retries)
+ break;
+
+ if (ext && ext->rc_feedback)
+ ext->rc_feedback(FEEDBACK_RECONNECT, proc,
+ ext->rc_feedback_arg);
+
+ mtx_lock(&rc->rc_lock);
+ /*
+ * Make sure that someone else hasn't already
+ * reconnected.
+ */
+ if (rc->rc_client == client) {
+ CLNT_RELEASE(rc->rc_client);
+ rc->rc_client = NULL;
+ }
+ mtx_unlock(&rc->rc_lock);
}
} while (stat != RPC_SUCCESS);
@@ -294,6 +363,14 @@ clnt_reconnect_control(CLIENT *cl, u_int request, void *info)
*(int *) info = rc->rc_intr;
break;
+ case CLSET_RETRIES:
+ rc->rc_retries = *(int *) info;
+ break;
+
+ case CLGET_RETRIES:
+ *(int *) info = rc->rc_retries;
+ break;
+
default:
return (FALSE);
}
diff --git a/sys/rpc/clnt_vc.c b/sys/rpc/clnt_vc.c
index 5731e1e..cb09352 100644
--- a/sys/rpc/clnt_vc.c
+++ b/sys/rpc/clnt_vc.c
@@ -80,8 +80,8 @@ struct cmessage {
struct cmsgcred cmcred;
};
-static enum clnt_stat clnt_vc_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
- xdrproc_t, void *, struct timeval);
+static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
+ rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_vc_abort(CLIENT *);
@@ -100,7 +100,9 @@ static struct clnt_ops clnt_vc_ops = {
};
/*
- * A pending RPC request which awaits a reply.
+ * A pending RPC request which awaits a reply. Requests which have
+ * received their reply will have cr_xid set to zero and cr_mrep to
+ * the mbuf chain of the reply.
*/
struct ct_request {
TAILQ_ENTRY(ct_request) cr_link;
@@ -113,6 +115,8 @@ TAILQ_HEAD(ct_request_list, ct_request);
struct ct_data {
struct mtx ct_lock;
+ int ct_threads; /* number of threads in clnt_vc_call */
+ bool_t ct_closing; /* TRUE if we are destroying client */
struct socket *ct_socket; /* connection socket */
bool_t ct_closeit; /* close it on destroy */
struct timeval ct_wait; /* wait interval in milliseconds */
@@ -161,7 +165,7 @@ clnt_vc_create(
static uint32_t disrupt;
struct __rpc_sockinfo si;
XDR xdrs;
- int error;
+ int error, interrupted;
if (disrupt == 0)
disrupt = (uint32_t)(long)raddr;
@@ -170,10 +174,31 @@ clnt_vc_create(
ct = (struct ct_data *)mem_alloc(sizeof (*ct));
mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
+ ct->ct_threads = 0;
+ ct->ct_closing = FALSE;
if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
error = soconnect(so, raddr, curthread);
+ SOCK_LOCK(so);
+ interrupted = 0;
+ while ((so->so_state & SS_ISCONNECTING)
+ && so->so_error == 0) {
+ error = msleep(&so->so_timeo, SOCK_MTX(so),
+ PSOCK | PCATCH, "connec", 0);
+ if (error) {
+ if (error == EINTR || error == ERESTART)
+ interrupted = 1;
+ break;
+ }
+ }
+ if (error == 0) {
+ error = so->so_error;
+ so->so_error = 0;
+ }
+ SOCK_UNLOCK(so);
if (error) {
+ if (!interrupted)
+ so->so_state &= ~SS_ISCONNECTING;
rpc_createerr.cf_stat = RPC_SYSTEMERROR;
rpc_createerr.cf_error.re_errno = error;
goto err;
@@ -224,6 +249,7 @@ clnt_vc_create(
* Create a client handle which uses xdrrec for serialization
* and authnone for authentication.
*/
+ cl->cl_refs = 1;
cl->cl_ops = &clnt_vc_ops;
cl->cl_private = ct;
cl->cl_auth = authnone_create();
@@ -255,6 +281,7 @@ err:
static enum clnt_stat
clnt_vc_call(
CLIENT *cl,
+ struct rpc_callextra *ext,
rpcproc_t proc,
xdrproc_t xdr_args,
void *args_ptr,
@@ -263,6 +290,7 @@ clnt_vc_call(
struct timeval utimeout)
{
struct ct_data *ct = (struct ct_data *) cl->cl_private;
+ AUTH *auth;
XDR xdrs;
struct rpc_msg reply_msg;
bool_t ok;
@@ -270,13 +298,27 @@ clnt_vc_call(
struct timeval timeout;
uint32_t xid;
struct mbuf *mreq = NULL;
- struct ct_request cr;
+ struct ct_request *cr;
int error;
+ cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
+
mtx_lock(&ct->ct_lock);
- cr.cr_mrep = NULL;
- cr.cr_error = 0;
+ if (ct->ct_closing) {
+ mtx_unlock(&ct->ct_lock);
+ free(cr, M_RPC);
+ return (RPC_CANTSEND);
+ }
+ ct->ct_threads++;
+
+ if (ext)
+ auth = ext->rc_auth;
+ else
+ auth = cl->cl_auth;
+
+ cr->cr_mrep = NULL;
+ cr->cr_error = 0;
if (ct->ct_wait.tv_usec == -1) {
timeout = utimeout; /* use supplied timeout */
@@ -311,12 +353,12 @@ call_again:
ct->ct_error.re_status = RPC_SUCCESS;
if ((! XDR_PUTINT32(&xdrs, &proc)) ||
- (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) ||
+ (! AUTH_MARSHALL(auth, &xdrs)) ||
(! (*xdr_args)(&xdrs, args_ptr))) {
if (ct->ct_error.re_status == RPC_SUCCESS)
ct->ct_error.re_status = RPC_CANTENCODEARGS;
- m_freem(mreq);
- return (ct->ct_error.re_status);
+ mtx_lock(&ct->ct_lock);
+ goto out;
}
m_fixhdr(mreq);
@@ -327,9 +369,9 @@ call_again:
*mtod(mreq, uint32_t *) =
htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
- cr.cr_xid = xid;
+ cr->cr_xid = xid;
mtx_lock(&ct->ct_lock);
- TAILQ_INSERT_TAIL(&ct->ct_pending, &cr, cr_link);
+ TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
mtx_unlock(&ct->ct_lock);
/*
@@ -343,10 +385,8 @@ call_again:
reply_msg.acpted_rply.ar_results.proc = xdr_results;
mtx_lock(&ct->ct_lock);
-
if (error) {
- TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
-
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
ct->ct_error.re_errno = error;
ct->ct_error.re_status = RPC_CANTSEND;
goto out;
@@ -357,12 +397,14 @@ call_again:
* lock. In both these cases, the request has been removed
* from ct->ct_pending.
*/
- if (cr.cr_error) {
- ct->ct_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
+ ct->ct_error.re_errno = cr->cr_error;
ct->ct_error.re_status = RPC_CANTRECV;
goto out;
}
- if (cr.cr_mrep) {
+ if (cr->cr_mrep) {
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
goto got_reply;
}
@@ -370,23 +412,22 @@ call_again:
* Hack to provide rpc-based message passing
*/
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
- if (cr.cr_xid)
- TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
ct->ct_error.re_status = RPC_TIMEDOUT;
goto out;
}
- error = msleep(&cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
+ error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
tvtohz(&timeout));
+ TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
+
if (error) {
/*
* The sleep returned an error so our request is still
* on the list. Turn the error code into an
* appropriate client status.
*/
- if (cr.cr_xid)
- TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
ct->ct_error.re_errno = error;
switch (error) {
case EINTR:
@@ -405,8 +446,8 @@ call_again:
* upcall had a receive error, report that,
* otherwise we have a reply.
*/
- if (cr.cr_error) {
- ct->ct_error.re_errno = cr.cr_error;
+ if (cr->cr_error) {
+ ct->ct_error.re_errno = cr->cr_error;
ct->ct_error.re_status = RPC_CANTRECV;
goto out;
}
@@ -419,10 +460,10 @@ got_reply:
*/
mtx_unlock(&ct->ct_lock);
- xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE);
+ xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
ok = xdr_replymsg(&xdrs, &reply_msg);
XDR_DESTROY(&xdrs);
- cr.cr_mrep = NULL;
+ cr->cr_mrep = NULL;
mtx_lock(&ct->ct_lock);
@@ -466,10 +507,17 @@ out:
if (mreq)
m_freem(mreq);
- if (cr.cr_mrep)
- m_freem(cr.cr_mrep);
+ if (cr->cr_mrep)
+ m_freem(cr->cr_mrep);
+ ct->ct_threads--;
+ if (ct->ct_closing)
+ wakeup(ct);
+
mtx_unlock(&ct->ct_lock);
+
+ free(cr, M_RPC);
+
return (ct->ct_error.re_status);
}
@@ -628,6 +676,7 @@ static void
clnt_vc_destroy(CLIENT *cl)
{
struct ct_data *ct = (struct ct_data *) cl->cl_private;
+ struct ct_request *cr;
struct socket *so = NULL;
mtx_lock(&ct->ct_lock);
@@ -639,8 +688,19 @@ clnt_vc_destroy(CLIENT *cl)
ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
- KASSERT(!TAILQ_FIRST(&ct->ct_pending),
- ("Destroying RPC client with pending RPC requests"));
+ /*
+ * Abort any pending requests and wait until everyone
+ * has finished with clnt_vc_call.
+ */
+ ct->ct_closing = TRUE;
+ TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
+ cr->cr_xid = 0;
+ cr->cr_error = ESHUTDOWN;
+ wakeup(cr);
+ }
+
+ while (ct->ct_threads)
+ msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
if (ct->ct_closeit) {
so = ct->ct_socket;
@@ -732,7 +792,6 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
cr->cr_error = error;
wakeup(cr);
}
- TAILQ_INIT(&ct->ct_pending);
mtx_unlock(&ct->ct_lock);
break;
}
@@ -795,19 +854,14 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
if (cr->cr_xid == xid) {
/*
* This one
- * matches. We snip it
- * out of the pending
- * list and leave the
- * reply mbuf in
+ * matches. We leave
+ * the reply mbuf in
* cr->cr_mrep. Set
* the XID to zero so
- * that clnt_vc_call
- * can know not to
- * repeat the
- * TAILQ_REMOVE.
+ * that we will ignore
+ * any duplicaed
+ * replies.
*/
- TAILQ_REMOVE(&ct->ct_pending,
- cr, cr_link);
cr->cr_xid = 0;
cr->cr_mrep = ct->ct_record;
cr->cr_error = 0;
diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c
index 54edfd0..47530da 100644
--- a/sys/rpc/svc_vc.c
+++ b/sys/rpc/svc_vc.c
@@ -132,6 +132,15 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
struct sockaddr* sa;
int error;
+ if (so->so_state & SS_ISCONNECTED) {
+ error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa);
+ if (error)
+ return (NULL);
+ xprt = svc_vc_create_conn(pool, so, sa);
+ free(sa, M_SONAME);
+ return (xprt);
+ }
+
xprt = mem_alloc(sizeof(SVCXPRT));
mtx_init(&xprt->xp_lock, "xprt->xp_lock", NULL, MTX_DEF);
xprt->xp_pool = pool;
@@ -180,8 +189,32 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
SVCXPRT *xprt = NULL;
struct cf_conn *cd = NULL;
struct sockaddr* sa = NULL;
+ struct sockopt opt;
+ int one = 1;
int error;
+ bzero(&opt, sizeof(struct sockopt));
+ opt.sopt_dir = SOPT_SET;
+ opt.sopt_level = SOL_SOCKET;
+ opt.sopt_name = SO_KEEPALIVE;
+ opt.sopt_val = &one;
+ opt.sopt_valsize = sizeof(one);
+ error = sosetopt(so, &opt);
+ if (error)
+ return (NULL);
+
+ if (so->so_proto->pr_protocol == IPPROTO_TCP) {
+ bzero(&opt, sizeof(struct sockopt));
+ opt.sopt_dir = SOPT_SET;
+ opt.sopt_level = IPPROTO_TCP;
+ opt.sopt_name = TCP_NODELAY;
+ opt.sopt_val = &one;
+ opt.sopt_valsize = sizeof(one);
+ error = sosetopt(so, &opt);
+ if (error)
+ return (NULL);
+ }
+
cd = mem_alloc(sizeof(*cd));
cd->strm_stat = XPRT_IDLE;
@@ -306,8 +339,6 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg)
{
struct socket *so = NULL;
struct sockaddr *sa = NULL;
- struct sockopt opt;
- int one = 1;
int error;
/*
@@ -351,16 +382,6 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg)
sa = 0;
error = soaccept(so, &sa);
- if (!error) {
- bzero(&opt, sizeof(struct sockopt));
- opt.sopt_dir = SOPT_SET;
- opt.sopt_level = IPPROTO_TCP;
- opt.sopt_name = TCP_NODELAY;
- opt.sopt_val = &one;
- opt.sopt_valsize = sizeof(one);
- error = sosetopt(so, &opt);
- }
-
if (error) {
/*
* XXX not sure if I need to call sofree or soclose here.
@@ -374,7 +395,9 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg)
* svc_vc_create_conn will call xprt_register - we don't need
* to do anything with the new connection.
*/
- svc_vc_create_conn(xprt->xp_pool, so, sa);
+ if (!svc_vc_create_conn(xprt->xp_pool, so, sa))
+ soclose(so);
+
free(sa, M_SONAME);
return (FALSE); /* there is never an rpc msg to be processed */
OpenPOWER on IntegriCloud