diff options
Diffstat (limited to 'lib/libc/rpc')
-rw-r--r-- | lib/libc/rpc/rpc_com.h | 3 | ||||
-rw-r--r-- | lib/libc/rpc/svc.c | 67 | ||||
-rw-r--r-- | lib/libc/rpc/svc_run.c | 10 | ||||
-rw-r--r-- | lib/libc/rpc/svc_vc.c | 228 |
4 files changed, 259 insertions, 49 deletions
diff --git a/lib/libc/rpc/rpc_com.h b/lib/libc/rpc/rpc_com.h index 9686e11..1f1fd9e 100644 --- a/lib/libc/rpc/rpc_com.h +++ b/lib/libc/rpc/rpc_com.h @@ -84,6 +84,9 @@ bool_t __rpc_control(int,void *); char *_get_next_token(char *, int); +SVCXPRT **__svc_xports; +int __svc_maxrec; + __END_DECLS #endif /* _RPC_RPCCOM_H */ diff --git a/lib/libc/rpc/svc.c b/lib/libc/rpc/svc.c index c4584ed..9d7435e 100644 --- a/lib/libc/rpc/svc.c +++ b/lib/libc/rpc/svc.c @@ -63,8 +63,6 @@ __FBSDID("$FreeBSD$"); #include "rpc_com.h" -static SVCXPRT **xports; - #define RQCRED_SIZE 400 /* this size is excessive */ #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ @@ -91,6 +89,7 @@ extern rwlock_t svc_fd_lock; static struct svc_callout *svc_find(rpcprog_t, rpcvers_t, struct svc_callout **, char *); +static void __xprt_do_unregister (SVCXPRT *xprt, bool_t dolock); /* *************** SVCXPRT related stuff **************** */ @@ -108,27 +107,40 @@ xprt_register(xprt) sock = xprt->xp_fd; rwlock_wrlock(&svc_fd_lock); - if (xports == NULL) { - xports = (SVCXPRT **) + if (__svc_xports == NULL) { + __svc_xports = (SVCXPRT **) mem_alloc(FD_SETSIZE * sizeof(SVCXPRT *)); - if (xports == NULL) + if (__svc_xports == NULL) return; - memset(xports, '\0', FD_SETSIZE * sizeof(SVCXPRT *)); + memset(__svc_xports, '\0', FD_SETSIZE * sizeof(SVCXPRT *)); } if (sock < FD_SETSIZE) { - xports[sock] = xprt; + __svc_xports[sock] = xprt; FD_SET(sock, &svc_fdset); svc_maxfd = max(svc_maxfd, sock); } rwlock_unlock(&svc_fd_lock); } +void +xprt_unregister(SVCXPRT *xprt) +{ + __xprt_do_unregister(xprt, TRUE); +} + +void +__xprt_unregister_unlocked(SVCXPRT *xprt) +{ + __xprt_do_unregister(xprt, FALSE); +} + /* * De-activate a transport handle. */ -void -xprt_unregister(xprt) +static void +__xprt_do_unregister(xprt, dolock) SVCXPRT *xprt; + bool_t dolock; { int sock; @@ -136,17 +148,19 @@ xprt_unregister(xprt) sock = xprt->xp_fd; - rwlock_wrlock(&svc_fd_lock); - if ((sock < FD_SETSIZE) && (xports[sock] == xprt)) { - xports[sock] = NULL; + if (dolock) + rwlock_wrlock(&svc_fd_lock); + if ((sock < FD_SETSIZE) && (__svc_xports[sock] == xprt)) { + __svc_xports[sock] = NULL; FD_CLR(sock, &svc_fdset); if (sock >= svc_maxfd) { for (svc_maxfd--; svc_maxfd>=0; svc_maxfd--) - if (xports[svc_maxfd]) + if (__svc_xports[svc_maxfd]) break; } } - rwlock_unlock(&svc_fd_lock); + if (dolock) + rwlock_unlock(&svc_fd_lock); } /* @@ -611,7 +625,7 @@ svc_getreq_common(fd) r.rq_clntcred = &(cred_area[2*MAX_AUTH_BYTES]); rwlock_rdlock(&svc_fd_lock); - xprt = xports[fd]; + xprt = __svc_xports[fd]; rwlock_unlock(&svc_fd_lock); if (xprt == NULL) /* But do we control sock? */ @@ -667,7 +681,7 @@ svc_getreq_common(fd) * If so, then break. */ rwlock_rdlock(&svc_fd_lock); - if (xprt != xports[fd]) { + if (xprt != __svc_xports[fd]) { rwlock_unlock(&svc_fd_lock); break; } @@ -715,3 +729,24 @@ svc_getreq_poll(pfdp, pollretval) } } } + +bool_t +rpc_control(int what, void *arg) +{ + int val; + + switch (what) { + case RPC_SVC_CONNMAXREC_SET: + val = *(int *)arg; + if (val <= 0) + return FALSE; + __svc_maxrec = val; + return TRUE; + case RPC_SVC_CONNMAXREC_GET: + *(int *)arg = __svc_maxrec; + return TRUE; + default: + break; + } + return FALSE; +} diff --git a/lib/libc/rpc/svc_run.c b/lib/libc/rpc/svc_run.c index 6d13ee0..53d73f3 100644 --- a/lib/libc/rpc/svc_run.c +++ b/lib/libc/rpc/svc_run.c @@ -55,14 +55,19 @@ __FBSDID("$FreeBSD$"); void svc_run() { - fd_set readfds; + fd_set readfds, cleanfds; + struct timeval timeout; extern rwlock_t svc_fd_lock; + timeout.tv_sec = 30; + timeout.tv_usec = 0; + for (;;) { rwlock_rdlock(&svc_fd_lock); readfds = svc_fdset; + cleanfds = svc_fdset; rwlock_unlock(&svc_fd_lock); - switch (_select(svc_maxfd+1, &readfds, NULL, NULL, NULL)) { + switch (select(svc_maxfd+1, &readfds, NULL, NULL, &timeout)) { case -1: FD_ZERO(&readfds); if (errno == EINTR) { @@ -71,6 +76,7 @@ svc_run() _warn("svc_run: - select failed"); return; case 0: + __svc_clean_idle(&cleanfds, 30, FALSE); continue; default: svc_getreqset(&readfds); diff --git a/lib/libc/rpc/svc_vc.c b/lib/libc/rpc/svc_vc.c index e6cc187..ee802e2 100644 --- a/lib/libc/rpc/svc_vc.c +++ b/lib/libc/rpc/svc_vc.c @@ -51,6 +51,7 @@ __FBSDID("$FreeBSD$"); #include <sys/poll.h> #include <sys/socket.h> #include <sys/un.h> +#include <sys/time.h> #include <sys/uio.h> #include <netinet/in.h> #include <netinet/tcp.h> @@ -58,6 +59,7 @@ __FBSDID("$FreeBSD$"); #include <assert.h> #include <err.h> #include <errno.h> +#include <fcntl.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -73,10 +75,13 @@ struct cmessage { struct cmsgcred cmcred; }; +extern rwlock_t svc_fd_lock; + static SVCXPRT *makefd_xprt(int, u_int, u_int); static bool_t rendezvous_request(SVCXPRT *, struct rpc_msg *); static enum xprt_stat rendezvous_stat(SVCXPRT *); static void svc_vc_destroy(SVCXPRT *); +static void __svc_vc_dodestroy (SVCXPRT *); static int read_vc(void *, void *, int); static int write_vc(void *, void *, int); static enum xprt_stat svc_vc_stat(SVCXPRT *); @@ -87,12 +92,15 @@ static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *); static void svc_vc_rendezvous_ops(SVCXPRT *); static void svc_vc_ops(SVCXPRT *); static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in); +static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq, + void *in); static int __msgread_withcred(int, void *, size_t, struct cmessage *); static int __msgwrite(int, void *, size_t); struct cf_rendezvous { /* kept in xprt->xp_p1 for rendezvouser */ u_int sendsize; u_int recvsize; + int maxrec; }; struct cf_conn { /* kept in xprt->xp_p1 for actual connection */ @@ -100,6 +108,11 @@ struct cf_conn { /* kept in xprt->xp_p1 for actual connection */ u_int32_t x_id; XDR xdrs; char verf_body[MAX_AUTH_BYTES]; + u_int sendsize; + u_int recvsize; + int maxrec; + bool_t nonblock; + struct timeval last_recv_time; }; /* @@ -139,6 +152,7 @@ svc_vc_create(fd, sendsize, recvsize) return NULL; r->sendsize = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsize); r->recvsize = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsize); + r->maxrec = __svc_maxrec; xprt = mem_alloc(sizeof(SVCXPRT)); if (xprt == NULL) { warnx("svc_vc_create: out of memory"); @@ -285,11 +299,14 @@ rendezvous_request(xprt, msg) SVCXPRT *xprt; struct rpc_msg *msg; { - int sock; + int sock, flags; struct cf_rendezvous *r; + struct cf_conn *cd; struct sockaddr_storage addr; socklen_t len; struct __rpc_sockinfo si; + SVCXPRT *newxprt; + fd_set cleanfds; assert(xprt != NULL); assert(msg != NULL); @@ -301,21 +318,30 @@ again: &len)) < 0) { if (errno == EINTR) goto again; - return (FALSE); + /* + * Clean out the most idle file descriptor when we're + * running out. + */ + if (errno == EMFILE || errno == ENFILE) { + cleanfds = svc_fdset; + __svc_clean_idle(&cleanfds, 0, FALSE); + goto again; + } + return (FALSE); } /* * make a new transporter (re-uses xprt) */ - xprt = makefd_xprt(sock, r->sendsize, r->recvsize); - xprt->xp_rtaddr.buf = mem_alloc(len); - if (xprt->xp_rtaddr.buf == NULL) + newxprt = makefd_xprt(sock, r->sendsize, r->recvsize); + newxprt->xp_rtaddr.buf = mem_alloc(len); + if (newxprt->xp_rtaddr.buf == NULL) return (FALSE); - memcpy(xprt->xp_rtaddr.buf, &addr, len); - xprt->xp_rtaddr.len = len; + memcpy(newxprt->xp_rtaddr.buf, &addr, len); + newxprt->xp_rtaddr.len = len; #ifdef PORTMAP if (addr.ss_family == AF_INET || addr.ss_family == AF_LOCAL) { - xprt->xp_raddr = *(struct sockaddr_in *)xprt->xp_rtaddr.buf; - xprt->xp_addrlen = sizeof (struct sockaddr_in); + newxprt->xp_raddr = *(struct sockaddr_in *)newxprt->xp_rtaddr.buf; + newxprt->xp_addrlen = sizeof (struct sockaddr_in); } #endif /* PORTMAP */ if (__rpc_fd2sockinfo(sock, &si) && si.si_proto == IPPROTO_TCP) { @@ -323,6 +349,28 @@ again: /* XXX fvdl - is this useful? */ _setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &len, sizeof (len)); } + + cd = (struct cf_conn *)newxprt->xp_p1; + + cd->recvsize = r->recvsize; + cd->sendsize = r->sendsize; + cd->maxrec = r->maxrec; + + if (cd->maxrec != 0) { + flags = fcntl(sock, F_GETFL, 0); + if (flags == -1) + return (FALSE); + if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) + return (FALSE); + if (cd->recvsize > cd->maxrec) + cd->recvsize = cd->maxrec; + cd->nonblock = TRUE; + __xdrrec_setnonblock(&cd->xdrs, cd->maxrec); + } else + cd->nonblock = FALSE; + + gettimeofday(&cd->last_recv_time, NULL); + return (FALSE); /* there is never an rpc msg to be processed */ } @@ -339,14 +387,21 @@ static void svc_vc_destroy(xprt) SVCXPRT *xprt; { + assert(xprt != NULL); + + xprt_unregister(xprt); + __svc_vc_dodestroy(xprt); +} + +static void +__svc_vc_dodestroy(xprt) + SVCXPRT *xprt; +{ struct cf_conn *cd; struct cf_rendezvous *r; - assert(xprt != NULL); - cd = (struct cf_conn *)xprt->xp_p1; - xprt_unregister(xprt); if (xprt->xp_fd != RPC_ANYFD) (void)_close(xprt->xp_fd); if (xprt->xp_port != 0) { @@ -380,6 +435,30 @@ svc_vc_control(xprt, rq, in) return (FALSE); } +static bool_t +svc_vc_rendezvous_control(xprt, rq, in) + SVCXPRT *xprt; + const u_int rq; + void *in; +{ + struct cf_rendezvous *cfp; + + cfp = (struct cf_rendezvous *)xprt->xp_p1; + if (cfp == NULL) + return (FALSE); + switch (rq) { + case SVCGET_CONNMAXREC: + *(int *)in = cfp->maxrec; + break; + case SVCSET_CONNMAXREC: + cfp->maxrec = *(int *)in; + break; + default: + return (FALSE); + } + return (TRUE); +} + /* * reads data from the tcp or uip connection. * any error is fatal and the connection is closed. @@ -399,12 +478,28 @@ read_vc(xprtp, buf, len) struct pollfd pollfd; struct sockaddr *sa; struct cmessage *cm; + struct cf_conn *cfp; xprt = (SVCXPRT *)xprtp; assert(xprt != NULL); sock = xprt->xp_fd; + cfp = (struct cf_conn *)xprt->xp_p1; + + if (cfp->nonblock) { + len = read(sock, buf, (size_t)len); + if (len < 0) { + if (errno == EAGAIN) + len = 0; + else + goto fatal_err; + } + if (len != 0) + gettimeofday(&cfp->last_recv_time, NULL); + return len; + } + do { pollfd.fd = sock; pollfd.events = POLLIN; @@ -432,8 +527,10 @@ read_vc(xprtp, buf, len) } else goto fatal_err; } else { - if ((len = _read(sock, buf, (size_t)len)) > 0) + if ((len = read(sock, buf, (size_t)len)) > 0) { + gettimeofday(&cfp->last_recv_time, NULL); return (len); + } } fatal_err: @@ -454,27 +551,41 @@ write_vc(xprtp, buf, len) SVCXPRT *xprt; int i, cnt; struct sockaddr *sa; + struct cf_conn *cd; + struct timeval tv0, tv1; xprt = (SVCXPRT *)xprtp; assert(xprt != NULL); + + cd = (struct cf_conn *)xprt->xp_p1; + + if (cd->nonblock) + gettimeofday(&tv0, NULL); sa = (struct sockaddr *)xprt->xp_rtaddr.buf; - if (sa->sa_family == AF_LOCAL) { - for (cnt = len; cnt > 0; cnt -= i, buf += i) { - if ((i = __msgwrite(xprt->xp_fd, buf, - (size_t)cnt)) < 0) { - ((struct cf_conn *)(xprt->xp_p1))->strm_stat = - XPRT_DIED; + for (cnt = len; cnt > 0; cnt -= i, buf += i) { + if (sa->sa_family == AF_LOCAL) + i = __msgwrite(xprt->xp_fd, buf, (size_t)cnt); + else + i = _write(xprt->xp_fd, buf, (size_t)cnt); + if (i < 0) { + if (errno != EAGAIN || !cd->nonblock) { + cd->strm_stat = XPRT_DIED; return (-1); } - } - } else { - for (cnt = len; cnt > 0; cnt -= i, buf += i) { - if ((i = _write(xprt->xp_fd, buf, - (size_t)cnt)) < 0) { - ((struct cf_conn *)(xprt->xp_p1))->strm_stat = - XPRT_DIED; - return (-1); + if (cd->nonblock && i != cnt) { + /* + * For non-blocking connections, do not + * take more than 2 seconds writing the + * data out. + * + * XXX 2 is an arbitrary amount. + */ + gettimeofday(&tv1, NULL); + if (tv1.tv_sec - tv0.tv_sec >= 2) { + cd->strm_stat = XPRT_DIED; + return (-1); + } } } } @@ -513,6 +624,11 @@ svc_vc_recv(xprt, msg) cd = (struct cf_conn *)(xprt->xp_p1); xdrs = &(cd->xdrs); + if (cd->nonblock) { + if (!__xdrrec_getrec(xdrs, &cd->strm_stat, TRUE)) + return FALSE; + } + xdrs->x_op = XDR_DECODE; (void)xdrrec_skiprecord(xdrs); if (xdr_callmsg(xdrs, msg)) { @@ -560,7 +676,7 @@ svc_vc_reply(xprt, msg) { struct cf_conn *cd; XDR *xdrs; - bool_t stat; + bool_t rstat; assert(xprt != NULL); assert(msg != NULL); @@ -570,9 +686,9 @@ svc_vc_reply(xprt, msg) xdrs->x_op = XDR_ENCODE; msg->rm_xid = cd->x_id; - stat = xdr_replymsg(xdrs, msg); + rstat = xdr_replymsg(xdrs, msg); (void)xdrrec_endofrecord(xdrs, TRUE); - return (stat); + return (rstat); } static void @@ -619,7 +735,7 @@ svc_vc_rendezvous_ops(xprt) ops.xp_freeargs = (bool_t (*)(SVCXPRT *, xdrproc_t, void *))abort, ops.xp_destroy = svc_vc_destroy; - ops2.xp_control = svc_vc_control; + ops2.xp_control = svc_vc_rendezvous_control; } xprt->xp_ops = &ops; xprt->xp_ops2 = &ops2; @@ -720,3 +836,53 @@ __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) *uid = cmcred->cmcred_euid; return (0); } + +/* + * Destroy xprts that have not have had any activity in 'timeout' seconds. + * If 'cleanblock' is true, blocking connections (the default) are also + * cleaned. If timeout is 0, the least active connection is picked. + */ +bool_t +__svc_clean_idle(fd_set *fds, int timeout, bool_t cleanblock) +{ + int i, ncleaned; + SVCXPRT *xprt, *least_active; + struct timeval tv, tdiff, tmax; + struct cf_conn *cd; + + gettimeofday(&tv, NULL); + tmax.tv_sec = tmax.tv_usec = 0; + least_active = NULL; + rwlock_wrlock(&svc_fd_lock); + for (i = ncleaned = 0; i <= svc_maxfd; i++) { + if (FD_ISSET(i, fds)) { + xprt = __svc_xports[i]; + if (xprt == NULL || xprt->xp_ops == NULL || + xprt->xp_ops->xp_recv != svc_vc_recv) + continue; + cd = (struct cf_conn *)xprt->xp_p1; + if (!cleanblock && !cd->nonblock) + continue; + if (timeout == 0) { + timersub(&tv, &cd->last_recv_time, &tdiff); + if (timercmp(&tdiff, &tmax, >)) { + tmax = tdiff; + least_active = xprt; + } + continue; + } + if (tv.tv_sec - cd->last_recv_time.tv_sec > timeout) { + __xprt_unregister_unlocked(xprt); + __svc_vc_dodestroy(xprt); + ncleaned++; + } + } + } + if (timeout == 0 && least_active != NULL) { + __xprt_unregister_unlocked(least_active); + __svc_vc_dodestroy(least_active); + ncleaned++; + } + rwlock_unlock(&svc_fd_lock); + return ncleaned > 0 ? TRUE : FALSE; +} |