diff options
-rw-r--r-- | sys/fs/nfs/nfs_var.h | 11 | ||||
-rw-r--r-- | sys/fs/nfs/nfsrvcache.h | 10 | ||||
-rw-r--r-- | sys/fs/nfsserver/nfs_nfsdcache.c | 135 | ||||
-rw-r--r-- | sys/fs/nfsserver/nfs_nfsdkrpc.c | 39 | ||||
-rw-r--r-- | sys/fs/nfsserver/nfs_nfsdport.c | 44 | ||||
-rw-r--r-- | sys/fs/nfsserver/nfs_nfsdsubs.c | 41 | ||||
-rw-r--r-- | sys/rpc/svc.c | 67 | ||||
-rw-r--r-- | sys/rpc/svc.h | 51 | ||||
-rw-r--r-- | sys/rpc/svc_dg.c | 4 | ||||
-rw-r--r-- | sys/rpc/svc_vc.c | 46 |
10 files changed, 268 insertions, 180 deletions
diff --git a/sys/fs/nfs/nfs_var.h b/sys/fs/nfs/nfs_var.h index 7fbabaa..71daeaf 100644 --- a/sys/fs/nfs/nfs_var.h +++ b/sys/fs/nfs/nfs_var.h @@ -218,14 +218,14 @@ void nfsrvd_dorpc(struct nfsrv_descript *, int, NFSPROC_T *); /* nfs_nfsdcache.c */ void nfsrvd_initcache(void); -int nfsrvd_getcache(struct nfsrv_descript *, struct socket *); -struct nfsrvcache *nfsrvd_updatecache(struct nfsrv_descript *, - struct socket *); -void nfsrvd_sentcache(struct nfsrvcache *, struct socket *, int); +int nfsrvd_getcache(struct nfsrv_descript *); +struct nfsrvcache *nfsrvd_updatecache(struct nfsrv_descript *); +void nfsrvd_sentcache(struct nfsrvcache *, uint32_t); void nfsrvd_cleancache(void); void nfsrvd_refcache(struct nfsrvcache *); void nfsrvd_derefcache(struct nfsrvcache *); void nfsrvd_delcache(struct nfsrvcache *); +void nfsrc_trimcache(uint64_t, uint32_t, int); /* nfs_commonsubs.c */ void newnfs_init(void); @@ -327,9 +327,6 @@ int nfsd_checkrootexp(struct nfsrv_descript *); void nfscl_retopts(struct nfsmount *, char *, size_t); /* nfs_commonport.c */ -int nfsrv_checksockseqnum(struct socket *, tcp_seq); -int nfsrv_getsockseqnum(struct socket *, tcp_seq *); -int nfsrv_getsocksndseq(struct socket *, tcp_seq *, tcp_seq *); int nfsrv_lookupfilename(struct nameidata *, char *, NFSPROC_T *); void nfsrv_object_create(vnode_t, NFSPROC_T *); int nfsrv_mallocmget_limit(void); diff --git a/sys/fs/nfs/nfsrvcache.h b/sys/fs/nfs/nfsrvcache.h index 5c9dc57..7db3035 100644 --- a/sys/fs/nfs/nfsrvcache.h +++ b/sys/fs/nfs/nfsrvcache.h @@ -46,6 +46,7 @@ /* Cache table entry. 
*/ struct nfsrvcache { LIST_ENTRY(nfsrvcache) rc_hash; /* Hash chain */ + LIST_ENTRY(nfsrvcache) rc_ahash; /* ACK hash chain */ TAILQ_ENTRY(nfsrvcache) rc_lru; /* UDP lru chain */ u_int32_t rc_xid; /* rpc id number */ time_t rc_timestamp; /* Time done */ @@ -64,6 +65,7 @@ struct nfsrvcache { int16_t refcnt; u_int16_t cksum; time_t cachetime; + int acked; } ot; } rc_un2; u_int16_t rc_proc; /* rpc proc number */ @@ -81,6 +83,13 @@ struct nfsrvcache { #define rc_reqlen rc_un2.ot.len #define rc_cksum rc_un2.ot.cksum #define rc_cachetime rc_un2.ot.cachetime +#define rc_acked rc_un2.ot.acked + +/* TCP ACK values */ +#define RC_NO_SEQ 0 +#define RC_NO_ACK 1 +#define RC_ACK 2 +#define RC_NACK 3 /* Return values */ #define RC_DROPIT 0 @@ -95,7 +104,6 @@ struct nfsrvcache { #define RC_UDP 0x0010 #define RC_INETIPV6 0x0020 #define RC_INPROG 0x0040 -#define RC_TCPSEQ 0x0080 #define RC_NFSV2 0x0100 #define RC_NFSV3 0x0200 #define RC_NFSV4 0x0400 diff --git a/sys/fs/nfsserver/nfs_nfsdcache.c b/sys/fs/nfsserver/nfs_nfsdcache.c index 04f7231..2927152 100644 --- a/sys/fs/nfsserver/nfs_nfsdcache.c +++ b/sys/fs/nfsserver/nfs_nfsdcache.c @@ -162,6 +162,7 @@ __FBSDID("$FreeBSD$"); extern struct nfsstats newnfsstats; extern struct mtx nfsrc_udpmtx; extern struct nfsrchash_bucket nfsrchash_table[NFSRVCACHE_HASHSIZE]; +extern struct nfsrchash_bucket nfsrcahash_table[NFSRVCACHE_HASHSIZE]; int nfsrc_floodlevel = NFSRVCACHE_FLOODLEVEL, nfsrc_tcpsavedreplies = 0; #endif /* !APPLEKEXT */ @@ -238,6 +239,7 @@ static int newnfsv2_procid[NFS_V3NPROCS] = { (&nfsrvudphashtbl[nfsrc_hash(xid)]) #define NFSRCHASH(xid) \ (&nfsrchash_table[nfsrc_hash(xid)].tbl) +#define NFSRCAHASH(xid) (&nfsrcahash_table[nfsrc_hash(xid)]) #define TRUE 1 #define FALSE 0 #define NFSRVCACHE_CHECKLEN 100 @@ -281,9 +283,6 @@ static void nfsrc_lock(struct nfsrvcache *rp); static void nfsrc_unlock(struct nfsrvcache *rp); static void nfsrc_wanted(struct nfsrvcache *rp); static void nfsrc_freecache(struct nfsrvcache *rp); -static 
void nfsrc_trimcache(u_int64_t, struct socket *); -static int nfsrc_activesocket(struct nfsrvcache *rp, u_int64_t, - struct socket *); static int nfsrc_getlenandcksum(mbuf_t m1, u_int16_t *cksum); static void nfsrc_marksametcpconn(u_int64_t); @@ -314,6 +313,7 @@ nfsrvd_initcache(void) for (i = 0; i < NFSRVCACHE_HASHSIZE; i++) { LIST_INIT(&nfsrvudphashtbl[i]); LIST_INIT(&nfsrchash_table[i].tbl); + LIST_INIT(&nfsrcahash_table[i].tbl); } TAILQ_INIT(&nfsrvudplru); nfsrc_tcpsavedreplies = 0; @@ -325,10 +325,9 @@ nfsrvd_initcache(void) /* * Get a cache entry for this request. Basically just malloc a new one * and then call nfsrc_getudp() or nfsrc_gettcp() to do the rest. - * Call nfsrc_trimcache() to clean up the cache before returning. */ APPLESTATIC int -nfsrvd_getcache(struct nfsrv_descript *nd, struct socket *so) +nfsrvd_getcache(struct nfsrv_descript *nd) { struct nfsrvcache *newrp; int ret; @@ -356,7 +355,6 @@ nfsrvd_getcache(struct nfsrv_descript *nd, struct socket *so) } else { ret = nfsrc_gettcp(nd, newrp); } - nfsrc_trimcache(nd->nd_sockref, so); NFSEXITCODE2(0, nd); return (ret); } @@ -456,7 +454,7 @@ out: * Update a request cache entry after the rpc has been done */ APPLESTATIC struct nfsrvcache * -nfsrvd_updatecache(struct nfsrv_descript *nd, struct socket *so) +nfsrvd_updatecache(struct nfsrv_descript *nd) { struct nfsrvcache *rp; struct nfsrvcache *retrp = NULL; @@ -549,7 +547,6 @@ nfsrvd_updatecache(struct nfsrv_descript *nd, struct socket *so) } out: - nfsrc_trimcache(nd->nd_sockref, so); NFSEXITCODE2(0, nd); return (retrp); } @@ -575,30 +572,22 @@ nfsrvd_delcache(struct nfsrvcache *rp) /* * Called after nfsrvd_updatecache() once the reply is sent, to update - * the entry for nfsrc_activesocket() and unlock it. The argument is + * the entry's sequence number and unlock it. The argument is * the pointer returned by nfsrvd_updatecache(). 
*/ APPLESTATIC void -nfsrvd_sentcache(struct nfsrvcache *rp, struct socket *so, int err) +nfsrvd_sentcache(struct nfsrvcache *rp, uint32_t seq) { - tcp_seq tmp_seq; - struct mtx *mutex; + struct nfsrchash_bucket *hbp; - mutex = nfsrc_cachemutex(rp); - if (!(rp->rc_flag & RC_LOCKED)) - panic("nfsrvd_sentcache not locked"); - if (!err) { - if ((so->so_proto->pr_domain->dom_family != AF_INET && - so->so_proto->pr_domain->dom_family != AF_INET6) || - so->so_proto->pr_protocol != IPPROTO_TCP) - panic("nfs sent cache"); - if (nfsrv_getsockseqnum(so, &tmp_seq)) { - mtx_lock(mutex); - rp->rc_tcpseq = tmp_seq; - rp->rc_flag |= RC_TCPSEQ; - mtx_unlock(mutex); - } - } + KASSERT(rp->rc_flag & RC_LOCKED, ("nfsrvd_sentcache not locked")); + hbp = NFSRCAHASH(rp->rc_sockref); + mtx_lock(&hbp->mtx); + rp->rc_tcpseq = seq; + if (rp->rc_acked != RC_NO_ACK) + LIST_INSERT_HEAD(&hbp->tbl, rp, rc_ahash); + rp->rc_acked = RC_NO_ACK; + mtx_unlock(&hbp->mtx); nfsrc_unlock(rp); } @@ -790,11 +779,18 @@ nfsrc_wanted(struct nfsrvcache *rp) static void nfsrc_freecache(struct nfsrvcache *rp) { + struct nfsrchash_bucket *hbp; LIST_REMOVE(rp, rc_hash); if (rp->rc_flag & RC_UDP) { TAILQ_REMOVE(&nfsrvudplru, rp, rc_lru); nfsrc_udpcachesize--; + } else if (rp->rc_acked != RC_NO_SEQ) { + hbp = NFSRCAHASH(rp->rc_sockref); + mtx_lock(&hbp->mtx); + if (rp->rc_acked == RC_NO_ACK) + LIST_REMOVE(rp, rc_ahash); + mtx_unlock(&hbp->mtx); } nfsrc_wanted(rp); if (rp->rc_flag & RC_REPMBUF) { @@ -836,14 +832,32 @@ nfsrvd_cleancache(void) /* * The basic rule is to get rid of entries that are expired. 
*/ -static void -nfsrc_trimcache(u_int64_t sockref, struct socket *so) +void +nfsrc_trimcache(u_int64_t sockref, uint32_t snd_una, int final) { + struct nfsrchash_bucket *hbp; struct nfsrvcache *rp, *nextrp; - int i, j, k, tto, time_histo[HISTSIZE]; + int force, lastslot, i, j, k, tto, time_histo[HISTSIZE]; time_t thisstamp; static time_t udp_lasttrim = 0, tcp_lasttrim = 0; - static int onethread = 0; + static int onethread = 0, oneslot = 0; + + if (sockref != 0) { + hbp = NFSRCAHASH(sockref); + mtx_lock(&hbp->mtx); + LIST_FOREACH_SAFE(rp, &hbp->tbl, rc_ahash, nextrp) { + if (sockref == rp->rc_sockref) { + if (SEQ_GEQ(snd_una, rp->rc_tcpseq)) { + rp->rc_acked = RC_ACK; + LIST_REMOVE(rp, rc_ahash); + } else if (final) { + rp->rc_acked = RC_NACK; + LIST_REMOVE(rp, rc_ahash); + } + } + } + mtx_unlock(&hbp->mtx); + } if (atomic_cmpset_acq_int(&onethread, 0, 1) == 0) return; @@ -864,13 +878,28 @@ nfsrc_trimcache(u_int64_t sockref, struct socket *so) } if (NFSD_MONOSEC != tcp_lasttrim || nfsrc_tcpsavedreplies >= nfsrc_tcphighwater) { - for (i = 0; i < HISTSIZE; i++) - time_histo[i] = 0; + force = nfsrc_tcphighwater / 4; + if (force > 0 && + nfsrc_tcpsavedreplies + force >= nfsrc_tcphighwater) { + for (i = 0; i < HISTSIZE; i++) + time_histo[i] = 0; + i = 0; + lastslot = NFSRVCACHE_HASHSIZE - 1; + } else { + force = 0; + if (NFSD_MONOSEC != tcp_lasttrim) { + i = 0; + lastslot = NFSRVCACHE_HASHSIZE - 1; + } else { + lastslot = i = oneslot; + if (++oneslot >= NFSRVCACHE_HASHSIZE) + oneslot = 0; + } + } tto = nfsrc_tcptimeout; - for (i = 0; i < NFSRVCACHE_HASHSIZE; i++) { + tcp_lasttrim = NFSD_MONOSEC; + for (; i <= lastslot; i++) { mtx_lock(&nfsrchash_table[i].mtx); - if (i == 0) - tcp_lasttrim = NFSD_MONOSEC; LIST_FOREACH_SAFE(rp, &nfsrchash_table[i].tbl, rc_hash, nextrp) { if (!(rp->rc_flag & @@ -878,12 +907,12 @@ nfsrc_trimcache(u_int64_t sockref, struct socket *so) && rp->rc_refcnt == 0) { if ((rp->rc_flag & RC_REFCNT) || tcp_lasttrim > rp->rc_timestamp || - 
nfsrc_activesocket(rp, sockref, so)) { + rp->rc_acked == RC_ACK) { nfsrc_freecache(rp); continue; } - if (nfsrc_tcphighwater == 0) + if (force == 0) continue; /* * The timestamps range from roughly the @@ -903,8 +932,7 @@ nfsrc_trimcache(u_int64_t sockref, struct socket *so) } mtx_unlock(&nfsrchash_table[i].mtx); } - j = nfsrc_tcphighwater / 5; /* 20% of it */ - if (j > 0 && (nfsrc_tcpsavedreplies + j) > nfsrc_tcphighwater) { + if (force) { /* * Trim some more with a smaller timeout of as little * as 20% of nfsrc_tcptimeout to try and get below @@ -913,7 +941,7 @@ nfsrc_trimcache(u_int64_t sockref, struct socket *so) k = 0; for (i = 0; i < (HISTSIZE - 2); i++) { k += time_histo[i]; - if (k > j) + if (k > force) break; } k = tto * (i + 1) / HISTSIZE; @@ -929,8 +957,7 @@ nfsrc_trimcache(u_int64_t sockref, struct socket *so) && rp->rc_refcnt == 0 && ((rp->rc_flag & RC_REFCNT) || thisstamp > rp->rc_timestamp || - nfsrc_activesocket(rp, sockref, - so))) + rp->rc_acked == RC_ACK)) nfsrc_freecache(rp); } mtx_unlock(&nfsrchash_table[i].mtx); @@ -975,28 +1002,6 @@ nfsrvd_derefcache(struct nfsrvcache *rp) } /* - * Check to see if the socket is active. - * Return 1 if the reply has been received/acknowledged by the client, - * 0 otherwise. - * XXX - Uses tcp internals. - */ -static int -nfsrc_activesocket(struct nfsrvcache *rp, u_int64_t cur_sockref, - struct socket *cur_so) -{ - int ret = 0; - - if (!(rp->rc_flag & RC_TCPSEQ)) - return (ret); - /* - * If the sockref is the same, it is the same TCP connection. - */ - if (cur_sockref == rp->rc_sockref) - ret = nfsrv_checksockseqnum(cur_so, rp->rc_tcpseq); - return (ret); -} - -/* * Calculate the length of the mbuf list and a checksum on the first up to * NFSRVCACHE_CHECKLEN bytes. 
*/ diff --git a/sys/fs/nfsserver/nfs_nfsdkrpc.c b/sys/fs/nfsserver/nfs_nfsdkrpc.c index 9185b135..96cb833 100644 --- a/sys/fs/nfsserver/nfs_nfsdkrpc.c +++ b/sys/fs/nfsserver/nfs_nfsdkrpc.c @@ -97,8 +97,8 @@ static int nfs_maxvers = NFS_VER4; SYSCTL_INT(_vfs_nfsd, OID_AUTO, server_max_nfsvers, CTLFLAG_RW, &nfs_maxvers, 0, "The highest version of NFS handled by the server"); -static int nfs_proc(struct nfsrv_descript *, u_int32_t, struct socket *, - u_int64_t, struct nfsrvcache **); +static int nfs_proc(struct nfsrv_descript *, u_int32_t, SVCXPRT *xprt, + struct nfsrvcache **); extern u_long sb_max_adj; extern int newnfs_numnfsd; @@ -251,8 +251,7 @@ nfssvc_program(struct svc_req *rqst, SVCXPRT *xprt) } } - cacherep = nfs_proc(&nd, rqst->rq_xid, xprt->xp_socket, - xprt->xp_sockref, &rp); + cacherep = nfs_proc(&nd, rqst->rq_xid, xprt, &rp); NFSLOCKV4ROOTMUTEX(); nfsv4_relref(&nfsd_suspend_lock); NFSUNLOCKV4ROOTMUTEX(); @@ -287,8 +286,10 @@ nfssvc_program(struct svc_req *rqst, SVCXPRT *xprt) } else if (!svc_sendreply_mbuf(rqst, nd.nd_mreq)) { svcerr_systemerr(rqst); } - if (rp != NULL) - nfsrvd_sentcache(rp, xprt->xp_socket, 0); + if (rp != NULL) { + if (rqst->rq_reply_seq != 0 || SVC_ACK(xprt, NULL)) + nfsrvd_sentcache(rp, rqst->rq_reply_seq); + } svc_freereq(rqst); out: @@ -300,11 +301,12 @@ out: * Return the appropriate cache response. 
*/ static int -nfs_proc(struct nfsrv_descript *nd, u_int32_t xid, struct socket *so, - u_int64_t sockref, struct nfsrvcache **rpp) +nfs_proc(struct nfsrv_descript *nd, u_int32_t xid, SVCXPRT *xprt, + struct nfsrvcache **rpp) { struct thread *td = curthread; int cacherep = RC_DOIT, isdgram; + uint32_t ack; *rpp = NULL; if (nd->nd_nam2 == NULL) { @@ -336,8 +338,11 @@ nfs_proc(struct nfsrv_descript *nd, u_int32_t xid, struct socket *so, nd->nd_flag |= ND_SAMETCPCONN; nd->nd_retxid = xid; nd->nd_tcpconntime = NFSD_MONOSEC; - nd->nd_sockref = sockref; - cacherep = nfsrvd_getcache(nd, so); + nd->nd_sockref = xprt->xp_sockref; + cacherep = nfsrvd_getcache(nd); + ack = 0; + SVC_ACK(xprt, &ack); + nfsrc_trimcache(xprt->xp_sockref, ack, 0); } /* @@ -352,13 +357,23 @@ nfs_proc(struct nfsrv_descript *nd, u_int32_t xid, struct socket *so, cacherep = RC_DROPIT; else cacherep = RC_REPLY; - *rpp = nfsrvd_updatecache(nd, so); + *rpp = nfsrvd_updatecache(nd); } NFSEXITCODE2(0, nd); return (cacherep); } +static void +nfssvc_loss(SVCXPRT *xprt) +{ + uint32_t ack; + + ack = 0; + SVC_ACK(xprt, &ack); + nfsrc_trimcache(xprt->xp_sockref, ack, 1); +} + /* * Adds a socket to the list for servicing by nfsds. 
*/ @@ -399,6 +414,8 @@ nfsrvd_addsock(struct file *fp) if (nfs_maxvers >= NFS_VER4) svc_reg(xprt, NFS_PROG, NFS_VER4, nfssvc_program, NULL); + if (so->so_type == SOCK_STREAM) + svc_loss_reg(xprt, nfssvc_loss); SVC_RELEASE(xprt); } diff --git a/sys/fs/nfsserver/nfs_nfsdport.c b/sys/fs/nfsserver/nfs_nfsdport.c index ef26f64..21d302f 100644 --- a/sys/fs/nfsserver/nfs_nfsdport.c +++ b/sys/fs/nfsserver/nfs_nfsdport.c @@ -61,6 +61,7 @@ extern struct nfsv4lock nfsd_suspend_lock; struct vfsoptlist nfsv4root_opt, nfsv4root_newopt; NFSDLOCKMUTEX; struct nfsrchash_bucket nfsrchash_table[NFSRVCACHE_HASHSIZE]; +struct nfsrchash_bucket nfsrcahash_table[NFSRVCACHE_HASHSIZE]; struct mtx nfsrc_udpmtx; struct mtx nfs_v4root_mutex; struct nfsrvfh nfs_rootfh, nfs_pubfh; @@ -2881,40 +2882,6 @@ out: } /* - * Get the tcp socket sequence numbers we need. - * (Maybe this should be moved to the tcp sources?) - */ -int -nfsrv_getsocksndseq(struct socket *so, tcp_seq *maxp, tcp_seq *unap) -{ - struct inpcb *inp; - struct tcpcb *tp; - int error = 0; - - inp = sotoinpcb(so); - KASSERT(inp != NULL, ("nfsrv_getsocksndseq: inp == NULL")); - INP_RLOCK(inp); - if (inp->inp_flags & (INP_TIMEWAIT | INP_DROPPED)) { - INP_RUNLOCK(inp); - error = EPIPE; - goto out; - } - tp = intotcpcb(inp); - if (tp->t_state != TCPS_ESTABLISHED) { - INP_RUNLOCK(inp); - error = EPIPE; - goto out; - } - *maxp = tp->snd_max; - *unap = tp->snd_una; - INP_RUNLOCK(inp); - -out: - NFSEXITCODE(error); - return (error); -} - -/* * This function needs to test to see if the system is near its limit * for memory allocation via malloc() or mget() and return True iff * either of these resources are near their limit. 
@@ -3340,6 +3307,11 @@ nfsd_modevent(module_t mod, int type, void *data) i); mtx_init(&nfsrchash_table[i].mtx, nfsrchash_table[i].lock_name, NULL, MTX_DEF); + snprintf(nfsrcahash_table[i].lock_name, + sizeof(nfsrcahash_table[i].lock_name), "nfsrc_tcpa%d", + i); + mtx_init(&nfsrcahash_table[i].mtx, + nfsrcahash_table[i].lock_name, NULL, MTX_DEF); } mtx_init(&nfsrc_udpmtx, "nfs_udpcache_mutex", NULL, MTX_DEF); mtx_init(&nfs_v4root_mutex, "nfs_v4root_mutex", NULL, MTX_DEF); @@ -3385,8 +3357,10 @@ nfsd_modevent(module_t mod, int type, void *data) svcpool_destroy(nfsrvd_pool); /* and get rid of the locks */ - for (i = 0; i < NFSRVCACHE_HASHSIZE; i++) + for (i = 0; i < NFSRVCACHE_HASHSIZE; i++) { mtx_destroy(&nfsrchash_table[i].mtx); + mtx_destroy(&nfsrcahash_table[i].mtx); + } mtx_destroy(&nfsrc_udpmtx); mtx_destroy(&nfs_v4root_mutex); mtx_destroy(&nfsv4root_mnt.mnt_mtx); diff --git a/sys/fs/nfsserver/nfs_nfsdsubs.c b/sys/fs/nfsserver/nfs_nfsdsubs.c index 5c8ecec..404ad87 100644 --- a/sys/fs/nfsserver/nfs_nfsdsubs.c +++ b/sys/fs/nfsserver/nfs_nfsdsubs.c @@ -1987,47 +1987,6 @@ nfsmout: return (error); } -/* - * Check the tcp socket sequence number has been acknowledged. - */ -int -nfsrv_checksockseqnum(struct socket *so, tcp_seq tcpseqval) -{ - tcp_seq maxseq, unaseq; - int error, ret; - - error = nfsrv_getsocksndseq(so, &maxseq, &unaseq); - if (error) - return (0); - ret = SEQ_GEQ(unaseq, tcpseqval); - return (ret); -} - -/* - * Get the tcp sequence number to be acknowledged. - */ -int -nfsrv_getsockseqnum(struct socket *so, tcp_seq *tcpseqp) -{ - tcp_seq maxseq, unaseq; - u_int sbcc; - int error; - - sbcc = so->so_snd.sb_cc; - error = nfsrv_getsocksndseq(so, &maxseq, &unaseq); - if (error) - return (0); - /* - * Set the seq# to a value that will - * be at least the end of the reply. - * When this sequence# is acknowledged - * by the client, the client has received - * the reply. 
- */ - *tcpseqp = sbcc + maxseq; - return (1); -} - void nfsd_init(void) { diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c index b42bafb..bc82cd4 100644 --- a/sys/rpc/svc.c +++ b/sys/rpc/svc.c @@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$"); #include <sys/queue.h> #include <sys/socketvar.h> #include <sys/systm.h> +#include <sys/sx.h> #include <sys/ucred.h> #include <rpc/rpc.h> @@ -93,6 +94,7 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) TAILQ_INIT(&pool->sp_xlist); TAILQ_INIT(&pool->sp_active); TAILQ_INIT(&pool->sp_callouts); + TAILQ_INIT(&pool->sp_lcallouts); LIST_INIT(&pool->sp_threads); LIST_INIT(&pool->sp_idlethreads); pool->sp_minthreads = 1; @@ -158,6 +160,7 @@ svcpool_destroy(SVCPOOL *pool) { SVCXPRT *xprt, *nxprt; struct svc_callout *s; + struct svc_loss_callout *sl; struct svcxprt_list cleanup; TAILQ_INIT(&cleanup); @@ -169,12 +172,16 @@ svcpool_destroy(SVCPOOL *pool) TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); } - while (TAILQ_FIRST(&pool->sp_callouts)) { - s = TAILQ_FIRST(&pool->sp_callouts); + while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { mtx_unlock(&pool->sp_lock); svc_unreg(pool, s->sc_prog, s->sc_vers); mtx_lock(&pool->sp_lock); } + while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { + mtx_unlock(&pool->sp_lock); + svc_loss_unreg(pool, sl->slc_dispatch); + mtx_lock(&pool->sp_lock); + } mtx_unlock(&pool->sp_lock); TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { @@ -511,6 +518,55 @@ svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) mtx_unlock(&pool->sp_lock); } +/* + * Add a service connection loss program to the callout list. + * The dispatch routine will be called when some port in this pool dies. 
+ */ +bool_t +svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) +{ + SVCPOOL *pool = xprt->xp_pool; + struct svc_loss_callout *s; + + mtx_lock(&pool->sp_lock); + TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { + if (s->slc_dispatch == dispatch) + break; + } + if (s != NULL) { + mtx_unlock(&pool->sp_lock); + return (TRUE); + } + s = malloc(sizeof(*s), M_RPC, M_NOWAIT); + if (s == NULL) { + mtx_unlock(&pool->sp_lock); + return (FALSE); + } + s->slc_dispatch = dispatch; + TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); + mtx_unlock(&pool->sp_lock); + return (TRUE); +} + +/* + * Remove a service connection loss program from the callout list. + */ +void +svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) +{ + struct svc_loss_callout *s; + + mtx_lock(&pool->sp_lock); + TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { + if (s->slc_dispatch == dispatch) { + TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); + free(s, M_RPC); + break; + } + } + mtx_unlock(&pool->sp_lock); +} + /* ********************** CALLOUT list related stuff ************* */ /* @@ -554,7 +610,7 @@ svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) return (FALSE); - ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); + ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); if (rqstp->rq_addr) { free(rqstp->rq_addr, M_SONAME); rqstp->rq_addr = NULL; @@ -803,6 +859,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) struct svc_req *r; struct rpc_msg msg; struct mbuf *args; + struct svc_loss_callout *s; enum xprt_stat stat; /* now receive msgs from xprtprt (support batch calls) */ @@ -831,7 +888,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) break; case RS_DONE: SVC_REPLY(xprt, &repmsg, r->rq_addr, - repbody); + repbody, &r->rq_reply_seq); if (r->rq_addr) { free(r->rq_addr, M_SONAME); r->rq_addr = NULL; @@ -881,6 +938,8 @@ call_done: r = NULL; } if ((stat = SVC_STAT(xprt)) == 
XPRT_DIED) { + TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) + (*s->slc_dispatch)(xprt); xprt_unregister(xprt); } diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h index 7d9428e..bcd2d79 100644 --- a/sys/rpc/svc.h +++ b/sys/rpc/svc.h @@ -103,9 +103,11 @@ struct xp_ops { struct sockaddr **, struct mbuf **); /* get transport status */ enum xprt_stat (*xp_stat)(struct __rpc_svcxprt *); + /* get transport acknowledge sequence */ + bool_t (*xp_ack)(struct __rpc_svcxprt *, uint32_t *); /* send reply */ bool_t (*xp_reply)(struct __rpc_svcxprt *, struct rpc_msg *, - struct sockaddr *, struct mbuf *); + struct sockaddr *, struct mbuf *, uint32_t *); /* destroy this struct */ void (*xp_destroy)(struct __rpc_svcxprt *); /* catch-all function */ @@ -166,6 +168,8 @@ typedef struct __rpc_svcxprt { time_t xp_lastactive; /* time of last RPC */ u_int64_t xp_sockref; /* set by nfsv4 to identify socket */ int xp_upcallset; /* socket upcall is set up */ + uint32_t xp_snd_cnt; /* # of bytes sent to socket */ + struct sx xp_snd_lock; /* protects xp_snd_cnt & sb_cc */ #else int xp_fd; u_short xp_port; /* associated port number */ @@ -230,6 +234,17 @@ struct svc_callout { }; TAILQ_HEAD(svc_callout_list, svc_callout); +/* + * The services connection loss list + * The dispatch routine is called with the transport that + * reported XPRT_DIED. 
+ */ +struct svc_loss_callout { + TAILQ_ENTRY(svc_loss_callout) slc_link; + void (*slc_dispatch)(SVCXPRT *); +}; +TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout); + struct __rpc_svcthread; /* @@ -253,6 +268,7 @@ struct svc_req { void *rq_p1; /* application workspace */ int rq_p2; /* application workspace */ uint64_t rq_p3; /* application workspace */ + uint32_t rq_reply_seq; /* reply socket sequence # */ char rq_credarea[3*MAX_AUTH_BYTES]; }; STAILQ_HEAD(svc_reqlist, svc_req); @@ -318,6 +334,7 @@ typedef struct __rpc_svcpool { struct svcxprt_list sp_xlist; /* all transports in the pool */ struct svcxprt_list sp_active; /* transports needing service */ struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */ + struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */ struct svcthread_list sp_threads; /* service threads */ struct svcthread_list sp_idlethreads; /* idle service threads */ int sp_minthreads; /* minimum service thread count */ @@ -393,8 +410,12 @@ struct svc_req { #define SVC_STAT(xprt) \ (*(xprt)->xp_ops->xp_stat)(xprt) -#define SVC_REPLY(xprt, msg, addr, m) \ - (*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m)) +#define SVC_ACK(xprt, ack) \ + ((xprt)->xp_ops->xp_ack == NULL ? FALSE : \ + ((ack) == NULL ? 
TRUE : (*(xprt)->xp_ops->xp_ack)((xprt), (ack)))) + +#define SVC_REPLY(xprt, msg, addr, m, seq) \ + (*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m), (seq)) #define SVC_DESTROY(xprt) \ (*(xprt)->xp_ops->xp_destroy)(xprt) @@ -496,6 +517,30 @@ extern void svc_unreg(const rpcprog_t, const rpcvers_t); __END_DECLS /* + * Service connection loss registration + * + * svc_loss_reg(xprt, dispatch) + * const SVCXPRT *xprt; + * const void (*dispatch)(); + */ + +__BEGIN_DECLS +extern bool_t svc_loss_reg(SVCXPRT *, void (*)(SVCXPRT *)); +__END_DECLS + +/* + * Service connection loss un-registration + * + * svc_loss_unreg(xprt, dispatch) + * const SVCXPRT *xprt; + * const void (*dispatch)(); + */ + +__BEGIN_DECLS +extern void svc_loss_unreg(SVCPOOL *, void (*)(SVCXPRT *)); +__END_DECLS + +/* * Transport registration. * * xprt_register(xprt) diff --git a/sys/rpc/svc_dg.c b/sys/rpc/svc_dg.c index 79f8429..1c530bd 100644 --- a/sys/rpc/svc_dg.c +++ b/sys/rpc/svc_dg.c @@ -66,7 +66,7 @@ static enum xprt_stat svc_dg_stat(SVCXPRT *); static bool_t svc_dg_recv(SVCXPRT *, struct rpc_msg *, struct sockaddr **, struct mbuf **); static bool_t svc_dg_reply(SVCXPRT *, struct rpc_msg *, - struct sockaddr *, struct mbuf *); + struct sockaddr *, struct mbuf *, uint32_t *); static void svc_dg_destroy(SVCXPRT *); static bool_t svc_dg_control(SVCXPRT *, const u_int, void *); static int svc_dg_soupcall(struct socket *so, void *arg, int waitflag); @@ -230,7 +230,7 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_msg *msg, static bool_t svc_dg_reply(SVCXPRT *xprt, struct rpc_msg *msg, - struct sockaddr *addr, struct mbuf *m) + struct sockaddr *addr, struct mbuf *m, uint32_t *seq) { XDR xdrs; struct mbuf *mrep; diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c index 3140915..c10224e 100644 --- a/sys/rpc/svc_vc.c +++ b/sys/rpc/svc_vc.c @@ -76,10 +76,11 @@ static void svc_vc_rendezvous_destroy(SVCXPRT *); static bool_t svc_vc_null(void); static void svc_vc_destroy(SVCXPRT *); static enum xprt_stat 
svc_vc_stat(SVCXPRT *); +static bool_t svc_vc_ack(SVCXPRT *, uint32_t *); static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *, struct sockaddr **, struct mbuf **); static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *, - struct sockaddr *, struct mbuf *); + struct sockaddr *, struct mbuf *, uint32_t *seq); static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in); static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq, void *in); @@ -88,7 +89,7 @@ static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *); static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *, struct sockaddr **, struct mbuf **); static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *, - struct sockaddr *, struct mbuf *); + struct sockaddr *, struct mbuf *, uint32_t *); static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in); static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so, @@ -100,7 +101,7 @@ static struct xp_ops svc_vc_rendezvous_ops = { .xp_recv = svc_vc_rendezvous_recv, .xp_stat = svc_vc_rendezvous_stat, .xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *, - struct sockaddr *, struct mbuf *))svc_vc_null, + struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null, .xp_destroy = svc_vc_rendezvous_destroy, .xp_control = svc_vc_rendezvous_control }; @@ -108,6 +109,7 @@ static struct xp_ops svc_vc_rendezvous_ops = { static struct xp_ops svc_vc_ops = { .xp_recv = svc_vc_recv, .xp_stat = svc_vc_stat, + .xp_ack = svc_vc_ack, .xp_reply = svc_vc_reply, .xp_destroy = svc_vc_destroy, .xp_control = svc_vc_control @@ -159,6 +161,7 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, xprt = svc_xprt_alloc(); sx_init(&xprt->xp_lock, "xprt->xp_lock"); + sx_init(&xprt->xp_snd_lock, "xprt->xp_snd_lock"); xprt->xp_pool = pool; xprt->xp_socket = so; xprt->xp_p1 = NULL; @@ -184,8 +187,11 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, return (xprt); cleanup_svc_vc_create: - if (xprt) 
+ if (xprt) { + sx_destroy(&xprt->xp_snd_lock); + sx_destroy(&xprt->xp_lock); svc_xprt_free(xprt); + } return (NULL); } @@ -231,6 +237,7 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr) xprt = svc_xprt_alloc(); sx_init(&xprt->xp_lock, "xprt->xp_lock"); + sx_init(&xprt->xp_snd_lock, "xprt->xp_snd_lock"); xprt->xp_pool = pool; xprt->xp_socket = so; xprt->xp_p1 = cd; @@ -270,7 +277,9 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr) return (xprt); cleanup_svc_vc_create: if (xprt) { - mem_free(xprt, sizeof(*xprt)); + sx_destroy(&xprt->xp_snd_lock); + sx_destroy(&xprt->xp_lock); + svc_xprt_free(xprt); } if (cd) mem_free(cd, sizeof(*cd)); @@ -291,6 +300,7 @@ svc_vc_create_backchannel(SVCPOOL *pool) xprt = svc_xprt_alloc(); sx_init(&xprt->xp_lock, "xprt->xp_lock"); + sx_init(&xprt->xp_snd_lock, "xprt->xp_snd_lock"); xprt->xp_pool = pool; xprt->xp_socket = NULL; xprt->xp_p1 = cd; @@ -451,7 +461,6 @@ svc_vc_destroy_common(SVCXPRT *xprt) } SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); - sx_destroy(&xprt->xp_lock); if (xprt->xp_socket) (void)soclose(xprt->xp_socket); @@ -537,6 +546,16 @@ svc_vc_stat(SVCXPRT *xprt) return (XPRT_IDLE); } +static bool_t +svc_vc_ack(SVCXPRT *xprt, uint32_t *ack) +{ + + sx_slock(&xprt->xp_snd_lock); + *ack = xprt->xp_snd_cnt - xprt->xp_socket->so_snd.sb_cc; + sx_sunlock(&xprt->xp_snd_lock); + return (TRUE); +} + static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *xprt) { @@ -785,12 +804,12 @@ svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg, static bool_t svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, - struct sockaddr *addr, struct mbuf *m) + struct sockaddr *addr, struct mbuf *m, uint32_t *seq) { XDR xdrs; struct mbuf *mrep; bool_t stat = TRUE; - int error; + int error, len; /* * Leave space for record mark. @@ -817,14 +836,19 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, * Prepend a record marker containing the reply length. 
*/ M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); + len = mrep->m_pkthdr.len; *mtod(mrep, uint32_t *) = - htonl(0x80000000 | (mrep->m_pkthdr.len - - sizeof(uint32_t))); + htonl(0x80000000 | (len - sizeof(uint32_t))); + sx_xlock(&xprt->xp_snd_lock); error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL, 0, curthread); if (!error) { + xprt->xp_snd_cnt += len; + if (seq) + *seq = xprt->xp_snd_cnt; stat = TRUE; } + sx_xunlock(&xprt->xp_snd_lock); } else { m_freem(mrep); } @@ -837,7 +861,7 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, static bool_t svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg, - struct sockaddr *addr, struct mbuf *m) + struct sockaddr *addr, struct mbuf *m, uint32_t *seq) { struct ct_data *ct; XDR xdrs; |