summaryrefslogtreecommitdiffstats
path: root/lib/libc/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'lib/libc/rpc')
-rw-r--r--lib/libc/rpc/rpc_com.h3
-rw-r--r--lib/libc/rpc/svc.c67
-rw-r--r--lib/libc/rpc/svc_run.c10
-rw-r--r--lib/libc/rpc/svc_vc.c228
4 files changed, 259 insertions, 49 deletions
diff --git a/lib/libc/rpc/rpc_com.h b/lib/libc/rpc/rpc_com.h
index 9686e11..1f1fd9e 100644
--- a/lib/libc/rpc/rpc_com.h
+++ b/lib/libc/rpc/rpc_com.h
@@ -84,6 +84,9 @@ bool_t __rpc_control(int,void *);
char *_get_next_token(char *, int);
+SVCXPRT **__svc_xports;
+int __svc_maxrec;
+
__END_DECLS
#endif /* _RPC_RPCCOM_H */
diff --git a/lib/libc/rpc/svc.c b/lib/libc/rpc/svc.c
index c4584ed..9d7435e 100644
--- a/lib/libc/rpc/svc.c
+++ b/lib/libc/rpc/svc.c
@@ -63,8 +63,6 @@ __FBSDID("$FreeBSD$");
#include "rpc_com.h"
-static SVCXPRT **xports;
-
#define RQCRED_SIZE 400 /* this size is excessive */
#define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
@@ -91,6 +89,7 @@ extern rwlock_t svc_fd_lock;
static struct svc_callout *svc_find(rpcprog_t, rpcvers_t,
struct svc_callout **, char *);
+static void __xprt_do_unregister (SVCXPRT *xprt, bool_t dolock);
/* *************** SVCXPRT related stuff **************** */
@@ -108,27 +107,40 @@ xprt_register(xprt)
sock = xprt->xp_fd;
rwlock_wrlock(&svc_fd_lock);
- if (xports == NULL) {
- xports = (SVCXPRT **)
+ if (__svc_xports == NULL) {
+ __svc_xports = (SVCXPRT **)
mem_alloc(FD_SETSIZE * sizeof(SVCXPRT *));
- if (xports == NULL)
+ if (__svc_xports == NULL)
return;
- memset(xports, '\0', FD_SETSIZE * sizeof(SVCXPRT *));
+ memset(__svc_xports, '\0', FD_SETSIZE * sizeof(SVCXPRT *));
}
if (sock < FD_SETSIZE) {
- xports[sock] = xprt;
+ __svc_xports[sock] = xprt;
FD_SET(sock, &svc_fdset);
svc_maxfd = max(svc_maxfd, sock);
}
rwlock_unlock(&svc_fd_lock);
}
+void
+xprt_unregister(SVCXPRT *xprt)
+{
+ __xprt_do_unregister(xprt, TRUE);
+}
+
+void
+__xprt_unregister_unlocked(SVCXPRT *xprt)
+{
+ __xprt_do_unregister(xprt, FALSE);
+}
+
/*
* De-activate a transport handle.
*/
-void
-xprt_unregister(xprt)
+static void
+__xprt_do_unregister(xprt, dolock)
SVCXPRT *xprt;
+ bool_t dolock;
{
int sock;
@@ -136,17 +148,19 @@ xprt_unregister(xprt)
sock = xprt->xp_fd;
- rwlock_wrlock(&svc_fd_lock);
- if ((sock < FD_SETSIZE) && (xports[sock] == xprt)) {
- xports[sock] = NULL;
+ if (dolock)
+ rwlock_wrlock(&svc_fd_lock);
+ if ((sock < FD_SETSIZE) && (__svc_xports[sock] == xprt)) {
+ __svc_xports[sock] = NULL;
FD_CLR(sock, &svc_fdset);
if (sock >= svc_maxfd) {
for (svc_maxfd--; svc_maxfd>=0; svc_maxfd--)
- if (xports[svc_maxfd])
+ if (__svc_xports[svc_maxfd])
break;
}
}
- rwlock_unlock(&svc_fd_lock);
+ if (dolock)
+ rwlock_unlock(&svc_fd_lock);
}
/*
@@ -611,7 +625,7 @@ svc_getreq_common(fd)
r.rq_clntcred = &(cred_area[2*MAX_AUTH_BYTES]);
rwlock_rdlock(&svc_fd_lock);
- xprt = xports[fd];
+ xprt = __svc_xports[fd];
rwlock_unlock(&svc_fd_lock);
if (xprt == NULL)
/* But do we control sock? */
@@ -667,7 +681,7 @@ svc_getreq_common(fd)
* If so, then break.
*/
rwlock_rdlock(&svc_fd_lock);
- if (xprt != xports[fd]) {
+ if (xprt != __svc_xports[fd]) {
rwlock_unlock(&svc_fd_lock);
break;
}
@@ -715,3 +729,24 @@ svc_getreq_poll(pfdp, pollretval)
}
}
}
+
+bool_t
+rpc_control(int what, void *arg)
+{
+ int val;
+
+ switch (what) {
+ case RPC_SVC_CONNMAXREC_SET:
+ val = *(int *)arg;
+ if (val <= 0)
+ return FALSE;
+ __svc_maxrec = val;
+ return TRUE;
+ case RPC_SVC_CONNMAXREC_GET:
+ *(int *)arg = __svc_maxrec;
+ return TRUE;
+ default:
+ break;
+ }
+ return FALSE;
+}
diff --git a/lib/libc/rpc/svc_run.c b/lib/libc/rpc/svc_run.c
index 6d13ee0..53d73f3 100644
--- a/lib/libc/rpc/svc_run.c
+++ b/lib/libc/rpc/svc_run.c
@@ -55,14 +55,19 @@ __FBSDID("$FreeBSD$");
void
svc_run()
{
- fd_set readfds;
+ fd_set readfds, cleanfds;
+ struct timeval timeout;
extern rwlock_t svc_fd_lock;
+ timeout.tv_sec = 30;
+ timeout.tv_usec = 0;
+
for (;;) {
rwlock_rdlock(&svc_fd_lock);
readfds = svc_fdset;
+ cleanfds = svc_fdset;
rwlock_unlock(&svc_fd_lock);
- switch (_select(svc_maxfd+1, &readfds, NULL, NULL, NULL)) {
+ switch (select(svc_maxfd+1, &readfds, NULL, NULL, &timeout)) {
case -1:
FD_ZERO(&readfds);
if (errno == EINTR) {
@@ -71,6 +76,7 @@ svc_run()
_warn("svc_run: - select failed");
return;
case 0:
+ __svc_clean_idle(&cleanfds, 30, FALSE);
continue;
default:
svc_getreqset(&readfds);
diff --git a/lib/libc/rpc/svc_vc.c b/lib/libc/rpc/svc_vc.c
index e6cc187..ee802e2 100644
--- a/lib/libc/rpc/svc_vc.c
+++ b/lib/libc/rpc/svc_vc.c
@@ -51,6 +51,7 @@ __FBSDID("$FreeBSD$");
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/time.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
@@ -58,6 +59,7 @@ __FBSDID("$FreeBSD$");
#include <assert.h>
#include <err.h>
#include <errno.h>
+#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -73,10 +75,13 @@ struct cmessage {
struct cmsgcred cmcred;
};
+extern rwlock_t svc_fd_lock;
+
static SVCXPRT *makefd_xprt(int, u_int, u_int);
static bool_t rendezvous_request(SVCXPRT *, struct rpc_msg *);
static enum xprt_stat rendezvous_stat(SVCXPRT *);
static void svc_vc_destroy(SVCXPRT *);
+static void __svc_vc_dodestroy (SVCXPRT *);
static int read_vc(void *, void *, int);
static int write_vc(void *, void *, int);
static enum xprt_stat svc_vc_stat(SVCXPRT *);
@@ -87,12 +92,15 @@ static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *);
static void svc_vc_rendezvous_ops(SVCXPRT *);
static void svc_vc_ops(SVCXPRT *);
static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
+static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
+ void *in);
static int __msgread_withcred(int, void *, size_t, struct cmessage *);
static int __msgwrite(int, void *, size_t);
struct cf_rendezvous { /* kept in xprt->xp_p1 for rendezvouser */
u_int sendsize;
u_int recvsize;
+ int maxrec;
};
struct cf_conn { /* kept in xprt->xp_p1 for actual connection */
@@ -100,6 +108,11 @@ struct cf_conn { /* kept in xprt->xp_p1 for actual connection */
u_int32_t x_id;
XDR xdrs;
char verf_body[MAX_AUTH_BYTES];
+ u_int sendsize;
+ u_int recvsize;
+ int maxrec;
+ bool_t nonblock;
+ struct timeval last_recv_time;
};
/*
@@ -139,6 +152,7 @@ svc_vc_create(fd, sendsize, recvsize)
return NULL;
r->sendsize = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsize);
r->recvsize = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsize);
+ r->maxrec = __svc_maxrec;
xprt = mem_alloc(sizeof(SVCXPRT));
if (xprt == NULL) {
warnx("svc_vc_create: out of memory");
@@ -285,11 +299,14 @@ rendezvous_request(xprt, msg)
SVCXPRT *xprt;
struct rpc_msg *msg;
{
- int sock;
+ int sock, flags;
struct cf_rendezvous *r;
+ struct cf_conn *cd;
struct sockaddr_storage addr;
socklen_t len;
struct __rpc_sockinfo si;
+ SVCXPRT *newxprt;
+ fd_set cleanfds;
assert(xprt != NULL);
assert(msg != NULL);
@@ -301,21 +318,30 @@ again:
&len)) < 0) {
if (errno == EINTR)
goto again;
- return (FALSE);
+ /*
+ * Clean out the most idle file descriptor when we're
+ * running out.
+ */
+ if (errno == EMFILE || errno == ENFILE) {
+ cleanfds = svc_fdset;
+ __svc_clean_idle(&cleanfds, 0, FALSE);
+ goto again;
+ }
+ return (FALSE);
}
/*
* make a new transporter (re-uses xprt)
*/
- xprt = makefd_xprt(sock, r->sendsize, r->recvsize);
- xprt->xp_rtaddr.buf = mem_alloc(len);
- if (xprt->xp_rtaddr.buf == NULL)
+ newxprt = makefd_xprt(sock, r->sendsize, r->recvsize);
+ newxprt->xp_rtaddr.buf = mem_alloc(len);
+ if (newxprt->xp_rtaddr.buf == NULL)
return (FALSE);
- memcpy(xprt->xp_rtaddr.buf, &addr, len);
- xprt->xp_rtaddr.len = len;
+ memcpy(newxprt->xp_rtaddr.buf, &addr, len);
+ newxprt->xp_rtaddr.len = len;
#ifdef PORTMAP
if (addr.ss_family == AF_INET || addr.ss_family == AF_LOCAL) {
- xprt->xp_raddr = *(struct sockaddr_in *)xprt->xp_rtaddr.buf;
- xprt->xp_addrlen = sizeof (struct sockaddr_in);
+ newxprt->xp_raddr = *(struct sockaddr_in *)newxprt->xp_rtaddr.buf;
+ newxprt->xp_addrlen = sizeof (struct sockaddr_in);
}
#endif /* PORTMAP */
if (__rpc_fd2sockinfo(sock, &si) && si.si_proto == IPPROTO_TCP) {
@@ -323,6 +349,28 @@ again:
/* XXX fvdl - is this useful? */
_setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &len, sizeof (len));
}
+
+ cd = (struct cf_conn *)newxprt->xp_p1;
+
+ cd->recvsize = r->recvsize;
+ cd->sendsize = r->sendsize;
+ cd->maxrec = r->maxrec;
+
+ if (cd->maxrec != 0) {
+ flags = fcntl(sock, F_GETFL, 0);
+ if (flags == -1)
+ return (FALSE);
+ if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
+ return (FALSE);
+ if (cd->recvsize > cd->maxrec)
+ cd->recvsize = cd->maxrec;
+ cd->nonblock = TRUE;
+ __xdrrec_setnonblock(&cd->xdrs, cd->maxrec);
+ } else
+ cd->nonblock = FALSE;
+
+ gettimeofday(&cd->last_recv_time, NULL);
+
return (FALSE); /* there is never an rpc msg to be processed */
}
@@ -339,14 +387,21 @@ static void
svc_vc_destroy(xprt)
SVCXPRT *xprt;
{
+ assert(xprt != NULL);
+
+ xprt_unregister(xprt);
+ __svc_vc_dodestroy(xprt);
+}
+
+static void
+__svc_vc_dodestroy(xprt)
+ SVCXPRT *xprt;
+{
struct cf_conn *cd;
struct cf_rendezvous *r;
- assert(xprt != NULL);
-
cd = (struct cf_conn *)xprt->xp_p1;
- xprt_unregister(xprt);
if (xprt->xp_fd != RPC_ANYFD)
(void)_close(xprt->xp_fd);
if (xprt->xp_port != 0) {
@@ -380,6 +435,30 @@ svc_vc_control(xprt, rq, in)
return (FALSE);
}
+static bool_t
+svc_vc_rendezvous_control(xprt, rq, in)
+ SVCXPRT *xprt;
+ const u_int rq;
+ void *in;
+{
+ struct cf_rendezvous *cfp;
+
+ cfp = (struct cf_rendezvous *)xprt->xp_p1;
+ if (cfp == NULL)
+ return (FALSE);
+ switch (rq) {
+ case SVCGET_CONNMAXREC:
+ *(int *)in = cfp->maxrec;
+ break;
+ case SVCSET_CONNMAXREC:
+ cfp->maxrec = *(int *)in;
+ break;
+ default:
+ return (FALSE);
+ }
+ return (TRUE);
+}
+
/*
* reads data from the tcp or uip connection.
* any error is fatal and the connection is closed.
@@ -399,12 +478,28 @@ read_vc(xprtp, buf, len)
struct pollfd pollfd;
struct sockaddr *sa;
struct cmessage *cm;
+ struct cf_conn *cfp;
xprt = (SVCXPRT *)xprtp;
assert(xprt != NULL);
sock = xprt->xp_fd;
+ cfp = (struct cf_conn *)xprt->xp_p1;
+
+ if (cfp->nonblock) {
+ len = read(sock, buf, (size_t)len);
+ if (len < 0) {
+ if (errno == EAGAIN)
+ len = 0;
+ else
+ goto fatal_err;
+ }
+ if (len != 0)
+ gettimeofday(&cfp->last_recv_time, NULL);
+ return len;
+ }
+
do {
pollfd.fd = sock;
pollfd.events = POLLIN;
@@ -432,8 +527,10 @@ read_vc(xprtp, buf, len)
} else
goto fatal_err;
} else {
- if ((len = _read(sock, buf, (size_t)len)) > 0)
+ if ((len = read(sock, buf, (size_t)len)) > 0) {
+ gettimeofday(&cfp->last_recv_time, NULL);
return (len);
+ }
}
fatal_err:
@@ -454,27 +551,41 @@ write_vc(xprtp, buf, len)
SVCXPRT *xprt;
int i, cnt;
struct sockaddr *sa;
+ struct cf_conn *cd;
+ struct timeval tv0, tv1;
xprt = (SVCXPRT *)xprtp;
assert(xprt != NULL);
+
+ cd = (struct cf_conn *)xprt->xp_p1;
+
+ if (cd->nonblock)
+ gettimeofday(&tv0, NULL);
sa = (struct sockaddr *)xprt->xp_rtaddr.buf;
- if (sa->sa_family == AF_LOCAL) {
- for (cnt = len; cnt > 0; cnt -= i, buf += i) {
- if ((i = __msgwrite(xprt->xp_fd, buf,
- (size_t)cnt)) < 0) {
- ((struct cf_conn *)(xprt->xp_p1))->strm_stat =
- XPRT_DIED;
+ for (cnt = len; cnt > 0; cnt -= i, buf += i) {
+ if (sa->sa_family == AF_LOCAL)
+ i = __msgwrite(xprt->xp_fd, buf, (size_t)cnt);
+ else
+ i = _write(xprt->xp_fd, buf, (size_t)cnt);
+ if (i < 0) {
+ if (errno != EAGAIN || !cd->nonblock) {
+ cd->strm_stat = XPRT_DIED;
return (-1);
}
- }
- } else {
- for (cnt = len; cnt > 0; cnt -= i, buf += i) {
- if ((i = _write(xprt->xp_fd, buf,
- (size_t)cnt)) < 0) {
- ((struct cf_conn *)(xprt->xp_p1))->strm_stat =
- XPRT_DIED;
- return (-1);
+ if (cd->nonblock && i != cnt) {
+ /*
+ * For non-blocking connections, do not
+ * take more than 2 seconds writing the
+ * data out.
+ *
+ * XXX 2 is an arbitrary amount.
+ */
+ gettimeofday(&tv1, NULL);
+ if (tv1.tv_sec - tv0.tv_sec >= 2) {
+ cd->strm_stat = XPRT_DIED;
+ return (-1);
+ }
}
}
}
@@ -513,6 +624,11 @@ svc_vc_recv(xprt, msg)
cd = (struct cf_conn *)(xprt->xp_p1);
xdrs = &(cd->xdrs);
+ if (cd->nonblock) {
+ if (!__xdrrec_getrec(xdrs, &cd->strm_stat, TRUE))
+ return FALSE;
+ }
+
xdrs->x_op = XDR_DECODE;
(void)xdrrec_skiprecord(xdrs);
if (xdr_callmsg(xdrs, msg)) {
@@ -560,7 +676,7 @@ svc_vc_reply(xprt, msg)
{
struct cf_conn *cd;
XDR *xdrs;
- bool_t stat;
+ bool_t rstat;
assert(xprt != NULL);
assert(msg != NULL);
@@ -570,9 +686,9 @@ svc_vc_reply(xprt, msg)
xdrs->x_op = XDR_ENCODE;
msg->rm_xid = cd->x_id;
- stat = xdr_replymsg(xdrs, msg);
+ rstat = xdr_replymsg(xdrs, msg);
(void)xdrrec_endofrecord(xdrs, TRUE);
- return (stat);
+ return (rstat);
}
static void
@@ -619,7 +735,7 @@ svc_vc_rendezvous_ops(xprt)
ops.xp_freeargs =
(bool_t (*)(SVCXPRT *, xdrproc_t, void *))abort,
ops.xp_destroy = svc_vc_destroy;
- ops2.xp_control = svc_vc_control;
+ ops2.xp_control = svc_vc_rendezvous_control;
}
xprt->xp_ops = &ops;
xprt->xp_ops2 = &ops2;
@@ -720,3 +836,53 @@ __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid)
*uid = cmcred->cmcred_euid;
return (0);
}
+
+/*
+ * Destroy xprts that have not have had any activity in 'timeout' seconds.
+ * If 'cleanblock' is true, blocking connections (the default) are also
+ * cleaned. If timeout is 0, the least active connection is picked.
+ */
+bool_t
+__svc_clean_idle(fd_set *fds, int timeout, bool_t cleanblock)
+{
+ int i, ncleaned;
+ SVCXPRT *xprt, *least_active;
+ struct timeval tv, tdiff, tmax;
+ struct cf_conn *cd;
+
+ gettimeofday(&tv, NULL);
+ tmax.tv_sec = tmax.tv_usec = 0;
+ least_active = NULL;
+ rwlock_wrlock(&svc_fd_lock);
+ for (i = ncleaned = 0; i <= svc_maxfd; i++) {
+ if (FD_ISSET(i, fds)) {
+ xprt = __svc_xports[i];
+ if (xprt == NULL || xprt->xp_ops == NULL ||
+ xprt->xp_ops->xp_recv != svc_vc_recv)
+ continue;
+ cd = (struct cf_conn *)xprt->xp_p1;
+ if (!cleanblock && !cd->nonblock)
+ continue;
+ if (timeout == 0) {
+ timersub(&tv, &cd->last_recv_time, &tdiff);
+ if (timercmp(&tdiff, &tmax, >)) {
+ tmax = tdiff;
+ least_active = xprt;
+ }
+ continue;
+ }
+ if (tv.tv_sec - cd->last_recv_time.tv_sec > timeout) {
+ __xprt_unregister_unlocked(xprt);
+ __svc_vc_dodestroy(xprt);
+ ncleaned++;
+ }
+ }
+ }
+ if (timeout == 0 && least_active != NULL) {
+ __xprt_unregister_unlocked(least_active);
+ __svc_vc_dodestroy(least_active);
+ ncleaned++;
+ }
+ rwlock_unlock(&svc_fd_lock);
+ return ncleaned > 0 ? TRUE : FALSE;
+}
OpenPOWER on IntegriCloud