summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sys/fs/nfs/nfs_var.h11
-rw-r--r--sys/fs/nfs/nfsrvcache.h10
-rw-r--r--sys/fs/nfsserver/nfs_nfsdcache.c135
-rw-r--r--sys/fs/nfsserver/nfs_nfsdkrpc.c39
-rw-r--r--sys/fs/nfsserver/nfs_nfsdport.c44
-rw-r--r--sys/fs/nfsserver/nfs_nfsdsubs.c41
-rw-r--r--sys/rpc/svc.c67
-rw-r--r--sys/rpc/svc.h55
-rw-r--r--sys/rpc/svc_dg.c4
-rw-r--r--sys/rpc/svc_vc.c42
10 files changed, 267 insertions, 181 deletions
diff --git a/sys/fs/nfs/nfs_var.h b/sys/fs/nfs/nfs_var.h
index 7fbabaa..895829d 100644
--- a/sys/fs/nfs/nfs_var.h
+++ b/sys/fs/nfs/nfs_var.h
@@ -218,14 +218,14 @@ void nfsrvd_dorpc(struct nfsrv_descript *, int, NFSPROC_T *);
/* nfs_nfsdcache.c */
void nfsrvd_initcache(void);
-int nfsrvd_getcache(struct nfsrv_descript *, struct socket *);
-struct nfsrvcache *nfsrvd_updatecache(struct nfsrv_descript *,
- struct socket *);
-void nfsrvd_sentcache(struct nfsrvcache *, struct socket *, int);
+int nfsrvd_getcache(struct nfsrv_descript *);
+struct nfsrvcache *nfsrvd_updatecache(struct nfsrv_descript *);
+void nfsrvd_sentcache(struct nfsrvcache *, int, uint32_t);
void nfsrvd_cleancache(void);
void nfsrvd_refcache(struct nfsrvcache *);
void nfsrvd_derefcache(struct nfsrvcache *);
void nfsrvd_delcache(struct nfsrvcache *);
+void nfsrc_trimcache(uint64_t, uint32_t, int);
/* nfs_commonsubs.c */
void newnfs_init(void);
@@ -327,9 +327,6 @@ int nfsd_checkrootexp(struct nfsrv_descript *);
void nfscl_retopts(struct nfsmount *, char *, size_t);
/* nfs_commonport.c */
-int nfsrv_checksockseqnum(struct socket *, tcp_seq);
-int nfsrv_getsockseqnum(struct socket *, tcp_seq *);
-int nfsrv_getsocksndseq(struct socket *, tcp_seq *, tcp_seq *);
int nfsrv_lookupfilename(struct nameidata *, char *, NFSPROC_T *);
void nfsrv_object_create(vnode_t, NFSPROC_T *);
int nfsrv_mallocmget_limit(void);
diff --git a/sys/fs/nfs/nfsrvcache.h b/sys/fs/nfs/nfsrvcache.h
index 5c9dc57..7db3035 100644
--- a/sys/fs/nfs/nfsrvcache.h
+++ b/sys/fs/nfs/nfsrvcache.h
@@ -46,6 +46,7 @@
/* Cache table entry. */
struct nfsrvcache {
LIST_ENTRY(nfsrvcache) rc_hash; /* Hash chain */
+ LIST_ENTRY(nfsrvcache) rc_ahash; /* ACK hash chain */
TAILQ_ENTRY(nfsrvcache) rc_lru; /* UDP lru chain */
u_int32_t rc_xid; /* rpc id number */
time_t rc_timestamp; /* Time done */
@@ -64,6 +65,7 @@ struct nfsrvcache {
int16_t refcnt;
u_int16_t cksum;
time_t cachetime;
+ int acked;
} ot;
} rc_un2;
u_int16_t rc_proc; /* rpc proc number */
@@ -81,6 +83,13 @@ struct nfsrvcache {
#define rc_reqlen rc_un2.ot.len
#define rc_cksum rc_un2.ot.cksum
#define rc_cachetime rc_un2.ot.cachetime
+#define rc_acked rc_un2.ot.acked
+
+/* TCP ACK values */
+#define RC_NO_SEQ 0
+#define RC_NO_ACK 1
+#define RC_ACK 2
+#define RC_NACK 3
/* Return values */
#define RC_DROPIT 0
@@ -95,7 +104,6 @@ struct nfsrvcache {
#define RC_UDP 0x0010
#define RC_INETIPV6 0x0020
#define RC_INPROG 0x0040
-#define RC_TCPSEQ 0x0080
#define RC_NFSV2 0x0100
#define RC_NFSV3 0x0200
#define RC_NFSV4 0x0400
diff --git a/sys/fs/nfsserver/nfs_nfsdcache.c b/sys/fs/nfsserver/nfs_nfsdcache.c
index 04f7231..2c4a654 100644
--- a/sys/fs/nfsserver/nfs_nfsdcache.c
+++ b/sys/fs/nfsserver/nfs_nfsdcache.c
@@ -162,6 +162,7 @@ __FBSDID("$FreeBSD$");
extern struct nfsstats newnfsstats;
extern struct mtx nfsrc_udpmtx;
extern struct nfsrchash_bucket nfsrchash_table[NFSRVCACHE_HASHSIZE];
+extern struct nfsrchash_bucket nfsrcahash_table[NFSRVCACHE_HASHSIZE];
int nfsrc_floodlevel = NFSRVCACHE_FLOODLEVEL, nfsrc_tcpsavedreplies = 0;
#endif /* !APPLEKEXT */
@@ -238,6 +239,7 @@ static int newnfsv2_procid[NFS_V3NPROCS] = {
(&nfsrvudphashtbl[nfsrc_hash(xid)])
#define NFSRCHASH(xid) \
(&nfsrchash_table[nfsrc_hash(xid)].tbl)
+#define NFSRCAHASH(xid) (&nfsrcahash_table[nfsrc_hash(xid)])
#define TRUE 1
#define FALSE 0
#define NFSRVCACHE_CHECKLEN 100
@@ -281,9 +283,6 @@ static void nfsrc_lock(struct nfsrvcache *rp);
static void nfsrc_unlock(struct nfsrvcache *rp);
static void nfsrc_wanted(struct nfsrvcache *rp);
static void nfsrc_freecache(struct nfsrvcache *rp);
-static void nfsrc_trimcache(u_int64_t, struct socket *);
-static int nfsrc_activesocket(struct nfsrvcache *rp, u_int64_t,
- struct socket *);
static int nfsrc_getlenandcksum(mbuf_t m1, u_int16_t *cksum);
static void nfsrc_marksametcpconn(u_int64_t);
@@ -314,6 +313,7 @@ nfsrvd_initcache(void)
for (i = 0; i < NFSRVCACHE_HASHSIZE; i++) {
LIST_INIT(&nfsrvudphashtbl[i]);
LIST_INIT(&nfsrchash_table[i].tbl);
+ LIST_INIT(&nfsrcahash_table[i].tbl);
}
TAILQ_INIT(&nfsrvudplru);
nfsrc_tcpsavedreplies = 0;
@@ -325,10 +325,9 @@ nfsrvd_initcache(void)
/*
* Get a cache entry for this request. Basically just malloc a new one
* and then call nfsrc_getudp() or nfsrc_gettcp() to do the rest.
- * Call nfsrc_trimcache() to clean up the cache before returning.
*/
APPLESTATIC int
-nfsrvd_getcache(struct nfsrv_descript *nd, struct socket *so)
+nfsrvd_getcache(struct nfsrv_descript *nd)
{
struct nfsrvcache *newrp;
int ret;
@@ -356,7 +355,6 @@ nfsrvd_getcache(struct nfsrv_descript *nd, struct socket *so)
} else {
ret = nfsrc_gettcp(nd, newrp);
}
- nfsrc_trimcache(nd->nd_sockref, so);
NFSEXITCODE2(0, nd);
return (ret);
}
@@ -456,7 +454,7 @@ out:
* Update a request cache entry after the rpc has been done
*/
APPLESTATIC struct nfsrvcache *
-nfsrvd_updatecache(struct nfsrv_descript *nd, struct socket *so)
+nfsrvd_updatecache(struct nfsrv_descript *nd)
{
struct nfsrvcache *rp;
struct nfsrvcache *retrp = NULL;
@@ -549,7 +547,6 @@ nfsrvd_updatecache(struct nfsrv_descript *nd, struct socket *so)
}
out:
- nfsrc_trimcache(nd->nd_sockref, so);
NFSEXITCODE2(0, nd);
return (retrp);
}
@@ -575,29 +572,23 @@ nfsrvd_delcache(struct nfsrvcache *rp)
/*
* Called after nfsrvd_updatecache() once the reply is sent, to update
- * the entry for nfsrc_activesocket() and unlock it. The argument is
+ * the entry's sequence number and unlock it. The argument is
* the pointer returned by nfsrvd_updatecache().
*/
APPLESTATIC void
-nfsrvd_sentcache(struct nfsrvcache *rp, struct socket *so, int err)
+nfsrvd_sentcache(struct nfsrvcache *rp, int have_seq, uint32_t seq)
{
- tcp_seq tmp_seq;
- struct mtx *mutex;
+ struct nfsrchash_bucket *hbp;
- mutex = nfsrc_cachemutex(rp);
- if (!(rp->rc_flag & RC_LOCKED))
- panic("nfsrvd_sentcache not locked");
- if (!err) {
- if ((so->so_proto->pr_domain->dom_family != AF_INET &&
- so->so_proto->pr_domain->dom_family != AF_INET6) ||
- so->so_proto->pr_protocol != IPPROTO_TCP)
- panic("nfs sent cache");
- if (nfsrv_getsockseqnum(so, &tmp_seq)) {
- mtx_lock(mutex);
- rp->rc_tcpseq = tmp_seq;
- rp->rc_flag |= RC_TCPSEQ;
- mtx_unlock(mutex);
- }
+ KASSERT(rp->rc_flag & RC_LOCKED, ("nfsrvd_sentcache not locked"));
+ if (have_seq) {
+ hbp = NFSRCAHASH(rp->rc_sockref);
+ mtx_lock(&hbp->mtx);
+ rp->rc_tcpseq = seq;
+ if (rp->rc_acked != RC_NO_ACK)
+ LIST_INSERT_HEAD(&hbp->tbl, rp, rc_ahash);
+ rp->rc_acked = RC_NO_ACK;
+ mtx_unlock(&hbp->mtx);
}
nfsrc_unlock(rp);
}
@@ -790,11 +781,18 @@ nfsrc_wanted(struct nfsrvcache *rp)
static void
nfsrc_freecache(struct nfsrvcache *rp)
{
+ struct nfsrchash_bucket *hbp;
LIST_REMOVE(rp, rc_hash);
if (rp->rc_flag & RC_UDP) {
TAILQ_REMOVE(&nfsrvudplru, rp, rc_lru);
nfsrc_udpcachesize--;
+ } else if (rp->rc_acked != RC_NO_SEQ) {
+ hbp = NFSRCAHASH(rp->rc_sockref);
+ mtx_lock(&hbp->mtx);
+ if (rp->rc_acked == RC_NO_ACK)
+ LIST_REMOVE(rp, rc_ahash);
+ mtx_unlock(&hbp->mtx);
}
nfsrc_wanted(rp);
if (rp->rc_flag & RC_REPMBUF) {
@@ -836,14 +834,32 @@ nfsrvd_cleancache(void)
/*
* The basic rule is to get rid of entries that are expired.
*/
-static void
-nfsrc_trimcache(u_int64_t sockref, struct socket *so)
+void
+nfsrc_trimcache(u_int64_t sockref, uint32_t snd_una, int final)
{
+ struct nfsrchash_bucket *hbp;
struct nfsrvcache *rp, *nextrp;
- int i, j, k, tto, time_histo[HISTSIZE];
+ int force, lastslot, i, j, k, tto, time_histo[HISTSIZE];
time_t thisstamp;
static time_t udp_lasttrim = 0, tcp_lasttrim = 0;
- static int onethread = 0;
+ static int onethread = 0, oneslot = 0;
+
+ if (sockref != 0) {
+ hbp = NFSRCAHASH(sockref);
+ mtx_lock(&hbp->mtx);
+ LIST_FOREACH_SAFE(rp, &hbp->tbl, rc_ahash, nextrp) {
+ if (sockref == rp->rc_sockref) {
+ if (SEQ_GEQ(snd_una, rp->rc_tcpseq)) {
+ rp->rc_acked = RC_ACK;
+ LIST_REMOVE(rp, rc_ahash);
+ } else if (final) {
+ rp->rc_acked = RC_NACK;
+ LIST_REMOVE(rp, rc_ahash);
+ }
+ }
+ }
+ mtx_unlock(&hbp->mtx);
+ }
if (atomic_cmpset_acq_int(&onethread, 0, 1) == 0)
return;
@@ -864,13 +880,28 @@ nfsrc_trimcache(u_int64_t sockref, struct socket *so)
}
if (NFSD_MONOSEC != tcp_lasttrim ||
nfsrc_tcpsavedreplies >= nfsrc_tcphighwater) {
- for (i = 0; i < HISTSIZE; i++)
- time_histo[i] = 0;
+ force = nfsrc_tcphighwater / 4;
+ if (force > 0 &&
+ nfsrc_tcpsavedreplies + force >= nfsrc_tcphighwater) {
+ for (i = 0; i < HISTSIZE; i++)
+ time_histo[i] = 0;
+ i = 0;
+ lastslot = NFSRVCACHE_HASHSIZE - 1;
+ } else {
+ force = 0;
+ if (NFSD_MONOSEC != tcp_lasttrim) {
+ i = 0;
+ lastslot = NFSRVCACHE_HASHSIZE - 1;
+ } else {
+ lastslot = i = oneslot;
+ if (++oneslot >= NFSRVCACHE_HASHSIZE)
+ oneslot = 0;
+ }
+ }
tto = nfsrc_tcptimeout;
- for (i = 0; i < NFSRVCACHE_HASHSIZE; i++) {
+ tcp_lasttrim = NFSD_MONOSEC;
+ for (; i <= lastslot; i++) {
mtx_lock(&nfsrchash_table[i].mtx);
- if (i == 0)
- tcp_lasttrim = NFSD_MONOSEC;
LIST_FOREACH_SAFE(rp, &nfsrchash_table[i].tbl, rc_hash,
nextrp) {
if (!(rp->rc_flag &
@@ -878,12 +909,12 @@ nfsrc_trimcache(u_int64_t sockref, struct socket *so)
&& rp->rc_refcnt == 0) {
if ((rp->rc_flag & RC_REFCNT) ||
tcp_lasttrim > rp->rc_timestamp ||
- nfsrc_activesocket(rp, sockref, so)) {
+ rp->rc_acked == RC_ACK) {
nfsrc_freecache(rp);
continue;
}
- if (nfsrc_tcphighwater == 0)
+ if (force == 0)
continue;
/*
* The timestamps range from roughly the
@@ -903,8 +934,7 @@ nfsrc_trimcache(u_int64_t sockref, struct socket *so)
}
mtx_unlock(&nfsrchash_table[i].mtx);
}
- j = nfsrc_tcphighwater / 5; /* 20% of it */
- if (j > 0 && (nfsrc_tcpsavedreplies + j) > nfsrc_tcphighwater) {
+ if (force) {
/*
* Trim some more with a smaller timeout of as little
* as 20% of nfsrc_tcptimeout to try and get below
@@ -913,7 +943,7 @@ nfsrc_trimcache(u_int64_t sockref, struct socket *so)
k = 0;
for (i = 0; i < (HISTSIZE - 2); i++) {
k += time_histo[i];
- if (k > j)
+ if (k > force)
break;
}
k = tto * (i + 1) / HISTSIZE;
@@ -929,8 +959,7 @@ nfsrc_trimcache(u_int64_t sockref, struct socket *so)
&& rp->rc_refcnt == 0
&& ((rp->rc_flag & RC_REFCNT) ||
thisstamp > rp->rc_timestamp ||
- nfsrc_activesocket(rp, sockref,
- so)))
+ rp->rc_acked == RC_ACK))
nfsrc_freecache(rp);
}
mtx_unlock(&nfsrchash_table[i].mtx);
@@ -975,28 +1004,6 @@ nfsrvd_derefcache(struct nfsrvcache *rp)
}
/*
- * Check to see if the socket is active.
- * Return 1 if the reply has been received/acknowledged by the client,
- * 0 otherwise.
- * XXX - Uses tcp internals.
- */
-static int
-nfsrc_activesocket(struct nfsrvcache *rp, u_int64_t cur_sockref,
- struct socket *cur_so)
-{
- int ret = 0;
-
- if (!(rp->rc_flag & RC_TCPSEQ))
- return (ret);
- /*
- * If the sockref is the same, it is the same TCP connection.
- */
- if (cur_sockref == rp->rc_sockref)
- ret = nfsrv_checksockseqnum(cur_so, rp->rc_tcpseq);
- return (ret);
-}
-
-/*
* Calculate the length of the mbuf list and a checksum on the first up to
* NFSRVCACHE_CHECKLEN bytes.
*/
diff --git a/sys/fs/nfsserver/nfs_nfsdkrpc.c b/sys/fs/nfsserver/nfs_nfsdkrpc.c
index 9185b135..9649093 100644
--- a/sys/fs/nfsserver/nfs_nfsdkrpc.c
+++ b/sys/fs/nfsserver/nfs_nfsdkrpc.c
@@ -97,8 +97,8 @@ static int nfs_maxvers = NFS_VER4;
SYSCTL_INT(_vfs_nfsd, OID_AUTO, server_max_nfsvers, CTLFLAG_RW,
&nfs_maxvers, 0, "The highest version of NFS handled by the server");
-static int nfs_proc(struct nfsrv_descript *, u_int32_t, struct socket *,
- u_int64_t, struct nfsrvcache **);
+static int nfs_proc(struct nfsrv_descript *, u_int32_t, SVCXPRT *xprt,
+ struct nfsrvcache **);
extern u_long sb_max_adj;
extern int newnfs_numnfsd;
@@ -251,8 +251,7 @@ nfssvc_program(struct svc_req *rqst, SVCXPRT *xprt)
}
}
- cacherep = nfs_proc(&nd, rqst->rq_xid, xprt->xp_socket,
- xprt->xp_sockref, &rp);
+ cacherep = nfs_proc(&nd, rqst->rq_xid, xprt, &rp);
NFSLOCKV4ROOTMUTEX();
nfsv4_relref(&nfsd_suspend_lock);
NFSUNLOCKV4ROOTMUTEX();
@@ -287,8 +286,10 @@ nfssvc_program(struct svc_req *rqst, SVCXPRT *xprt)
} else if (!svc_sendreply_mbuf(rqst, nd.nd_mreq)) {
svcerr_systemerr(rqst);
}
- if (rp != NULL)
- nfsrvd_sentcache(rp, xprt->xp_socket, 0);
+ if (rp != NULL) {
+ nfsrvd_sentcache(rp, (rqst->rq_reply_seq != 0 ||
+ SVC_ACK(xprt, NULL)), rqst->rq_reply_seq);
+ }
svc_freereq(rqst);
out:
@@ -300,11 +301,12 @@ out:
* Return the appropriate cache response.
*/
static int
-nfs_proc(struct nfsrv_descript *nd, u_int32_t xid, struct socket *so,
- u_int64_t sockref, struct nfsrvcache **rpp)
+nfs_proc(struct nfsrv_descript *nd, u_int32_t xid, SVCXPRT *xprt,
+ struct nfsrvcache **rpp)
{
struct thread *td = curthread;
int cacherep = RC_DOIT, isdgram;
+ uint32_t ack;
*rpp = NULL;
if (nd->nd_nam2 == NULL) {
@@ -336,8 +338,11 @@ nfs_proc(struct nfsrv_descript *nd, u_int32_t xid, struct socket *so,
nd->nd_flag |= ND_SAMETCPCONN;
nd->nd_retxid = xid;
nd->nd_tcpconntime = NFSD_MONOSEC;
- nd->nd_sockref = sockref;
- cacherep = nfsrvd_getcache(nd, so);
+ nd->nd_sockref = xprt->xp_sockref;
+ cacherep = nfsrvd_getcache(nd);
+ ack = 0;
+ SVC_ACK(xprt, &ack);
+ nfsrc_trimcache(xprt->xp_sockref, ack, 0);
}
/*
@@ -352,13 +357,23 @@ nfs_proc(struct nfsrv_descript *nd, u_int32_t xid, struct socket *so,
cacherep = RC_DROPIT;
else
cacherep = RC_REPLY;
- *rpp = nfsrvd_updatecache(nd, so);
+ *rpp = nfsrvd_updatecache(nd);
}
NFSEXITCODE2(0, nd);
return (cacherep);
}
+static void
+nfssvc_loss(SVCXPRT *xprt)
+{
+ uint32_t ack;
+
+ ack = 0;
+ SVC_ACK(xprt, &ack);
+ nfsrc_trimcache(xprt->xp_sockref, ack, 1);
+}
+
/*
* Adds a socket to the list for servicing by nfsds.
*/
@@ -399,6 +414,8 @@ nfsrvd_addsock(struct file *fp)
if (nfs_maxvers >= NFS_VER4)
svc_reg(xprt, NFS_PROG, NFS_VER4, nfssvc_program,
NULL);
+ if (so->so_type == SOCK_STREAM)
+ svc_loss_reg(xprt, nfssvc_loss);
SVC_RELEASE(xprt);
}
diff --git a/sys/fs/nfsserver/nfs_nfsdport.c b/sys/fs/nfsserver/nfs_nfsdport.c
index ef26f64..21d302f 100644
--- a/sys/fs/nfsserver/nfs_nfsdport.c
+++ b/sys/fs/nfsserver/nfs_nfsdport.c
@@ -61,6 +61,7 @@ extern struct nfsv4lock nfsd_suspend_lock;
struct vfsoptlist nfsv4root_opt, nfsv4root_newopt;
NFSDLOCKMUTEX;
struct nfsrchash_bucket nfsrchash_table[NFSRVCACHE_HASHSIZE];
+struct nfsrchash_bucket nfsrcahash_table[NFSRVCACHE_HASHSIZE];
struct mtx nfsrc_udpmtx;
struct mtx nfs_v4root_mutex;
struct nfsrvfh nfs_rootfh, nfs_pubfh;
@@ -2881,40 +2882,6 @@ out:
}
/*
- * Get the tcp socket sequence numbers we need.
- * (Maybe this should be moved to the tcp sources?)
- */
-int
-nfsrv_getsocksndseq(struct socket *so, tcp_seq *maxp, tcp_seq *unap)
-{
- struct inpcb *inp;
- struct tcpcb *tp;
- int error = 0;
-
- inp = sotoinpcb(so);
- KASSERT(inp != NULL, ("nfsrv_getsocksndseq: inp == NULL"));
- INP_RLOCK(inp);
- if (inp->inp_flags & (INP_TIMEWAIT | INP_DROPPED)) {
- INP_RUNLOCK(inp);
- error = EPIPE;
- goto out;
- }
- tp = intotcpcb(inp);
- if (tp->t_state != TCPS_ESTABLISHED) {
- INP_RUNLOCK(inp);
- error = EPIPE;
- goto out;
- }
- *maxp = tp->snd_max;
- *unap = tp->snd_una;
- INP_RUNLOCK(inp);
-
-out:
- NFSEXITCODE(error);
- return (error);
-}
-
-/*
* This function needs to test to see if the system is near its limit
* for memory allocation via malloc() or mget() and return True iff
* either of these resources are near their limit.
@@ -3340,6 +3307,11 @@ nfsd_modevent(module_t mod, int type, void *data)
i);
mtx_init(&nfsrchash_table[i].mtx,
nfsrchash_table[i].lock_name, NULL, MTX_DEF);
+ snprintf(nfsrcahash_table[i].lock_name,
+ sizeof(nfsrcahash_table[i].lock_name), "nfsrc_tcpa%d",
+ i);
+ mtx_init(&nfsrcahash_table[i].mtx,
+ nfsrcahash_table[i].lock_name, NULL, MTX_DEF);
}
mtx_init(&nfsrc_udpmtx, "nfs_udpcache_mutex", NULL, MTX_DEF);
mtx_init(&nfs_v4root_mutex, "nfs_v4root_mutex", NULL, MTX_DEF);
@@ -3385,8 +3357,10 @@ nfsd_modevent(module_t mod, int type, void *data)
svcpool_destroy(nfsrvd_pool);
/* and get rid of the locks */
- for (i = 0; i < NFSRVCACHE_HASHSIZE; i++)
+ for (i = 0; i < NFSRVCACHE_HASHSIZE; i++) {
mtx_destroy(&nfsrchash_table[i].mtx);
+ mtx_destroy(&nfsrcahash_table[i].mtx);
+ }
mtx_destroy(&nfsrc_udpmtx);
mtx_destroy(&nfs_v4root_mutex);
mtx_destroy(&nfsv4root_mnt.mnt_mtx);
diff --git a/sys/fs/nfsserver/nfs_nfsdsubs.c b/sys/fs/nfsserver/nfs_nfsdsubs.c
index 5c8ecec..404ad87 100644
--- a/sys/fs/nfsserver/nfs_nfsdsubs.c
+++ b/sys/fs/nfsserver/nfs_nfsdsubs.c
@@ -1987,47 +1987,6 @@ nfsmout:
return (error);
}
-/*
- * Check the tcp socket sequence number has been acknowledged.
- */
-int
-nfsrv_checksockseqnum(struct socket *so, tcp_seq tcpseqval)
-{
- tcp_seq maxseq, unaseq;
- int error, ret;
-
- error = nfsrv_getsocksndseq(so, &maxseq, &unaseq);
- if (error)
- return (0);
- ret = SEQ_GEQ(unaseq, tcpseqval);
- return (ret);
-}
-
-/*
- * Get the tcp sequence number to be acknowledged.
- */
-int
-nfsrv_getsockseqnum(struct socket *so, tcp_seq *tcpseqp)
-{
- tcp_seq maxseq, unaseq;
- u_int sbcc;
- int error;
-
- sbcc = so->so_snd.sb_cc;
- error = nfsrv_getsocksndseq(so, &maxseq, &unaseq);
- if (error)
- return (0);
- /*
- * Set the seq# to a value that will
- * be at least the end of the reply.
- * When this sequence# is acknowledged
- * by the client, the client has received
- * the reply.
- */
- *tcpseqp = sbcc + maxseq;
- return (1);
-}
-
void
nfsd_init(void)
{
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index b42bafb..bc82cd4 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$");
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
+#include <sys/sx.h>
#include <sys/ucred.h>
#include <rpc/rpc.h>
@@ -93,6 +94,7 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
TAILQ_INIT(&pool->sp_xlist);
TAILQ_INIT(&pool->sp_active);
TAILQ_INIT(&pool->sp_callouts);
+ TAILQ_INIT(&pool->sp_lcallouts);
LIST_INIT(&pool->sp_threads);
LIST_INIT(&pool->sp_idlethreads);
pool->sp_minthreads = 1;
@@ -158,6 +160,7 @@ svcpool_destroy(SVCPOOL *pool)
{
SVCXPRT *xprt, *nxprt;
struct svc_callout *s;
+ struct svc_loss_callout *sl;
struct svcxprt_list cleanup;
TAILQ_INIT(&cleanup);
@@ -169,12 +172,16 @@ svcpool_destroy(SVCPOOL *pool)
TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
}
- while (TAILQ_FIRST(&pool->sp_callouts)) {
- s = TAILQ_FIRST(&pool->sp_callouts);
+ while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
mtx_unlock(&pool->sp_lock);
svc_unreg(pool, s->sc_prog, s->sc_vers);
mtx_lock(&pool->sp_lock);
}
+ while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
+ mtx_unlock(&pool->sp_lock);
+ svc_loss_unreg(pool, sl->slc_dispatch);
+ mtx_lock(&pool->sp_lock);
+ }
mtx_unlock(&pool->sp_lock);
TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
@@ -511,6 +518,55 @@ svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
mtx_unlock(&pool->sp_lock);
}
+/*
+ * Add a service connection loss program to the callout list.
+ * The dispatch routine will be called when some port in ths pool die.
+ */
+bool_t
+svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
+{
+ SVCPOOL *pool = xprt->xp_pool;
+ struct svc_loss_callout *s;
+
+ mtx_lock(&pool->sp_lock);
+ TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
+ if (s->slc_dispatch == dispatch)
+ break;
+ }
+ if (s != NULL) {
+ mtx_unlock(&pool->sp_lock);
+ return (TRUE);
+ }
+ s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
+ if (s == NULL) {
+ mtx_unlock(&pool->sp_lock);
+ return (FALSE);
+ }
+ s->slc_dispatch = dispatch;
+ TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
+ mtx_unlock(&pool->sp_lock);
+ return (TRUE);
+}
+
+/*
+ * Remove a service connection loss program from the callout list.
+ */
+void
+svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
+{
+ struct svc_loss_callout *s;
+
+ mtx_lock(&pool->sp_lock);
+ TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
+ if (s->slc_dispatch == dispatch) {
+ TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
+ free(s, M_RPC);
+ break;
+ }
+ }
+ mtx_unlock(&pool->sp_lock);
+}
+
/* ********************** CALLOUT list related stuff ************* */
/*
@@ -554,7 +610,7 @@ svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
return (FALSE);
- ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
+ ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
if (rqstp->rq_addr) {
free(rqstp->rq_addr, M_SONAME);
rqstp->rq_addr = NULL;
@@ -803,6 +859,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
struct svc_req *r;
struct rpc_msg msg;
struct mbuf *args;
+ struct svc_loss_callout *s;
enum xprt_stat stat;
/* now receive msgs from xprtprt (support batch calls) */
@@ -831,7 +888,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
break;
case RS_DONE:
SVC_REPLY(xprt, &repmsg, r->rq_addr,
- repbody);
+ repbody, &r->rq_reply_seq);
if (r->rq_addr) {
free(r->rq_addr, M_SONAME);
r->rq_addr = NULL;
@@ -881,6 +938,8 @@ call_done:
r = NULL;
}
if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
+ TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
+ (*s->slc_dispatch)(xprt);
xprt_unregister(xprt);
}
diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h
index 7d9428e..a7f5f51 100644
--- a/sys/rpc/svc.h
+++ b/sys/rpc/svc.h
@@ -103,9 +103,11 @@ struct xp_ops {
struct sockaddr **, struct mbuf **);
/* get transport status */
enum xprt_stat (*xp_stat)(struct __rpc_svcxprt *);
+ /* get transport acknowledge sequence */
+ bool_t (*xp_ack)(struct __rpc_svcxprt *, uint32_t *);
/* send reply */
bool_t (*xp_reply)(struct __rpc_svcxprt *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *);
/* destroy this struct */
void (*xp_destroy)(struct __rpc_svcxprt *);
/* catch-all function */
@@ -166,6 +168,8 @@ typedef struct __rpc_svcxprt {
time_t xp_lastactive; /* time of last RPC */
u_int64_t xp_sockref; /* set by nfsv4 to identify socket */
int xp_upcallset; /* socket upcall is set up */
+ uint32_t xp_snd_cnt; /* # of bytes to send to socket */
+ uint32_t xp_snt_cnt; /* # of bytes sent to socket */
#else
int xp_fd;
u_short xp_port; /* associated port number */
@@ -230,6 +234,17 @@ struct svc_callout {
};
TAILQ_HEAD(svc_callout_list, svc_callout);
+/*
+ * The services connection loss list
+ * The dispatch routine takes request structs and runs the
+ * apropriate procedure.
+ */
+struct svc_loss_callout {
+ TAILQ_ENTRY(svc_loss_callout) slc_link;
+ void (*slc_dispatch)(SVCXPRT *);
+};
+TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout);
+
struct __rpc_svcthread;
/*
@@ -253,6 +268,7 @@ struct svc_req {
void *rq_p1; /* application workspace */
int rq_p2; /* application workspace */
uint64_t rq_p3; /* application workspace */
+ uint32_t rq_reply_seq; /* reply socket sequence # */
char rq_credarea[3*MAX_AUTH_BYTES];
};
STAILQ_HEAD(svc_reqlist, svc_req);
@@ -311,13 +327,14 @@ enum svcpool_state {
typedef SVCTHREAD *pool_assign_fn(SVCTHREAD *, struct svc_req *);
typedef void pool_done_fn(SVCTHREAD *, struct svc_req *);
typedef struct __rpc_svcpool {
- struct mtx sp_lock; /* protect the transport lists */
+ struct mtx_padalign sp_lock; /* protect the transport lists */
const char *sp_name; /* pool name (e.g. "nfsd", "NLM" */
enum svcpool_state sp_state; /* current pool state */
struct proc *sp_proc; /* process which is in svc_run */
struct svcxprt_list sp_xlist; /* all transports in the pool */
struct svcxprt_list sp_active; /* transports needing service */
struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */
+ struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */
struct svcthread_list sp_threads; /* service threads */
struct svcthread_list sp_idlethreads; /* idle service threads */
int sp_minthreads; /* minimum service thread count */
@@ -393,8 +410,12 @@ struct svc_req {
#define SVC_STAT(xprt) \
(*(xprt)->xp_ops->xp_stat)(xprt)
-#define SVC_REPLY(xprt, msg, addr, m) \
- (*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m))
+#define SVC_ACK(xprt, ack) \
+ ((xprt)->xp_ops->xp_ack == NULL ? FALSE : \
+ ((ack) == NULL ? TRUE : (*(xprt)->xp_ops->xp_ack)((xprt), (ack))))
+
+#define SVC_REPLY(xprt, msg, addr, m, seq) \
+ (*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m), (seq))
#define SVC_DESTROY(xprt) \
(*(xprt)->xp_ops->xp_destroy)(xprt)
@@ -495,6 +516,32 @@ extern void svc_unreg(const rpcprog_t, const rpcvers_t);
#endif
__END_DECLS
+#ifdef _KERNEL
+/*
+ * Service connection loss registration
+ *
+ * svc_loss_reg(xprt, dispatch)
+ * const SVCXPRT *xprt;
+ * const void (*dispatch)();
+ */
+
+__BEGIN_DECLS
+extern bool_t svc_loss_reg(SVCXPRT *, void (*)(SVCXPRT *));
+__END_DECLS
+
+/*
+ * Service connection loss un-registration
+ *
+ * svc_loss_unreg(xprt, dispatch)
+ * const SVCXPRT *xprt;
+ * const void (*dispatch)();
+ */
+
+__BEGIN_DECLS
+extern void svc_loss_unreg(SVCPOOL *, void (*)(SVCXPRT *));
+__END_DECLS
+#endif
+
/*
* Transport registration.
*
diff --git a/sys/rpc/svc_dg.c b/sys/rpc/svc_dg.c
index 79f8429..1c530bd 100644
--- a/sys/rpc/svc_dg.c
+++ b/sys/rpc/svc_dg.c
@@ -66,7 +66,7 @@ static enum xprt_stat svc_dg_stat(SVCXPRT *);
static bool_t svc_dg_recv(SVCXPRT *, struct rpc_msg *,
struct sockaddr **, struct mbuf **);
static bool_t svc_dg_reply(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *);
static void svc_dg_destroy(SVCXPRT *);
static bool_t svc_dg_control(SVCXPRT *, const u_int, void *);
static int svc_dg_soupcall(struct socket *so, void *arg, int waitflag);
@@ -230,7 +230,7 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_msg *msg,
static bool_t
svc_dg_reply(SVCXPRT *xprt, struct rpc_msg *msg,
- struct sockaddr *addr, struct mbuf *m)
+ struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
XDR xdrs;
struct mbuf *mrep;
diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c
index 3140915..5fe6488 100644
--- a/sys/rpc/svc_vc.c
+++ b/sys/rpc/svc_vc.c
@@ -76,10 +76,11 @@ static void svc_vc_rendezvous_destroy(SVCXPRT *);
static bool_t svc_vc_null(void);
static void svc_vc_destroy(SVCXPRT *);
static enum xprt_stat svc_vc_stat(SVCXPRT *);
+static bool_t svc_vc_ack(SVCXPRT *, uint32_t *);
static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
struct sockaddr **, struct mbuf **);
static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *seq);
static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
void *in);
@@ -88,7 +89,7 @@ static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
struct sockaddr **, struct mbuf **);
static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *);
static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
void *in);
static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
@@ -100,7 +101,7 @@ static struct xp_ops svc_vc_rendezvous_ops = {
.xp_recv = svc_vc_rendezvous_recv,
.xp_stat = svc_vc_rendezvous_stat,
.xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *))svc_vc_null,
+ struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null,
.xp_destroy = svc_vc_rendezvous_destroy,
.xp_control = svc_vc_rendezvous_control
};
@@ -108,6 +109,7 @@ static struct xp_ops svc_vc_rendezvous_ops = {
static struct xp_ops svc_vc_ops = {
.xp_recv = svc_vc_recv,
.xp_stat = svc_vc_stat,
+ .xp_ack = svc_vc_ack,
.xp_reply = svc_vc_reply,
.xp_destroy = svc_vc_destroy,
.xp_control = svc_vc_control
@@ -184,8 +186,10 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
return (xprt);
cleanup_svc_vc_create:
- if (xprt)
+ if (xprt) {
+ sx_destroy(&xprt->xp_lock);
svc_xprt_free(xprt);
+ }
return (NULL);
}
@@ -270,7 +274,8 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
return (xprt);
cleanup_svc_vc_create:
if (xprt) {
- mem_free(xprt, sizeof(*xprt));
+ sx_destroy(&xprt->xp_lock);
+ svc_xprt_free(xprt);
}
if (cd)
mem_free(cd, sizeof(*cd));
@@ -451,7 +456,6 @@ svc_vc_destroy_common(SVCXPRT *xprt)
}
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
- sx_destroy(&xprt->xp_lock);
if (xprt->xp_socket)
(void)soclose(xprt->xp_socket);
@@ -537,6 +541,15 @@ svc_vc_stat(SVCXPRT *xprt)
return (XPRT_IDLE);
}
+static bool_t
+svc_vc_ack(SVCXPRT *xprt, uint32_t *ack)
+{
+
+ *ack = atomic_load_acq_32(&xprt->xp_snt_cnt);
+ *ack -= xprt->xp_socket->so_snd.sb_cc;
+ return (TRUE);
+}
+
static enum xprt_stat
svc_vc_backchannel_stat(SVCXPRT *xprt)
{
@@ -785,12 +798,12 @@ svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
static bool_t
svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
- struct sockaddr *addr, struct mbuf *m)
+ struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
XDR xdrs;
struct mbuf *mrep;
bool_t stat = TRUE;
- int error;
+ int error, len;
/*
* Leave space for record mark.
@@ -817,14 +830,19 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
* Prepend a record marker containing the reply length.
*/
M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
+ len = mrep->m_pkthdr.len;
*mtod(mrep, uint32_t *) =
- htonl(0x80000000 | (mrep->m_pkthdr.len
- - sizeof(uint32_t)));
+ htonl(0x80000000 | (len - sizeof(uint32_t)));
+ atomic_add_acq_32(&xprt->xp_snd_cnt, len);
error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
0, curthread);
if (!error) {
+ atomic_add_rel_32(&xprt->xp_snt_cnt, len);
+ if (seq)
+ *seq = xprt->xp_snd_cnt;
stat = TRUE;
- }
+ } else
+ atomic_subtract_32(&xprt->xp_snd_cnt, len);
} else {
m_freem(mrep);
}
@@ -837,7 +855,7 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
static bool_t
svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
- struct sockaddr *addr, struct mbuf *m)
+ struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
struct ct_data *ct;
XDR xdrs;
OpenPOWER on IntegriCloud