From f209ccb0fd339f0ad952359f267ec716648085fe Mon Sep 17 00:00:00 2001 From: mbr Date: Thu, 16 Jan 2003 07:13:51 +0000 Subject: Implement non-blocking tcp-connections. Reviewed by: rwatson Obtained from: NetBSD MFC after: 1 day --- lib/libc/rpc/rpc_com.h | 3 + lib/libc/rpc/svc.c | 67 +++++++++++---- lib/libc/rpc/svc_run.c | 10 ++- lib/libc/rpc/svc_vc.c | 228 ++++++++++++++++++++++++++++++++++++++++++------- lib/libc/xdr/xdr_rec.c | 191 ++++++++++++++++++++++++++++++++++++++--- 5 files changed, 437 insertions(+), 62 deletions(-) (limited to 'lib/libc') diff --git a/lib/libc/rpc/rpc_com.h b/lib/libc/rpc/rpc_com.h index 9686e11..1f1fd9e 100644 --- a/lib/libc/rpc/rpc_com.h +++ b/lib/libc/rpc/rpc_com.h @@ -84,6 +84,9 @@ bool_t __rpc_control(int,void *); char *_get_next_token(char *, int); +SVCXPRT **__svc_xports; +int __svc_maxrec; + __END_DECLS #endif /* _RPC_RPCCOM_H */ diff --git a/lib/libc/rpc/svc.c b/lib/libc/rpc/svc.c index c4584ed..9d7435e 100644 --- a/lib/libc/rpc/svc.c +++ b/lib/libc/rpc/svc.c @@ -63,8 +63,6 @@ __FBSDID("$FreeBSD$"); #include "rpc_com.h" -static SVCXPRT **xports; - #define RQCRED_SIZE 400 /* this size is excessive */ #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ @@ -91,6 +89,7 @@ extern rwlock_t svc_fd_lock; static struct svc_callout *svc_find(rpcprog_t, rpcvers_t, struct svc_callout **, char *); +static void __xprt_do_unregister (SVCXPRT *xprt, bool_t dolock); /* *************** SVCXPRT related stuff **************** */ @@ -108,27 +107,40 @@ xprt_register(xprt) sock = xprt->xp_fd; rwlock_wrlock(&svc_fd_lock); - if (xports == NULL) { - xports = (SVCXPRT **) + if (__svc_xports == NULL) { + __svc_xports = (SVCXPRT **) mem_alloc(FD_SETSIZE * sizeof(SVCXPRT *)); - if (xports == NULL) + if (__svc_xports == NULL) return; - memset(xports, '\0', FD_SETSIZE * sizeof(SVCXPRT *)); + memset(__svc_xports, '\0', FD_SETSIZE * sizeof(SVCXPRT *)); } if (sock < FD_SETSIZE) { - xports[sock] = xprt; + __svc_xports[sock] = xprt; FD_SET(sock, &svc_fdset); svc_maxfd = max(svc_maxfd, sock); } rwlock_unlock(&svc_fd_lock); } +void +xprt_unregister(SVCXPRT *xprt) +{ + __xprt_do_unregister(xprt, TRUE); +} + +void +__xprt_unregister_unlocked(SVCXPRT *xprt) +{ + __xprt_do_unregister(xprt, FALSE); +} + /* * De-activate a transport handle. */ -void -xprt_unregister(xprt) +static void +__xprt_do_unregister(xprt, dolock) SVCXPRT *xprt; + bool_t dolock; { int sock; @@ -136,17 +148,19 @@ xprt_unregister(xprt) sock = xprt->xp_fd; - rwlock_wrlock(&svc_fd_lock); - if ((sock < FD_SETSIZE) && (xports[sock] == xprt)) { - xports[sock] = NULL; + if (dolock) + rwlock_wrlock(&svc_fd_lock); + if ((sock < FD_SETSIZE) && (__svc_xports[sock] == xprt)) { + __svc_xports[sock] = NULL; FD_CLR(sock, &svc_fdset); if (sock >= svc_maxfd) { for (svc_maxfd--; svc_maxfd>=0; svc_maxfd--) - if (xports[svc_maxfd]) + if (__svc_xports[svc_maxfd]) break; } } - rwlock_unlock(&svc_fd_lock); + if (dolock) + rwlock_unlock(&svc_fd_lock); } /* @@ -611,7 +625,7 @@ svc_getreq_common(fd) r.rq_clntcred = &(cred_area[2*MAX_AUTH_BYTES]); rwlock_rdlock(&svc_fd_lock); - xprt = xports[fd]; + xprt = __svc_xports[fd]; rwlock_unlock(&svc_fd_lock); if (xprt == NULL) /* But do we control sock? */ @@ -667,7 +681,7 @@ svc_getreq_common(fd) * If so, then break. */ rwlock_rdlock(&svc_fd_lock); - if (xprt != xports[fd]) { + if (xprt != __svc_xports[fd]) { rwlock_unlock(&svc_fd_lock); break; } @@ -715,3 +729,24 @@ svc_getreq_poll(pfdp, pollretval) } } } + +bool_t +rpc_control(int what, void *arg) +{ + int val; + + switch (what) { + case RPC_SVC_CONNMAXREC_SET: + val = *(int *)arg; + if (val <= 0) + return FALSE; + __svc_maxrec = val; + return TRUE; + case RPC_SVC_CONNMAXREC_GET: + *(int *)arg = __svc_maxrec; + return TRUE; + default: + break; + } + return FALSE; +} diff --git a/lib/libc/rpc/svc_run.c b/lib/libc/rpc/svc_run.c index 6d13ee0..53d73f3 100644 --- a/lib/libc/rpc/svc_run.c +++ b/lib/libc/rpc/svc_run.c @@ -55,14 +55,19 @@ __FBSDID("$FreeBSD$"); void svc_run() { - fd_set readfds; + fd_set readfds, cleanfds; + struct timeval timeout; extern rwlock_t svc_fd_lock; + timeout.tv_sec = 30; + timeout.tv_usec = 0; + for (;;) { rwlock_rdlock(&svc_fd_lock); readfds = svc_fdset; + cleanfds = svc_fdset; rwlock_unlock(&svc_fd_lock); - switch (_select(svc_maxfd+1, &readfds, NULL, NULL, NULL)) { + switch (select(svc_maxfd+1, &readfds, NULL, NULL, &timeout)) { case -1: FD_ZERO(&readfds); if (errno == EINTR) { @@ -71,6 +76,7 @@ svc_run() _warn("svc_run: - select failed"); return; case 0: + __svc_clean_idle(&cleanfds, 30, FALSE); continue; default: svc_getreqset(&readfds); diff --git a/lib/libc/rpc/svc_vc.c b/lib/libc/rpc/svc_vc.c index e6cc187..ee802e2 100644 --- a/lib/libc/rpc/svc_vc.c +++ b/lib/libc/rpc/svc_vc.c @@ -51,6 +51,7 @@ __FBSDID("$FreeBSD$"); #include #include #include +#include #include #include #include @@ -58,6 +59,7 @@ __FBSDID("$FreeBSD$"); #include #include #include +#include #include #include #include @@ -73,10 +75,13 @@ struct cmessage { struct cmsgcred cmcred; }; +extern rwlock_t svc_fd_lock; + static SVCXPRT *makefd_xprt(int, u_int, u_int); static bool_t rendezvous_request(SVCXPRT *, struct rpc_msg *); static enum xprt_stat rendezvous_stat(SVCXPRT *); static void svc_vc_destroy(SVCXPRT *); +static void __svc_vc_dodestroy (SVCXPRT *); static int read_vc(void *, void *, int); static int write_vc(void *, void *, int); static enum xprt_stat svc_vc_stat(SVCXPRT *); @@ -87,12 +92,15 @@ static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *); static void svc_vc_rendezvous_ops(SVCXPRT *); static void svc_vc_ops(SVCXPRT *); static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in); +static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq, + void *in); static int __msgread_withcred(int, void *, size_t, struct cmessage *); static int __msgwrite(int, void *, size_t); struct cf_rendezvous { /* kept in xprt->xp_p1 for rendezvouser */ u_int sendsize; u_int recvsize; + int maxrec; }; struct cf_conn { /* kept in xprt->xp_p1 for actual connection */ @@ -100,6 +108,11 @@ struct cf_conn { /* kept in xprt->xp_p1 for actual connection */ u_int32_t x_id; XDR xdrs; char verf_body[MAX_AUTH_BYTES]; + u_int sendsize; + u_int recvsize; + int maxrec; + bool_t nonblock; + struct timeval last_recv_time; }; /* @@ -139,6 +152,7 @@ svc_vc_create(fd, sendsize, recvsize) return NULL; r->sendsize = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsize); r->recvsize = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsize); + r->maxrec = __svc_maxrec; xprt = mem_alloc(sizeof(SVCXPRT)); if (xprt == NULL) { warnx("svc_vc_create: out of memory"); @@ -285,11 +299,14 @@ rendezvous_request(xprt, msg) SVCXPRT *xprt; struct rpc_msg *msg; { - int sock; + int sock, flags; struct cf_rendezvous *r; + struct cf_conn *cd; struct sockaddr_storage addr; socklen_t len; struct __rpc_sockinfo si; + SVCXPRT *newxprt; + fd_set cleanfds; assert(xprt != NULL); assert(msg != NULL); @@ -301,21 +318,30 @@ again: &len)) < 0) { if (errno == EINTR) goto again; - return (FALSE); + /* + * Clean out the most idle file descriptor when we're + * running out. + */ + if (errno == EMFILE || errno == ENFILE) { + cleanfds = svc_fdset; + __svc_clean_idle(&cleanfds, 0, FALSE); + goto again; + } + return (FALSE); } /* * make a new transporter (re-uses xprt) */ - xprt = makefd_xprt(sock, r->sendsize, r->recvsize); - xprt->xp_rtaddr.buf = mem_alloc(len); - if (xprt->xp_rtaddr.buf == NULL) + newxprt = makefd_xprt(sock, r->sendsize, r->recvsize); + newxprt->xp_rtaddr.buf = mem_alloc(len); + if (newxprt->xp_rtaddr.buf == NULL) return (FALSE); - memcpy(xprt->xp_rtaddr.buf, &addr, len); - xprt->xp_rtaddr.len = len; + memcpy(newxprt->xp_rtaddr.buf, &addr, len); + newxprt->xp_rtaddr.len = len; #ifdef PORTMAP if (addr.ss_family == AF_INET || addr.ss_family == AF_LOCAL) { - xprt->xp_raddr = *(struct sockaddr_in *)xprt->xp_rtaddr.buf; - xprt->xp_addrlen = sizeof (struct sockaddr_in); + newxprt->xp_raddr = *(struct sockaddr_in *)newxprt->xp_rtaddr.buf; + newxprt->xp_addrlen = sizeof (struct sockaddr_in); } #endif /* PORTMAP */ if (__rpc_fd2sockinfo(sock, &si) && si.si_proto == IPPROTO_TCP) { @@ -323,6 +349,28 @@ again: /* XXX fvdl - is this useful? */ _setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &len, sizeof (len)); } + + cd = (struct cf_conn *)newxprt->xp_p1; + + cd->recvsize = r->recvsize; + cd->sendsize = r->sendsize; + cd->maxrec = r->maxrec; + + if (cd->maxrec != 0) { + flags = fcntl(sock, F_GETFL, 0); + if (flags == -1) + return (FALSE); + if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) + return (FALSE); + if (cd->recvsize > cd->maxrec) + cd->recvsize = cd->maxrec; + cd->nonblock = TRUE; + __xdrrec_setnonblock(&cd->xdrs, cd->maxrec); + } else + cd->nonblock = FALSE; + + gettimeofday(&cd->last_recv_time, NULL); + return (FALSE); /* there is never an rpc msg to be processed */ } @@ -339,14 +387,21 @@ static void svc_vc_destroy(xprt) SVCXPRT *xprt; { + assert(xprt != NULL); + + xprt_unregister(xprt); + __svc_vc_dodestroy(xprt); +} + +static void +__svc_vc_dodestroy(xprt) + SVCXPRT *xprt; +{ struct cf_conn *cd; struct cf_rendezvous *r; - assert(xprt != NULL); - cd = (struct cf_conn *)xprt->xp_p1; - xprt_unregister(xprt); if (xprt->xp_fd != RPC_ANYFD) (void)_close(xprt->xp_fd); if (xprt->xp_port != 0) { @@ -380,6 +435,30 @@ svc_vc_control(xprt, rq, in) return (FALSE); } +static bool_t +svc_vc_rendezvous_control(xprt, rq, in) + SVCXPRT *xprt; + const u_int rq; + void *in; +{ + struct cf_rendezvous *cfp; + + cfp = (struct cf_rendezvous *)xprt->xp_p1; + if (cfp == NULL) + return (FALSE); + switch (rq) { + case SVCGET_CONNMAXREC: + *(int *)in = cfp->maxrec; + break; + case SVCSET_CONNMAXREC: + cfp->maxrec = *(int *)in; + break; + default: + return (FALSE); + } + return (TRUE); +} + /* * reads data from the tcp or uip connection. * any error is fatal and the connection is closed. @@ -399,12 +478,28 @@ read_vc(xprtp, buf, len) struct pollfd pollfd; struct sockaddr *sa; struct cmessage *cm; + struct cf_conn *cfp; xprt = (SVCXPRT *)xprtp; assert(xprt != NULL); sock = xprt->xp_fd; + cfp = (struct cf_conn *)xprt->xp_p1; + + if (cfp->nonblock) { + len = read(sock, buf, (size_t)len); + if (len < 0) { + if (errno == EAGAIN) + len = 0; + else + goto fatal_err; + } + if (len != 0) + gettimeofday(&cfp->last_recv_time, NULL); + return len; + } + do { pollfd.fd = sock; pollfd.events = POLLIN; @@ -432,8 +527,10 @@ read_vc(xprtp, buf, len) } else goto fatal_err; } else { - if ((len = _read(sock, buf, (size_t)len)) > 0) + if ((len = read(sock, buf, (size_t)len)) > 0) { + gettimeofday(&cfp->last_recv_time, NULL); return (len); + } } fatal_err: @@ -454,27 +551,41 @@ write_vc(xprtp, buf, len) SVCXPRT *xprt; int i, cnt; struct sockaddr *sa; + struct cf_conn *cd; + struct timeval tv0, tv1; xprt = (SVCXPRT *)xprtp; assert(xprt != NULL); + + cd = (struct cf_conn *)xprt->xp_p1; + + if (cd->nonblock) + gettimeofday(&tv0, NULL); sa = (struct sockaddr *)xprt->xp_rtaddr.buf; - if (sa->sa_family == AF_LOCAL) { - for (cnt = len; cnt > 0; cnt -= i, buf += i) { - if ((i = __msgwrite(xprt->xp_fd, buf, - (size_t)cnt)) < 0) { - ((struct cf_conn *)(xprt->xp_p1))->strm_stat = - XPRT_DIED; + for (cnt = len; cnt > 0; cnt -= i, buf += i) { + if (sa->sa_family == AF_LOCAL) + i = __msgwrite(xprt->xp_fd, buf, (size_t)cnt); + else + i = _write(xprt->xp_fd, buf, (size_t)cnt); + if (i < 0) { + if (errno != EAGAIN || !cd->nonblock) { + cd->strm_stat = XPRT_DIED; return (-1); } - } - } else { - for (cnt = len; cnt > 0; cnt -= i, buf += i) { - if ((i = _write(xprt->xp_fd, buf, - (size_t)cnt)) < 0) { - ((struct cf_conn *)(xprt->xp_p1))->strm_stat = - XPRT_DIED; - return (-1); + if (cd->nonblock && i != cnt) { + /* + * For non-blocking connections, do not + * take more than 2 seconds writing the + * data out. + * + * XXX 2 is an arbitrary amount. + */ + gettimeofday(&tv1, NULL); + if (tv1.tv_sec - tv0.tv_sec >= 2) { + cd->strm_stat = XPRT_DIED; + return (-1); + } } } } @@ -513,6 +624,11 @@ svc_vc_recv(xprt, msg) cd = (struct cf_conn *)(xprt->xp_p1); xdrs = &(cd->xdrs); + if (cd->nonblock) { + if (!__xdrrec_getrec(xdrs, &cd->strm_stat, TRUE)) + return FALSE; + } + xdrs->x_op = XDR_DECODE; (void)xdrrec_skiprecord(xdrs); if (xdr_callmsg(xdrs, msg)) { @@ -560,7 +676,7 @@ svc_vc_reply(xprt, msg) { struct cf_conn *cd; XDR *xdrs; - bool_t stat; + bool_t rstat; assert(xprt != NULL); assert(msg != NULL); @@ -570,9 +686,9 @@ svc_vc_reply(xprt, msg) xdrs->x_op = XDR_ENCODE; msg->rm_xid = cd->x_id; - stat = xdr_replymsg(xdrs, msg); + rstat = xdr_replymsg(xdrs, msg); (void)xdrrec_endofrecord(xdrs, TRUE); - return (stat); + return (rstat); } static void @@ -619,7 +735,7 @@ svc_vc_rendezvous_ops(xprt) ops.xp_freeargs = (bool_t (*)(SVCXPRT *, xdrproc_t, void *))abort, ops.xp_destroy = svc_vc_destroy; - ops2.xp_control = svc_vc_control; + ops2.xp_control = svc_vc_rendezvous_control; } xprt->xp_ops = &ops; xprt->xp_ops2 = &ops2; @@ -720,3 +836,53 @@ __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) *uid = cmcred->cmcred_euid; return (0); } + +/* + * Destroy xprts that have not have had any activity in 'timeout' seconds. + * If 'cleanblock' is true, blocking connections (the default) are also + * cleaned. If timeout is 0, the least active connection is picked. + */ +bool_t +__svc_clean_idle(fd_set *fds, int timeout, bool_t cleanblock) +{ + int i, ncleaned; + SVCXPRT *xprt, *least_active; + struct timeval tv, tdiff, tmax; + struct cf_conn *cd; + + gettimeofday(&tv, NULL); + tmax.tv_sec = tmax.tv_usec = 0; + least_active = NULL; + rwlock_wrlock(&svc_fd_lock); + for (i = ncleaned = 0; i <= svc_maxfd; i++) { + if (FD_ISSET(i, fds)) { + xprt = __svc_xports[i]; + if (xprt == NULL || xprt->xp_ops == NULL || + xprt->xp_ops->xp_recv != svc_vc_recv) + continue; + cd = (struct cf_conn *)xprt->xp_p1; + if (!cleanblock && !cd->nonblock) + continue; + if (timeout == 0) { + timersub(&tv, &cd->last_recv_time, &tdiff); + if (timercmp(&tdiff, &tmax, >)) { + tmax = tdiff; + least_active = xprt; + } + continue; + } + if (tv.tv_sec - cd->last_recv_time.tv_sec > timeout) { + __xprt_unregister_unlocked(xprt); + __svc_vc_dodestroy(xprt); + ncleaned++; + } + } + } + if (timeout == 0 && least_active != NULL) { + __xprt_unregister_unlocked(least_active); + __svc_vc_dodestroy(least_active); + ncleaned++; + } + rwlock_unlock(&svc_fd_lock); + return ncleaned > 0 ? TRUE : FALSE; +} diff --git a/lib/libc/xdr/xdr_rec.c b/lib/libc/xdr/xdr_rec.c index b969d6f..a09c496 100644 --- a/lib/libc/xdr/xdr_rec.c +++ b/lib/libc/xdr/xdr_rec.c @@ -66,6 +66,10 @@ __FBSDID("$FreeBSD$"); #include #include +#include +#include +#include +#include #include "un-namespace.h" static bool_t xdrrec_getlong(XDR *, long *); @@ -91,7 +95,7 @@ static const struct xdr_ops xdrrec_ops = { /* * A record is composed of one or more record fragments. - * A record fragment is a two-byte header followed by zero to + * A record fragment is a four-byte header followed by zero to * 2**32-1 bytes. The header is treated as a long unsigned and is * encode/decoded to the network via htonl/ntohl. The low order 31 bits * are a byte count of the fragment. The highest order bit is a boolean: @@ -106,7 +110,6 @@ static const struct xdr_ops xdrrec_ops = { typedef struct rec_strm { char *tcp_handle; - char *the_buffer; /* * out-goung bits */ @@ -128,6 +131,15 @@ typedef struct rec_strm { bool_t last_frag; u_int sendsize; u_int recvsize; + + bool_t nonblock; + bool_t in_haveheader; + u_int32_t in_header; + char *in_hdrp; + int in_hdrlen; + int in_reclen; + int in_received; + int in_maxrec; } RECSTREAM; static u_int fix_buf_size(u_int); @@ -136,6 +148,7 @@ static bool_t fill_input_buf(RECSTREAM *); static bool_t get_input_bytes(RECSTREAM *, char *, int); static bool_t set_input_fragment(RECSTREAM *); static bool_t skip_input_bytes(RECSTREAM *, long); +static bool_t realloc_stream(RECSTREAM *, int); /* @@ -168,20 +181,21 @@ xdrrec_create(xdrs, sendsize, recvsize, tcp_handle, readit, writeit) */ return; } - /* - * adjust sizes and allocate buffer quad byte aligned - */ rstrm->sendsize = sendsize = fix_buf_size(sendsize); + rstrm->out_base = mem_alloc(rstrm->sendsize); + if (rstrm->out_base == NULL) { + warnx("xdrrec_create: out of memory"); + mem_free(rstrm, sizeof(RECSTREAM)); + return; + } rstrm->recvsize = recvsize = fix_buf_size(recvsize); - rstrm->the_buffer = mem_alloc(sendsize + recvsize + BYTES_PER_XDR_UNIT); - if (rstrm->the_buffer == NULL) { + rstrm->in_base = mem_alloc(recvsize); + if (rstrm->in_base == NULL) { warnx("xdrrec_create: out of memory"); + mem_free(rstrm->out_base, sendsize); + mem_free(rstrm, sizeof(RECSTREAM)); return; } - for (rstrm->out_base = rstrm->the_buffer; - (u_long)rstrm->out_base % BYTES_PER_XDR_UNIT != 0; - rstrm->out_base++); - rstrm->in_base = rstrm->out_base + sendsize; /* * now the rest ... */ @@ -200,6 +214,12 @@ xdrrec_create(xdrs, sendsize, recvsize, tcp_handle, readit, writeit) rstrm->in_finger = (rstrm->in_boundry += recvsize); rstrm->fbtbc = 0; rstrm->last_frag = TRUE; + rstrm->in_haveheader = FALSE; + rstrm->in_hdrlen = 0; + rstrm->in_hdrp = (char *)(void *)&rstrm->in_header; + rstrm->nonblock = FALSE; + rstrm->in_reclen = 0; + rstrm->in_received = 0; } @@ -413,8 +433,8 @@ xdrrec_destroy(xdrs) { RECSTREAM *rstrm = (RECSTREAM *)xdrs->x_private; - mem_free(rstrm->the_buffer, - rstrm->sendsize + rstrm->recvsize + BYTES_PER_XDR_UNIT); + mem_free(rstrm->out_base, rstrm->sendsize); + mem_free(rstrm->in_base, rstrm->recvsize); mem_free(rstrm, sizeof(RECSTREAM)); } @@ -432,6 +452,20 @@ xdrrec_skiprecord(xdrs) XDR *xdrs; { RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private); + enum xprt_stat xstat; + + if (rstrm->nonblock) { + if (__xdrrec_getrec(xdrs, &xstat, FALSE)) { + rstrm->fbtbc = 0; + return TRUE; + } + if (rstrm->in_finger == rstrm->in_boundry && + xstat == XPRT_MOREREQS) { + rstrm->fbtbc = 0; + return TRUE; + } + return FALSE; + } while (rstrm->fbtbc > 0 || (! rstrm->last_frag)) { if (! skip_input_bytes(rstrm, rstrm->fbtbc)) @@ -454,6 +488,15 @@ xdrrec_eof(xdrs) XDR *xdrs; { RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private); + enum xprt_stat xstat; + + if (rstrm->nonblock) { + if (__xdrrec_getrec(xdrs, &xstat, FALSE)) + return FALSE; + if (!rstrm->in_haveheader && xstat == XPRT_IDLE) + return TRUE; + return FALSE; + } while (rstrm->fbtbc > 0 || (! rstrm->last_frag)) { if (! skip_input_bytes(rstrm, rstrm->fbtbc)) @@ -495,6 +538,99 @@ xdrrec_endofrecord(xdrs, sendnow) return (TRUE); } +/* + * Fill the stream buffer with a record for a non-blocking connection. + * Return true if a record is available in the buffer, false if not. + */ +bool_t +__xdrrec_getrec(xdrs, statp, expectdata) + XDR *xdrs; + enum xprt_stat *statp; + bool_t expectdata; +{ + RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private); + ssize_t n; + int fraglen; + + if (!rstrm->in_haveheader) { + n = rstrm->readit(rstrm->tcp_handle, rstrm->in_hdrp, + (int)sizeof (rstrm->in_header) - rstrm->in_hdrlen); + if (n == 0) { + *statp = expectdata ? XPRT_DIED : XPRT_IDLE; + return FALSE; + } + if (n < 0) { + *statp = XPRT_DIED; + return FALSE; + } + rstrm->in_hdrp += n; + rstrm->in_hdrlen += n; + if (rstrm->in_hdrlen < sizeof (rstrm->in_header)) { + *statp = XPRT_MOREREQS; + return FALSE; + } + rstrm->in_header = ntohl(rstrm->in_header); + fraglen = (int)(rstrm->in_header & ~LAST_FRAG); + if (fraglen == 0 || fraglen > rstrm->in_maxrec || + (rstrm->in_reclen + fraglen) > rstrm->in_maxrec) { + *statp = XPRT_DIED; + return FALSE; + } + rstrm->in_reclen += fraglen; + if (rstrm->in_reclen > rstrm->recvsize) + realloc_stream(rstrm, rstrm->in_reclen); + if (rstrm->in_header & LAST_FRAG) { + rstrm->in_header &= ~LAST_FRAG; + rstrm->last_frag = TRUE; + } + } + + n = rstrm->readit(rstrm->tcp_handle, + rstrm->in_base + rstrm->in_received, + (rstrm->in_reclen - rstrm->in_received)); + + if (n < 0) { + *statp = XPRT_DIED; + return FALSE; + } + + if (n == 0) { + *statp = expectdata ? XPRT_DIED : XPRT_IDLE; + return FALSE; + } + + rstrm->in_received += n; + + if (rstrm->in_received == rstrm->in_reclen) { + rstrm->in_haveheader = FALSE; + rstrm->in_hdrp = (char *)(void *)&rstrm->in_header; + rstrm->in_hdrlen = 0; + if (rstrm->last_frag) { + rstrm->fbtbc = rstrm->in_reclen; + rstrm->in_boundry = rstrm->in_base + rstrm->in_reclen; + rstrm->in_finger = rstrm->in_base; + *statp = XPRT_MOREREQS; + return TRUE; + } + } + + *statp = XPRT_MOREREQS; + return FALSE; +} + +bool_t +__xdrrec_setnonblock(xdrs, maxrec) + XDR *xdrs; + int maxrec; +{ + RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private); + + rstrm->nonblock = TRUE; + if (maxrec == 0) + maxrec = rstrm->recvsize; + rstrm->in_maxrec = maxrec; + return TRUE; +} /* * Internal useful routines @@ -527,6 +663,9 @@ fill_input_buf(rstrm) u_int32_t i; int len; + if (rstrm->nonblock) + return FALSE; + where = rstrm->in_base; i = (u_int32_t)((u_long)rstrm->in_boundry % BYTES_PER_XDR_UNIT); where += i; @@ -619,3 +758,29 @@ fix_buf_size(s) s = 4000; return (RNDUP(s)); } + +/* + * Reallocate the input buffer for a non-block stream. + */ +static bool_t +realloc_stream(rstrm, size) + RECSTREAM *rstrm; + int size; +{ + ptrdiff_t diff; + char *buf; + + if (size > rstrm->recvsize) { + buf = realloc(rstrm->in_base, (size_t)size); + if (buf == NULL) + return FALSE; + diff = buf - rstrm->in_base; + rstrm->in_finger += diff; + rstrm->in_base = buf; + rstrm->in_boundry = buf + size; + rstrm->recvsize = size; + rstrm->in_size = size; + } + + return TRUE; +} -- cgit v1.1