diff options
Diffstat (limited to 'sys/rpc')
-rw-r--r-- | sys/rpc/auth_unix.c | 127 | ||||
-rw-r--r-- | sys/rpc/authunix_prot.c | 5 | ||||
-rw-r--r-- | sys/rpc/clnt.h | 98 | ||||
-rw-r--r-- | sys/rpc/clnt_dg.c | 218 | ||||
-rw-r--r-- | sys/rpc/clnt_rc.c | 103 | ||||
-rw-r--r-- | sys/rpc/clnt_vc.c | 138 | ||||
-rw-r--r-- | sys/rpc/svc_vc.c | 49 |
7 files changed, 556 insertions, 182 deletions
diff --git a/sys/rpc/auth_unix.c b/sys/rpc/auth_unix.c index a2f5fd2..757d2dd 100644 --- a/sys/rpc/auth_unix.c +++ b/sys/rpc/auth_unix.c @@ -50,9 +50,11 @@ __FBSDID("$FreeBSD$"); #include <sys/param.h> #include <sys/systm.h> +#include <sys/hash.h> +#include <sys/kernel.h> #include <sys/lock.h> #include <sys/malloc.h> -#include <sys/mutex.h> +#include <sys/sx.h> #include <sys/ucred.h> #include <rpc/types.h> @@ -81,14 +83,39 @@ static struct auth_ops authunix_ops = { * This struct is pointed to by the ah_private field of an auth_handle. */ struct audata { + TAILQ_ENTRY(audata) au_link; + TAILQ_ENTRY(audata) au_alllink; + int au_refs; + struct xucred au_xcred; struct opaque_auth au_origcred; /* original credentials */ struct opaque_auth au_shcred; /* short hand cred */ u_long au_shfaults; /* short hand cache faults */ char au_marshed[MAX_AUTH_BYTES]; u_int au_mpos; /* xdr pos at end of marshed */ + AUTH *au_auth; /* link back to AUTH */ }; +TAILQ_HEAD(audata_list, audata); #define AUTH_PRIVATE(auth) ((struct audata *)auth->ah_private) +#define AUTH_UNIX_HASH_SIZE 16 +#define AUTH_UNIX_MAX 256 +static struct audata_list auth_unix_cache[AUTH_UNIX_HASH_SIZE]; +static struct audata_list auth_unix_all; +static struct sx auth_unix_lock; +static int auth_unix_count; + +static void +authunix_init(void *dummy) +{ + int i; + + for (i = 0; i < AUTH_UNIX_HASH_SIZE; i++) + TAILQ_INIT(&auth_unix_cache[i]); + TAILQ_INIT(&auth_unix_all); + sx_init(&auth_unix_lock, "auth_unix_lock"); +} +SYSINIT(authunix_init, SI_SUB_KMEM, SI_ORDER_ANY, authunix_init, NULL); + /* * Create a unix style authenticator. * Returns an auth handle with the given stuff in it. @@ -96,38 +123,70 @@ struct audata { AUTH * authunix_create(struct ucred *cred) { + uint32_t h, th; struct xucred xcr; char mymem[MAX_AUTH_BYTES]; XDR xdrs; AUTH *auth; - struct audata *au; + struct audata *au, *tau; struct timeval now; uint32_t time; int len; + if (auth_unix_count > AUTH_UNIX_MAX) { + while (auth_unix_count > AUTH_UNIX_MAX) { + sx_xlock(&auth_unix_lock); + tau = TAILQ_FIRST(&auth_unix_all); + th = HASHSTEP(HASHINIT, tau->au_xcred.cr_uid) + % AUTH_UNIX_HASH_SIZE; + TAILQ_REMOVE(&auth_unix_cache[th], tau, au_link); + TAILQ_REMOVE(&auth_unix_all, tau, au_alllink); + auth_unix_count--; + sx_xunlock(&auth_unix_lock); + AUTH_DESTROY(tau->au_auth); + } + } + + /* + * Hash the uid to see if we already have an AUTH with this cred. + */ + h = HASHSTEP(HASHINIT, cred->cr_uid) % AUTH_UNIX_HASH_SIZE; + cru2x(cred, &xcr); +again: + sx_slock(&auth_unix_lock); + TAILQ_FOREACH(au, &auth_unix_cache[h], au_link) { + if (!memcmp(&xcr, &au->au_xcred, sizeof(xcr))) { + if (sx_try_upgrade(&auth_unix_lock)) { + /* + * Keep auth_unix_all LRU sorted. + */ + TAILQ_REMOVE(&auth_unix_all, au, au_alllink); + TAILQ_INSERT_TAIL(&auth_unix_all, au, + au_alllink); + au->au_refs++; + sx_xunlock(&auth_unix_lock); + return (au->au_auth); + } else { + sx_sunlock(&auth_unix_lock); + goto again; + } + } + } + /* * Allocate and set up auth handle */ au = NULL; auth = mem_alloc(sizeof(*auth)); -#ifndef _KERNEL - if (auth == NULL) { - printf("authunix_create: out of memory"); - goto cleanup_authunix_create; - } -#endif au = mem_alloc(sizeof(*au)); -#ifndef _KERNEL - if (au == NULL) { - printf("authunix_create: out of memory"); - goto cleanup_authunix_create; - } -#endif auth->ah_ops = &authunix_ops; auth->ah_private = (caddr_t)au; auth->ah_verf = au->au_shcred = _null_auth; + au->au_refs = 1; + au->au_xcred = xcr; au->au_shfaults = 0; au->au_origcred.oa_base = NULL; + au->au_auth = auth; getmicrotime(&now); time = now.tv_sec; @@ -141,14 +200,7 @@ authunix_create(struct ucred *cred) panic("authunix_create: failed to encode creds"); au->au_origcred.oa_length = len = XDR_GETPOS(&xdrs); au->au_origcred.oa_flavor = AUTH_UNIX; -#ifdef _KERNEL au->au_origcred.oa_base = mem_alloc((u_int) len); -#else - if ((au->au_origcred.oa_base = mem_alloc((u_int) len)) == NULL) { - printf("authunix_create: out of memory"); - goto cleanup_authunix_create; - } -#endif memcpy(au->au_origcred.oa_base, mymem, (size_t)len); /* @@ -156,18 +208,19 @@ authunix_create(struct ucred *cred) */ auth->ah_cred = au->au_origcred; marshal_new_auth(auth); - return (auth); -#ifndef _KERNEL - cleanup_authunix_create: - if (auth) - mem_free(auth, sizeof(*auth)); - if (au) { - if (au->au_origcred.oa_base) - mem_free(au->au_origcred.oa_base, (u_int)len); - mem_free(au, sizeof(*au)); + + if (sx_try_upgrade(&auth_unix_lock)) { + auth_unix_count++; + TAILQ_INSERT_TAIL(&auth_unix_cache[h], au, au_link); + TAILQ_INSERT_TAIL(&auth_unix_all, au, au_alllink); + au->au_refs++; /* one for the cache, one for user */ + sx_xunlock(&auth_unix_lock); + return (auth); + } else { + sx_sunlock(&auth_unix_lock); + AUTH_DESTROY(auth); + goto again; } - return (NULL); -#endif } /* @@ -262,8 +315,18 @@ static void authunix_destroy(AUTH *auth) { struct audata *au; + int refs; au = AUTH_PRIVATE(auth); + + sx_xlock(&auth_unix_lock); + au->au_refs--; + refs = au->au_refs; + sx_xunlock(&auth_unix_lock); + + if (refs > 0) + return; + mem_free(au->au_origcred.oa_base, au->au_origcred.oa_length); if (au->au_shcred.oa_base != NULL) diff --git a/sys/rpc/authunix_prot.c b/sys/rpc/authunix_prot.c index 3092b1f..141f594 100644 --- a/sys/rpc/authunix_prot.c +++ b/sys/rpc/authunix_prot.c @@ -68,7 +68,12 @@ xdr_authunix_parms(XDR *xdrs, uint32_t *time, struct xucred *cred) uint32_t junk; if (xdrs->x_op == XDR_ENCODE) { + /* + * Restrict name length to 255 according to RFC 1057. + */ namelen = strlen(hostname); + if (namelen > 255) + namelen = 255; } else { namelen = 0; } diff --git a/sys/rpc/clnt.h b/sys/rpc/clnt.h index 4d6a778..03e3112 100644 --- a/sys/rpc/clnt.h +++ b/sys/rpc/clnt.h @@ -62,6 +62,7 @@ #include <rpc/clnt_stat.h> #include <sys/cdefs.h> #ifdef _KERNEL +#include <sys/refcount.h> #include <rpc/netconfig.h> #else #include <netconfig.h> @@ -109,6 +110,23 @@ struct rpc_err { #define re_lb ru.RE_lb }; +#ifdef _KERNEL +/* + * Functions of this type may be used to receive notification when RPC + * calls have to be re-transmitted etc. + */ +typedef void rpc_feedback(int cmd, int procnum, void *); + +/* + * A structure used with CLNT_CALL_EXT to pass extra information used + * while processing an RPC call. + */ +struct rpc_callextra { + AUTH *rc_auth; /* auth handle to use for this call */ + rpc_feedback *rc_feedback; /* callback for retransmits etc. */ + void *rc_feedback_arg; /* argument for callback */ +}; +#endif /* * Client rpc handle. @@ -116,12 +134,35 @@ struct rpc_err { * Client is responsible for initializing auth, see e.g. auth_none.c. */ typedef struct __rpc_client { +#ifdef _KERNEL + volatile u_int cl_refs; /* reference count */ + AUTH *cl_auth; /* authenticator */ + struct clnt_ops { + /* call remote procedure */ + enum clnt_stat (*cl_call)(struct __rpc_client *, + struct rpc_callextra *, rpcproc_t, xdrproc_t, void *, + xdrproc_t, void *, struct timeval); + /* abort a call */ + void (*cl_abort)(struct __rpc_client *); + /* get specific error code */ + void (*cl_geterr)(struct __rpc_client *, + struct rpc_err *); + /* frees results */ + bool_t (*cl_freeres)(struct __rpc_client *, + xdrproc_t, void *); + /* destroy this structure */ + void (*cl_destroy)(struct __rpc_client *); + /* the ioctl() of rpc */ + bool_t (*cl_control)(struct __rpc_client *, u_int, + void *); + } *cl_ops; +#else AUTH *cl_auth; /* authenticator */ struct clnt_ops { /* call remote procedure */ enum clnt_stat (*cl_call)(struct __rpc_client *, - rpcproc_t, xdrproc_t, void *, xdrproc_t, - void *, struct timeval); + rpcproc_t, xdrproc_t, void *, xdrproc_t, + void *, struct timeval); /* abort a call */ void (*cl_abort)(struct __rpc_client *); /* get specific error code */ @@ -136,12 +177,12 @@ typedef struct __rpc_client { bool_t (*cl_control)(struct __rpc_client *, u_int, void *); } *cl_ops; +#endif void *cl_private; /* private stuff */ char *cl_netid; /* network token */ char *cl_tp; /* device name */ } CLIENT; - /* * Timers used for the pseudo-transport protocol when using datagrams */ @@ -154,8 +195,10 @@ struct rpc_timers { /* * Feedback values used for possible congestion and rate control */ -#define FEEDBACK_REXMIT1 1 /* first retransmit */ -#define FEEDBACK_OK 2 /* no retransmits */ +#define FEEDBACK_OK 1 /* no retransmits */ +#define FEEDBACK_REXMIT1 2 /* first retransmit */ +#define FEEDBACK_REXMIT2 3 /* second and subsequent retransmit */ +#define FEEDBACK_RECONNECT 4 /* client reconnect */ /* Used to set version of portmapper used in broadcast */ @@ -171,6 +214,30 @@ struct rpc_timers { * */ +#ifdef _KERNEL +#define CLNT_ACQUIRE(rh) \ + refcount_acquire(&(rh)->cl_refs) +#define CLNT_RELEASE(rh) \ + if (refcount_release(&(rh)->cl_refs)) \ + CLNT_DESTROY(rh) + +/* + * enum clnt_stat + * CLNT_CALL_EXT(rh, ext, proc, xargs, argsp, xres, resp, timeout) + * CLIENT *rh; + * struct rpc_callextra *ext; + * rpcproc_t proc; + * xdrproc_t xargs; + * void *argsp; + * xdrproc_t xres; + * void *resp; + * struct timeval timeout; + */ +#define CLNT_CALL_EXT(rh, ext, proc, xargs, argsp, xres, resp, secs) \ + ((*(rh)->cl_ops->cl_call)(rh, ext, proc, xargs, \ + argsp, xres, resp, secs)) +#endif + /* * enum clnt_stat * CLNT_CALL(rh, proc, xargs, argsp, xres, resp, timeout) @@ -182,12 +249,21 @@ struct rpc_timers { * void *resp; * struct timeval timeout; */ -#define CLNT_CALL(rh, proc, xargs, argsp, xres, resp, secs) \ - ((*(rh)->cl_ops->cl_call)(rh, proc, xargs, \ +#ifdef _KERNEL +#define CLNT_CALL(rh, proc, xargs, argsp, xres, resp, secs) \ + ((*(rh)->cl_ops->cl_call)(rh, NULL, proc, xargs, \ argsp, xres, resp, secs)) -#define clnt_call(rh, proc, xargs, argsp, xres, resp, secs) \ - ((*(rh)->cl_ops->cl_call)(rh, proc, xargs, \ +#define clnt_call(rh, proc, xargs, argsp, xres, resp, secs) \ + ((*(rh)->cl_ops->cl_call)(rh, NULL, proc, xargs, \ argsp, xres, resp, secs)) +#else +#define CLNT_CALL(rh, proc, xargs, argsp, xres, resp, secs) \ + ((*(rh)->cl_ops->cl_call)(rh, proc, xargs, \ + argsp, xres, resp, secs)) +#define clnt_call(rh, proc, xargs, argsp, xres, resp, secs) \ + ((*(rh)->cl_ops->cl_call)(rh, proc, xargs, \ + argsp, xres, resp, secs)) +#endif /* * void @@ -262,6 +338,8 @@ struct rpc_timers { #define CLGET_WAITCHAN 22 /* get string used in msleep call */ #define CLSET_INTERRUPTIBLE 23 /* set interruptible flag */ #define CLGET_INTERRUPTIBLE 24 /* set interruptible flag */ +#define CLSET_RETRIES 25 /* set retry count for reconnect */ +#define CLGET_RETRIES 26 /* get retry count for reconnect */ #endif @@ -534,6 +612,7 @@ __END_DECLS #define rpc_createerr (*(__rpc_createerr())) #endif +#ifndef _KERNEL /* * The simplified interface: * enum clnt_stat @@ -612,7 +691,6 @@ extern enum clnt_stat rpc_broadcast_exp(const rpcprog_t, const rpcvers_t, const int, const char *); __END_DECLS -#ifndef _KERNEL /* For backward compatibility */ #include <rpc/clnt_soc.h> #endif diff --git a/sys/rpc/clnt_dg.c b/sys/rpc/clnt_dg.c index c66ac50..f14e1d6 100644 --- a/sys/rpc/clnt_dg.c +++ b/sys/rpc/clnt_dg.c @@ -45,6 +45,7 @@ __FBSDID("$FreeBSD$"); #include <sys/param.h> #include <sys/systm.h> +#include <sys/kernel.h> #include <sys/lock.h> #include <sys/malloc.h> #include <sys/mbuf.h> @@ -70,8 +71,8 @@ __FBSDID("$FreeBSD$"); #endif static bool_t time_not_ok(struct timeval *); -static enum clnt_stat clnt_dg_call(CLIENT *, rpcproc_t, xdrproc_t, void *, - xdrproc_t, void *, struct timeval); +static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, + rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval); static void clnt_dg_geterr(CLIENT *, struct rpc_err *); static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); static void clnt_dg_abort(CLIENT *); @@ -91,10 +92,13 @@ static struct clnt_ops clnt_dg_ops = { static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; /* - * A pending RPC request which awaits a reply. + * A pending RPC request which awaits a reply. Requests which have + * received their reply will have cr_xid set to zero and cr_mrep to + * the mbuf chain of the reply. */ struct cu_request { TAILQ_ENTRY(cu_request) cr_link; + CLIENT *cr_client; /* owner */ uint32_t cr_xid; /* XID of request */ struct mbuf *cr_mrep; /* reply received by upcall */ int cr_error; /* any error from upcall */ @@ -123,6 +127,8 @@ struct cu_socket { * Private data kept per client handle */ struct cu_data { + int cu_threads; /* # threads in clnt_vc_call */ + bool_t cu_closing; /* TRUE if we are destroying */ struct socket *cu_socket; /* connection socket */ bool_t cu_closeit; /* opened by library */ struct sockaddr_storage cu_raddr; /* remote address */ @@ -203,10 +209,12 @@ clnt_dg_create( sendsz = ((sendsz + 3) / 4) * 4; recvsz = ((recvsz + 3) / 4) * 4; cu = mem_alloc(sizeof (*cu)); + cu->cu_threads = 0; + cu->cu_closing = FALSE; (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); cu->cu_rlen = svcaddr->sa_len; /* Other values can also be set through clnt_control() */ - cu->cu_wait.tv_sec = 15; /* heuristically chosen */ + cu->cu_wait.tv_sec = 3; /* heuristically chosen */ cu->cu_wait.tv_usec = 0; cu->cu_total.tv_sec = -1; cu->cu_total.tv_usec = -1; @@ -237,6 +245,7 @@ clnt_dg_create( */ cu->cu_closeit = FALSE; cu->cu_socket = so; + soreserve(so, 256*1024, 256*1024); SOCKBUF_LOCK(&so->so_rcv); recheck_socket: @@ -274,6 +283,7 @@ recheck_socket: } SOCKBUF_UNLOCK(&so->so_rcv); + cl->cl_refs = 1; cl->cl_ops = &clnt_dg_ops; cl->cl_private = (caddr_t)(void *)cu; cl->cl_auth = authnone_create(); @@ -291,7 +301,8 @@ err2: static enum clnt_stat clnt_dg_call( - CLIENT *cl, /* client handle */ + CLIENT *cl, /* client handle */ + struct rpc_callextra *ext, /* call metadata */ rpcproc_t proc, /* procedure number */ xdrproc_t xargs, /* xdr routine for args */ void *argsp, /* pointer to args */ @@ -301,30 +312,52 @@ clnt_dg_call( { struct cu_data *cu = (struct cu_data *)cl->cl_private; struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; + AUTH *auth; XDR xdrs; struct rpc_msg reply_msg; bool_t ok; + int retrans; /* number of re-transmits so far */ int nrefreshes = 2; /* number of times to refresh cred */ - struct timeval timeout; - struct timeval retransmit_time; - struct timeval next_sendtime, starttime, time_waited, tv; + struct timeval *tvp; + int timeout; + int retransmit_time; + int next_sendtime, starttime, time_waited, tv; struct sockaddr *sa; socklen_t salen; uint32_t xid; struct mbuf *mreq = NULL; - struct cu_request cr; + struct cu_request *cr; int error; + cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); + mtx_lock(&cs->cs_lock); - cr.cr_mrep = NULL; - cr.cr_error = 0; + if (cu->cu_closing) { + mtx_unlock(&cs->cs_lock); + free(cr, M_RPC); + return (RPC_CANTSEND); + } + cu->cu_threads++; + + if (ext) + auth = ext->rc_auth; + else + auth = cl->cl_auth; + + cr->cr_client = cl; + cr->cr_mrep = NULL; + cr->cr_error = 0; if (cu->cu_total.tv_usec == -1) { - timeout = utimeout; /* use supplied timeout */ + tvp = &utimeout; /* use supplied timeout */ } else { - timeout = cu->cu_total; /* use default timeout */ + tvp = &cu->cu_total; /* use default timeout */ } + if (tvp->tv_sec || tvp->tv_usec) + timeout = tvtohz(tvp); + else + timeout = 0; if (cu->cu_connect && !cu->cu_connected) { mtx_unlock(&cs->cs_lock); @@ -345,11 +378,11 @@ clnt_dg_call( sa = (struct sockaddr *)&cu->cu_raddr; salen = cu->cu_rlen; } - time_waited.tv_sec = 0; - time_waited.tv_usec = 0; - retransmit_time = next_sendtime = cu->cu_wait; + time_waited = 0; + retrans = 0; + retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); - getmicrotime(&starttime); + starttime = ticks; call_again: mtx_assert(&cs->cs_lock, MA_OWNED); @@ -376,7 +409,7 @@ send_again: goto get_reply; if ((! XDR_PUTINT32(&xdrs, &proc)) || - (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) || + (! AUTH_MARSHALL(auth, &xdrs)) || (! (*xargs)(&xdrs, argsp))) { cu->cu_error.re_status = RPC_CANTENCODEARGS; mtx_lock(&cs->cs_lock); @@ -384,9 +417,9 @@ send_again: } m_fixhdr(mreq); - cr.cr_xid = xid; + cr->cr_xid = xid; mtx_lock(&cs->cs_lock); - TAILQ_INSERT_TAIL(&cs->cs_pending, &cr, cr_link); + TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); mtx_unlock(&cs->cs_lock); /* @@ -406,8 +439,7 @@ send_again: mtx_lock(&cs->cs_lock); if (error) { - TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); - + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); cu->cu_error.re_errno = error; cu->cu_error.re_status = RPC_CANTSEND; goto out; @@ -415,24 +447,24 @@ send_again: /* * Check to see if we got an upcall while waiting for the - * lock. In both these cases, the request has been removed - * from cs->cs_pending. + * lock. */ - if (cr.cr_error) { - cu->cu_error.re_errno = cr.cr_error; + if (cr->cr_error) { + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); + cu->cu_error.re_errno = cr->cr_error; cu->cu_error.re_status = RPC_CANTRECV; goto out; } - if (cr.cr_mrep) { + if (cr->cr_mrep) { + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); goto got_reply; } /* * Hack to provide rpc-based message passing */ - if (timeout.tv_sec == 0 && timeout.tv_usec == 0) { - if (cr.cr_xid) - TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); + if (timeout == 0) { + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); cu->cu_error.re_status = RPC_TIMEDOUT; goto out; } @@ -440,17 +472,23 @@ send_again: get_reply: for (;;) { /* Decide how long to wait. */ - if (timevalcmp(&next_sendtime, &timeout, <)) { + if (next_sendtime < timeout) tv = next_sendtime; - } else { + else tv = timeout; + tv -= time_waited; + + if (tv > 0) { + if (cu->cu_closing) + error = 0; + else + error = msleep(cr, &cs->cs_lock, + cu->cu_waitflag, cu->cu_waitchan, tv); + } else { + error = EWOULDBLOCK; } - timevalsub(&tv, &time_waited); - if (tv.tv_sec < 0 || tv.tv_usec < 0) - tv.tv_sec = tv.tv_usec = 0; - error = msleep(&cr, &cs->cs_lock, cu->cu_waitflag, - cu->cu_waitchan, tvtohz(&tv)); + TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); if (!error) { /* @@ -458,8 +496,8 @@ get_reply: * upcall had a receive error, report that, * otherwise we have a reply. */ - if (cr.cr_error) { - cu->cu_error.re_errno = cr.cr_error; + if (cr->cr_error) { + cu->cu_error.re_errno = cr->cr_error; cu->cu_error.re_status = RPC_CANTRECV; goto out; } @@ -472,8 +510,6 @@ get_reply: * re-send the request. */ if (error != EWOULDBLOCK) { - if (cr.cr_xid) - TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); cu->cu_error.re_errno = error; if (error == EINTR) cu->cu_error.re_status = RPC_INTR; @@ -482,29 +518,40 @@ get_reply: goto out; } - getmicrotime(&tv); - time_waited = tv; - timevalsub(&time_waited, &starttime); + time_waited = ticks - starttime; /* Check for timeout. */ - if (timevalcmp(&time_waited, &timeout, >)) { - if (cr.cr_xid) - TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); + if (time_waited > timeout) { cu->cu_error.re_errno = EWOULDBLOCK; cu->cu_error.re_status = RPC_TIMEDOUT; goto out; } /* Retransmit if necessary. */ - if (timevalcmp(&time_waited, &next_sendtime, >)) { - if (cr.cr_xid) - TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); + if (time_waited >= next_sendtime) { + if (ext && ext->rc_feedback) { + mtx_unlock(&cs->cs_lock); + if (retrans == 0) + ext->rc_feedback(FEEDBACK_REXMIT1, + proc, ext->rc_feedback_arg); + else + ext->rc_feedback(FEEDBACK_REXMIT2, + proc, ext->rc_feedback_arg); + mtx_lock(&cs->cs_lock); + } + if (cu->cu_closing) { + cu->cu_error.re_errno = ESHUTDOWN; + cu->cu_error.re_status = RPC_CANTRECV; + goto out; + } + retrans++; /* update retransmit_time */ - if (retransmit_time.tv_sec < RPC_MAX_BACKOFF) - timevaladd(&retransmit_time, &retransmit_time); - timevaladd(&next_sendtime, &retransmit_time); + if (retransmit_time < RPC_MAX_BACKOFF * hz) + retransmit_time = 2 * retransmit_time; + next_sendtime += retransmit_time; goto send_again; } + TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); } got_reply: @@ -514,10 +561,13 @@ got_reply: */ mtx_unlock(&cs->cs_lock); - xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE); + if (ext && ext->rc_feedback) + ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); + + xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); ok = xdr_replymsg(&xdrs, &reply_msg); XDR_DESTROY(&xdrs); - cr.cr_mrep = NULL; + cr->cr_mrep = NULL; mtx_lock(&cs->cs_lock); @@ -562,10 +612,17 @@ out: if (mreq) m_freem(mreq); - if (cr.cr_mrep) - m_freem(cr.cr_mrep); + if (cr->cr_mrep) + m_freem(cr->cr_mrep); + cu->cu_threads--; + if (cu->cu_closing) + wakeup(cu); + mtx_unlock(&cs->cs_lock); + + free(cr, M_RPC); + return (cu->cu_error.re_status); } @@ -732,30 +789,44 @@ clnt_dg_destroy(CLIENT *cl) { struct cu_data *cu = (struct cu_data *)cl->cl_private; struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; + struct cu_request *cr; struct socket *so = NULL; bool_t lastsocketref; - SOCKBUF_LOCK(&cu->cu_socket->so_rcv); - mtx_lock(&cs->cs_lock); + + /* + * Abort any pending requests and wait until everyone + * has finished with clnt_vc_call. + */ + cu->cu_closing = TRUE; + TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { + if (cr->cr_client == cl) { + cr->cr_xid = 0; + cr->cr_error = ESHUTDOWN; + wakeup(cr); + } + } + + while (cu->cu_threads) + msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); + cs->cs_refs--; if (cs->cs_refs == 0) { + mtx_destroy(&cs->cs_lock); + SOCKBUF_LOCK(&cu->cu_socket->so_rcv); cu->cu_socket->so_upcallarg = NULL; cu->cu_socket->so_upcall = NULL; cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL; - mtx_destroy(&cs->cs_lock); SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); mem_free(cs, sizeof(*cs)); lastsocketref = TRUE; } else { mtx_unlock(&cs->cs_lock); - SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); lastsocketref = FALSE; } - if (cu->cu_closeit) { - KASSERT(lastsocketref, ("clnt_dg_destroy(): closing a socket " - "shared with other clients")); + if (cu->cu_closeit && lastsocketref) { so = cu->cu_socket; cu->cu_socket = NULL; } @@ -812,10 +883,10 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) if (error) { mtx_lock(&cs->cs_lock); TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { + cr->cr_xid = 0; cr->cr_error = error; wakeup(cr); } - TAILQ_INIT(&cs->cs_pending); mtx_unlock(&cs->cs_lock); break; } @@ -825,7 +896,11 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) */ m = m_pullup(m, sizeof(xid)); if (!m) - break; + /* + * Should never happen. + */ + continue; + xid = ntohl(*mtod(m, uint32_t *)); /* @@ -836,14 +911,13 @@ clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { if (cr->cr_xid == xid) { /* - * This one matches. We snip it out of - * the pending list and leave the + * This one matches. We leave the * reply mbuf in cr->cr_mrep. Set the - * XID to zero so that clnt_dg_call - * can know not to repeat the - * TAILQ_REMOVE. + * XID to zero so that we will ignore + * any duplicated replies that arrive + * before clnt_dg_call removes it from + * the queue. */ - TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); cr->cr_xid = 0; cr->cr_mrep = m; cr->cr_error = 0; diff --git a/sys/rpc/clnt_rc.c b/sys/rpc/clnt_rc.c index 8b5fc26..a6b2dfd 100644 --- a/sys/rpc/clnt_rc.c +++ b/sys/rpc/clnt_rc.c @@ -30,6 +30,7 @@ __FBSDID("$FreeBSD$"); #include <sys/param.h> #include <sys/systm.h> +#include <sys/limits.h> #include <sys/lock.h> #include <sys/malloc.h> #include <sys/mbuf.h> @@ -44,8 +45,8 @@ __FBSDID("$FreeBSD$"); #include <rpc/rpc.h> #include <rpc/rpc_com.h> -static enum clnt_stat clnt_reconnect_call(CLIENT *, rpcproc_t, - xdrproc_t, void *, xdrproc_t, void *, struct timeval); +static enum clnt_stat clnt_reconnect_call(CLIENT *, struct rpc_callextra *, + rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval); static void clnt_reconnect_geterr(CLIENT *, struct rpc_err *); static bool_t clnt_reconnect_freeres(CLIENT *, xdrproc_t, void *); static void clnt_reconnect_abort(CLIENT *); @@ -62,6 +63,7 @@ static struct clnt_ops clnt_reconnect_ops = { }; struct rc_data { + struct mtx rc_lock; struct sockaddr_storage rc_addr; /* server address */ struct netconfig* rc_nconf; /* network type */ rpcprog_t rc_prog; /* program number */ @@ -70,8 +72,10 @@ struct rc_data { size_t rc_recvsz; struct timeval rc_timeout; struct timeval rc_retry; + int rc_retries; const char *rc_waitchan; int rc_intr; + int rc_connecting; CLIENT* rc_client; /* underlying RPC client */ }; @@ -94,6 +98,7 @@ clnt_reconnect_create( cl = mem_alloc(sizeof (CLIENT)); rc = mem_alloc(sizeof (*rc)); + mtx_init(&rc->rc_lock, "rc->rc_lock", NULL, MTX_DEF); (void) memcpy(&rc->rc_addr, svcaddr, (size_t)svcaddr->sa_len); rc->rc_nconf = nconf; rc->rc_prog = program; @@ -102,12 +107,15 @@ clnt_reconnect_create( rc->rc_recvsz = recvsz; rc->rc_timeout.tv_sec = -1; rc->rc_timeout.tv_usec = -1; - rc->rc_retry.tv_sec = 15; + rc->rc_retry.tv_sec = 3; rc->rc_retry.tv_usec = 0; + rc->rc_retries = INT_MAX; rc->rc_waitchan = "rpcrecv"; rc->rc_intr = 0; + rc->rc_connecting = FALSE; rc->rc_client = NULL; + cl->cl_refs = 1; cl->cl_ops = &clnt_reconnect_ops; cl->cl_private = (caddr_t)(void *)rc; cl->cl_auth = authnone_create(); @@ -121,13 +129,39 @@ clnt_reconnect_connect(CLIENT *cl) { struct rc_data *rc = (struct rc_data *)cl->cl_private; struct socket *so; + enum clnt_stat stat; + int error; int one = 1; + mtx_lock(&rc->rc_lock); +again: + if (rc->rc_connecting) { + while (!rc->rc_client) { + error = msleep(rc, &rc->rc_lock, + rc->rc_intr ? PCATCH : 0, "rpcrecon", 0); + if (error) { + mtx_unlock(&rc->rc_lock); + return (RPC_INTR); + } + } + /* + * If the other guy failed to connect, we might as + * well have another go. + */ + if (!rc->rc_client && !rc->rc_connecting) + goto again; + mtx_unlock(&rc->rc_lock); + return (RPC_SUCCESS); + } else { + rc->rc_connecting = TRUE; + } + mtx_unlock(&rc->rc_lock); + so = __rpc_nconf2socket(rc->rc_nconf); if (!so) { - rpc_createerr.cf_stat = RPC_TLIERROR; + stat = rpc_createerr.cf_stat = RPC_TLIERROR; rpc_createerr.cf_error.re_errno = 0; - return (RPC_TLIERROR); + goto out; } if (rc->rc_nconf->nc_semantics == NC_TPI_CLTS) @@ -139,8 +173,10 @@ clnt_reconnect_connect(CLIENT *cl) (struct sockaddr *) &rc->rc_addr, rc->rc_prog, rc->rc_vers, rc->rc_sendsz, rc->rc_recvsz); - if (!rc->rc_client) - return (rpc_createerr.cf_stat); + if (!rc->rc_client) { + stat = rpc_createerr.cf_stat; + goto out; + } CLNT_CONTROL(rc->rc_client, CLSET_FD_CLOSE, 0); CLNT_CONTROL(rc->rc_client, CLSET_CONNECT, &one); @@ -148,13 +184,21 @@ clnt_reconnect_connect(CLIENT *cl) CLNT_CONTROL(rc->rc_client, CLSET_RETRY_TIMEOUT, &rc->rc_retry); CLNT_CONTROL(rc->rc_client, CLSET_WAITCHAN, &rc->rc_waitchan); CLNT_CONTROL(rc->rc_client, CLSET_INTERRUPTIBLE, &rc->rc_intr); + stat = RPC_SUCCESS; + +out: + mtx_lock(&rc->rc_lock); + rc->rc_connecting = FALSE; + wakeup(rc); + mtx_unlock(&rc->rc_lock); - return (RPC_SUCCESS); + return (stat); } static enum clnt_stat clnt_reconnect_call( - CLIENT *cl, /* client handle */ + CLIENT *cl, /* client handle */ + struct rpc_callextra *ext, /* call metadata */ rpcproc_t proc, /* procedure number */ xdrproc_t xargs, /* xdr routine for args */ void *argsp, /* pointer to args */ @@ -163,8 +207,11 @@ clnt_reconnect_call( struct timeval utimeout) /* seconds to wait before giving up */ { struct rc_data *rc = (struct rc_data *)cl->cl_private; + CLIENT *client; enum clnt_stat stat; + int tries; + tries = 0; do { if (!rc->rc_client) { stat = clnt_reconnect_connect(cl); @@ -172,9 +219,14 @@ clnt_reconnect_call( return (stat); } - stat = CLNT_CALL(rc->rc_client, proc, xargs, argsp, + mtx_lock(&rc->rc_lock); + CLNT_ACQUIRE(rc->rc_client); + client = rc->rc_client; + mtx_unlock(&rc->rc_lock); + stat = CLNT_CALL_EXT(client, ext, proc, xargs, argsp, xresults, resultsp, utimeout); + CLNT_RELEASE(client); if (stat == RPC_TIMEDOUT) { /* * Check for async send misfeature for NLM @@ -184,16 +236,33 @@ clnt_reconnect_call( && rc->rc_timeout.tv_usec == 0) || (rc->rc_timeout.tv_sec == -1 && utimeout.tv_sec == 0 - && utimeout.tv_usec == 0)) + && utimeout.tv_usec == 0)) { break; + } } if (stat == RPC_INTR) break; if (stat != RPC_SUCCESS) { - CLNT_DESTROY(rc->rc_client); - rc->rc_client = NULL; + tries++; + if (tries >= rc->rc_retries) + break; + + if (ext && ext->rc_feedback) + ext->rc_feedback(FEEDBACK_RECONNECT, proc, + ext->rc_feedback_arg); + + mtx_lock(&rc->rc_lock); + /* + * Make sure that someone else hasn't already + * reconnected. + */ + if (rc->rc_client == client) { + CLNT_RELEASE(rc->rc_client); + rc->rc_client = NULL; + } + mtx_unlock(&rc->rc_lock); } } while (stat != RPC_SUCCESS); @@ -294,6 +363,14 @@ clnt_reconnect_control(CLIENT *cl, u_int request, void *info) *(int *) info = rc->rc_intr; break; + case CLSET_RETRIES: + rc->rc_retries = *(int *) info; + break; + + case CLGET_RETRIES: + *(int *) info = rc->rc_retries; + break; + default: return (FALSE); } diff --git a/sys/rpc/clnt_vc.c b/sys/rpc/clnt_vc.c index 5731e1e..cb09352 100644 --- a/sys/rpc/clnt_vc.c +++ b/sys/rpc/clnt_vc.c @@ -80,8 +80,8 @@ struct cmessage { struct cmsgcred cmcred; }; -static enum clnt_stat clnt_vc_call(CLIENT *, rpcproc_t, xdrproc_t, void *, - xdrproc_t, void *, struct timeval); +static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *, + rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval); static void clnt_vc_geterr(CLIENT *, struct rpc_err *); static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *); static void clnt_vc_abort(CLIENT *); @@ -100,7 +100,9 @@ static struct clnt_ops clnt_vc_ops = { }; /* - * A pending RPC request which awaits a reply. + * A pending RPC request which awaits a reply. Requests which have + * received their reply will have cr_xid set to zero and cr_mrep to + * the mbuf chain of the reply. */ struct ct_request { TAILQ_ENTRY(ct_request) cr_link; @@ -113,6 +115,8 @@ TAILQ_HEAD(ct_request_list, ct_request); struct ct_data { struct mtx ct_lock; + int ct_threads; /* number of threads in clnt_vc_call */ + bool_t ct_closing; /* TRUE if we are destroying client */ struct socket *ct_socket; /* connection socket */ bool_t ct_closeit; /* close it on destroy */ struct timeval ct_wait; /* wait interval in milliseconds */ @@ -161,7 +165,7 @@ clnt_vc_create( static uint32_t disrupt; struct __rpc_sockinfo si; XDR xdrs; - int error; + int error, interrupted; if (disrupt == 0) disrupt = (uint32_t)(long)raddr; @@ -170,10 +174,31 @@ clnt_vc_create( ct = (struct ct_data *)mem_alloc(sizeof (*ct)); mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF); + ct->ct_threads = 0; + ct->ct_closing = FALSE; if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) { error = soconnect(so, raddr, curthread); + SOCK_LOCK(so); + interrupted = 0; + while ((so->so_state & SS_ISCONNECTING) + && so->so_error == 0) { + error = msleep(&so->so_timeo, SOCK_MTX(so), + PSOCK | PCATCH, "connec", 0); + if (error) { + if (error == EINTR || error == ERESTART) + interrupted = 1; + break; + } + } + if (error == 0) { + error = so->so_error; + so->so_error = 0; + } + SOCK_UNLOCK(so); if (error) { + if (!interrupted) + so->so_state &= ~SS_ISCONNECTING; rpc_createerr.cf_stat = RPC_SYSTEMERROR; rpc_createerr.cf_error.re_errno = error; goto err; @@ -224,6 +249,7 @@ clnt_vc_create( * Create a client handle which uses xdrrec for serialization * and authnone for authentication. */ + cl->cl_refs = 1; cl->cl_ops = &clnt_vc_ops; cl->cl_private = ct; cl->cl_auth = authnone_create(); @@ -255,6 +281,7 @@ err: static enum clnt_stat clnt_vc_call( CLIENT *cl, + struct rpc_callextra *ext, rpcproc_t proc, xdrproc_t xdr_args, void *args_ptr, @@ -263,6 +290,7 @@ clnt_vc_call( struct timeval utimeout) { struct ct_data *ct = (struct ct_data *) cl->cl_private; + AUTH *auth; XDR xdrs; struct rpc_msg reply_msg; bool_t ok; @@ -270,13 +298,27 @@ clnt_vc_call( struct timeval timeout; uint32_t xid; struct mbuf *mreq = NULL; - struct ct_request cr; + struct ct_request *cr; int error; + cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK); + mtx_lock(&ct->ct_lock); - cr.cr_mrep = NULL; - cr.cr_error = 0; + if (ct->ct_closing) { + mtx_unlock(&ct->ct_lock); + free(cr, M_RPC); + return (RPC_CANTSEND); + } + ct->ct_threads++; + + if (ext) + auth = ext->rc_auth; + else + auth = cl->cl_auth; + + cr->cr_mrep = NULL; + cr->cr_error = 0; if (ct->ct_wait.tv_usec == -1) { timeout = utimeout; /* use supplied timeout */ @@ -311,12 +353,12 @@ call_again: ct->ct_error.re_status = RPC_SUCCESS; if ((! XDR_PUTINT32(&xdrs, &proc)) || - (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) || + (! AUTH_MARSHALL(auth, &xdrs)) || (! (*xdr_args)(&xdrs, args_ptr))) { if (ct->ct_error.re_status == RPC_SUCCESS) ct->ct_error.re_status = RPC_CANTENCODEARGS; - m_freem(mreq); - return (ct->ct_error.re_status); + mtx_lock(&ct->ct_lock); + goto out; } m_fixhdr(mreq); @@ -327,9 +369,9 @@ call_again: *mtod(mreq, uint32_t *) = htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t))); - cr.cr_xid = xid; + cr->cr_xid = xid; mtx_lock(&ct->ct_lock); - TAILQ_INSERT_TAIL(&ct->ct_pending, &cr, cr_link); + TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link); mtx_unlock(&ct->ct_lock); /* @@ -343,10 +385,8 @@ call_again: reply_msg.acpted_rply.ar_results.proc = xdr_results; mtx_lock(&ct->ct_lock); - if (error) { - TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link); - + TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); ct->ct_error.re_errno = error; ct->ct_error.re_status = RPC_CANTSEND; goto out; @@ -357,12 +397,14 @@ call_again: * lock. In both these cases, the request has been removed * from ct->ct_pending. */ - if (cr.cr_error) { - ct->ct_error.re_errno = cr.cr_error; + if (cr->cr_error) { + TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); + ct->ct_error.re_errno = cr->cr_error; ct->ct_error.re_status = RPC_CANTRECV; goto out; } - if (cr.cr_mrep) { + if (cr->cr_mrep) { + TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); goto got_reply; } @@ -370,23 +412,22 @@ call_again: * Hack to provide rpc-based message passing */ if (timeout.tv_sec == 0 && timeout.tv_usec == 0) { - if (cr.cr_xid) - TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link); + TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); ct->ct_error.re_status = RPC_TIMEDOUT; goto out; } - error = msleep(&cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan, + error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan, tvtohz(&timeout)); + TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); + if (error) { /* * The sleep returned an error so our request is still * on the list. Turn the error code into an * appropriate client status. */ - if (cr.cr_xid) - TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link); ct->ct_error.re_errno = error; switch (error) { case EINTR: @@ -405,8 +446,8 @@ call_again: * upcall had a receive error, report that, * otherwise we have a reply. */ - if (cr.cr_error) { - ct->ct_error.re_errno = cr.cr_error; + if (cr->cr_error) { + ct->ct_error.re_errno = cr->cr_error; ct->ct_error.re_status = RPC_CANTRECV; goto out; } @@ -419,10 +460,10 @@ got_reply: */ mtx_unlock(&ct->ct_lock); - xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE); + xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); ok = xdr_replymsg(&xdrs, &reply_msg); XDR_DESTROY(&xdrs); - cr.cr_mrep = NULL; + cr->cr_mrep = NULL; mtx_lock(&ct->ct_lock); @@ -466,10 +507,17 @@ out: if (mreq) m_freem(mreq); - if (cr.cr_mrep) - m_freem(cr.cr_mrep); + if (cr->cr_mrep) + m_freem(cr->cr_mrep); + ct->ct_threads--; + if (ct->ct_closing) + wakeup(ct); + mtx_unlock(&ct->ct_lock); + + free(cr, M_RPC); + return (ct->ct_error.re_status); } @@ -628,6 +676,7 @@ static void clnt_vc_destroy(CLIENT *cl) { struct ct_data *ct = (struct ct_data *) cl->cl_private; + struct ct_request *cr; struct socket *so = NULL; mtx_lock(&ct->ct_lock); @@ -639,8 +688,19 @@ clnt_vc_destroy(CLIENT *cl) ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL; SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv); - KASSERT(!TAILQ_FIRST(&ct->ct_pending), - ("Destroying RPC client with pending RPC requests")); + /* + * Abort any pending requests and wait until everyone + * has finished with clnt_vc_call. + */ + ct->ct_closing = TRUE; + TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { + cr->cr_xid = 0; + cr->cr_error = ESHUTDOWN; + wakeup(cr); + } + + while (ct->ct_threads) + msleep(ct, &ct->ct_lock, 0, "rpcclose", 0); if (ct->ct_closeit) { so = ct->ct_socket; @@ -732,7 +792,6 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) cr->cr_error = error; wakeup(cr); } - TAILQ_INIT(&ct->ct_pending); mtx_unlock(&ct->ct_lock); break; } @@ -795,19 +854,14 @@ clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) if (cr->cr_xid == xid) { /* * This one - * matches. We snip it - * out of the pending - * list and leave the - * reply mbuf in + * matches. We leave + * the reply mbuf in * cr->cr_mrep. Set * the XID to zero so - * that clnt_vc_call - * can know not to - * repeat the - * TAILQ_REMOVE. + * that we will ignore + * any duplicaed + * replies. */ - TAILQ_REMOVE(&ct->ct_pending, - cr, cr_link); cr->cr_xid = 0; cr->cr_mrep = ct->ct_record; cr->cr_error = 0; diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c index 54edfd0..47530da 100644 --- a/sys/rpc/svc_vc.c +++ b/sys/rpc/svc_vc.c @@ -132,6 +132,15 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, struct sockaddr* sa; int error; + if (so->so_state & SS_ISCONNECTED) { + error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa); + if (error) + return (NULL); + xprt = svc_vc_create_conn(pool, so, sa); + free(sa, M_SONAME); + return (xprt); + } + xprt = mem_alloc(sizeof(SVCXPRT)); mtx_init(&xprt->xp_lock, "xprt->xp_lock", NULL, MTX_DEF); xprt->xp_pool = pool; @@ -180,8 +189,32 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr) SVCXPRT *xprt = NULL; struct cf_conn *cd = NULL; struct sockaddr* sa = NULL; + struct sockopt opt; + int one = 1; int error; + bzero(&opt, sizeof(struct sockopt)); + opt.sopt_dir = SOPT_SET; + opt.sopt_level = SOL_SOCKET; + opt.sopt_name = SO_KEEPALIVE; + opt.sopt_val = &one; + opt.sopt_valsize = sizeof(one); + error = sosetopt(so, &opt); + if (error) + return (NULL); + + if (so->so_proto->pr_protocol == IPPROTO_TCP) { + bzero(&opt, sizeof(struct sockopt)); + opt.sopt_dir = SOPT_SET; + opt.sopt_level = IPPROTO_TCP; + opt.sopt_name = TCP_NODELAY; + opt.sopt_val = &one; + opt.sopt_valsize = sizeof(one); + error = sosetopt(so, &opt); + if (error) + return (NULL); + } + cd = mem_alloc(sizeof(*cd)); cd->strm_stat = XPRT_IDLE; @@ -306,8 +339,6 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg) { struct socket *so = NULL; struct sockaddr *sa = NULL; - struct sockopt opt; - int one = 1; int error; /* @@ -351,16 +382,6 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg) sa = 0; error = soaccept(so, &sa); - if (!error) { - bzero(&opt, sizeof(struct sockopt)); - opt.sopt_dir = SOPT_SET; - opt.sopt_level = IPPROTO_TCP; - opt.sopt_name = TCP_NODELAY; - opt.sopt_val = &one; - opt.sopt_valsize = sizeof(one); - error = sosetopt(so, &opt); - } - if (error) { /* * XXX not sure if I need to call sofree or soclose here. @@ -374,7 +395,9 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg) * svc_vc_create_conn will call xprt_register - we don't need * to do anything with the new connection. */ - svc_vc_create_conn(xprt->xp_pool, so, sa); + if (!svc_vc_create_conn(xprt->xp_pool, so, sa)) + soclose(so); + free(sa, M_SONAME); return (FALSE); /* there is never an rpc msg to be processed */ |