path: root/sys/rpc
author    mav <mav@FreeBSD.org>    2014-01-22 23:55:25 +0000
committer mav <mav@FreeBSD.org>    2014-01-22 23:55:25 +0000
commit    31820682b51229f8d751da6265f356ae29aa4f35 (patch)
tree      072242ab1e702c0ab5321cf6d0b8c69e85b20fba /sys/rpc
parent    9b0c44b0a6b411d66547ee6ce611289ef831a73f (diff)
MFC r260229, r260258, r260367, r260390, r260459, r260648:
Rework NFS Duplicate Request Cache cleanup logic.
 - Introduce an additional hash to group requests by the hash of their
   sockref.  This allows TCP acknowledgements to be processed without
   looping through the whole cache, and as a result allows doing it
   every time.
 - Introduce additional callbacks to notify the application layer about
   socket disconnection.  Without this, the last few requests processed
   just before a socket disconnected never saw their ACKs and stayed
   stuck in the cache for many hours.
 - Implement a transport-specific method for tracking reply
   acknowledgements.  The new implementation does not cross multiple
   stack layers to get the data and does not have the race conditions
   that previously left some requests stuck in the cache.  This could be
   done more efficiently at the sockbuf layer, but that would break some
   KBIs, and I don't know of other consumers for it aside from NFS.
 - Instead of traversing the whole DRC twice per request, run cleaning
   only once per request, and except under some conditions traverse only
   a single hash slot at a time.
Together this limits NFS DRC growth to situations of real connectivity
problems.  If the network is working well, so that all replies are
acknowledged, the cache remains almost empty even after hours of heavy
load.  Without this change, on the same test the cache was growing to
many thousands of requests even with a perfectly working local network.
As another result, this reduces the CPU time spent on DRC handling
during the SPEC NFS benchmark from about 10% to 0.5%.

Sponsored by:	iXsystems, Inc.
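The NFS server's actual consumers of these hooks live outside sys/rpc and are
not part of this diff.  As a rough sketch of the new svc_loss_reg() /
svc_loss_unreg() API only, a kernel RPC service could register a
connection-loss handler as below; svc_drc_prune_xprt() and the attach/detach
function names are assumptions for illustration, not code from this commit.

    /*
     * Sketch: register a handler that is called when a transport in the
     * pool dies, e.g. to drop cached replies keyed by xprt->xp_sockref.
     */
    #include <sys/types.h>
    #include <rpc/rpc.h>
    #include <rpc/svc.h>

    static void
    svc_drc_prune_xprt(SVCXPRT *xprt)
    {

            /* Hypothetical: discard cached replies tied to this transport. */
    }

    static bool_t
    my_service_attach(SVCXPRT *xprt)
    {

            /* Registration is per pool and idempotent for a given dispatch. */
            if (!svc_loss_reg(xprt, svc_drc_prune_xprt))
                    return (FALSE);         /* M_NOWAIT allocation failed */
            return (TRUE);
    }

    static void
    my_service_detach(SVCPOOL *pool)
    {

            svc_loss_unreg(pool, svc_drc_prune_xprt);
    }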
Diffstat (limited to 'sys/rpc')
-rw-r--r--   sys/rpc/svc.c      67
-rw-r--r--   sys/rpc/svc.h      55
-rw-r--r--   sys/rpc/svc_dg.c    4
-rw-r--r--   sys/rpc/svc_vc.c   42
4 files changed, 146 insertions, 22 deletions
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index b42bafb..bc82cd4 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$");
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
+#include <sys/sx.h>
#include <sys/ucred.h>
#include <rpc/rpc.h>
@@ -93,6 +94,7 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
TAILQ_INIT(&pool->sp_xlist);
TAILQ_INIT(&pool->sp_active);
TAILQ_INIT(&pool->sp_callouts);
+ TAILQ_INIT(&pool->sp_lcallouts);
LIST_INIT(&pool->sp_threads);
LIST_INIT(&pool->sp_idlethreads);
pool->sp_minthreads = 1;
@@ -158,6 +160,7 @@ svcpool_destroy(SVCPOOL *pool)
{
SVCXPRT *xprt, *nxprt;
struct svc_callout *s;
+ struct svc_loss_callout *sl;
struct svcxprt_list cleanup;
TAILQ_INIT(&cleanup);
@@ -169,12 +172,16 @@ svcpool_destroy(SVCPOOL *pool)
TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
}
- while (TAILQ_FIRST(&pool->sp_callouts)) {
- s = TAILQ_FIRST(&pool->sp_callouts);
+ while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
mtx_unlock(&pool->sp_lock);
svc_unreg(pool, s->sc_prog, s->sc_vers);
mtx_lock(&pool->sp_lock);
}
+ while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
+ mtx_unlock(&pool->sp_lock);
+ svc_loss_unreg(pool, sl->slc_dispatch);
+ mtx_lock(&pool->sp_lock);
+ }
mtx_unlock(&pool->sp_lock);
TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
@@ -511,6 +518,55 @@ svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
mtx_unlock(&pool->sp_lock);
}
+/*
+ * Add a service connection loss program to the callout list.
+ * The dispatch routine will be called when some port in this pool dies.
+ */
+bool_t
+svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
+{
+ SVCPOOL *pool = xprt->xp_pool;
+ struct svc_loss_callout *s;
+
+ mtx_lock(&pool->sp_lock);
+ TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
+ if (s->slc_dispatch == dispatch)
+ break;
+ }
+ if (s != NULL) {
+ mtx_unlock(&pool->sp_lock);
+ return (TRUE);
+ }
+ s = malloc(sizeof (struct svc_loss_callout), M_RPC, M_NOWAIT);
+ if (s == NULL) {
+ mtx_unlock(&pool->sp_lock);
+ return (FALSE);
+ }
+ s->slc_dispatch = dispatch;
+ TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
+ mtx_unlock(&pool->sp_lock);
+ return (TRUE);
+}
+
+/*
+ * Remove a service connection loss program from the callout list.
+ */
+void
+svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
+{
+ struct svc_loss_callout *s;
+
+ mtx_lock(&pool->sp_lock);
+ TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
+ if (s->slc_dispatch == dispatch) {
+ TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
+ free(s, M_RPC);
+ break;
+ }
+ }
+ mtx_unlock(&pool->sp_lock);
+}
+
/* ********************** CALLOUT list related stuff ************* */
/*
@@ -554,7 +610,7 @@ svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
return (FALSE);
- ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
+ ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
if (rqstp->rq_addr) {
free(rqstp->rq_addr, M_SONAME);
rqstp->rq_addr = NULL;
@@ -803,6 +859,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
struct svc_req *r;
struct rpc_msg msg;
struct mbuf *args;
+ struct svc_loss_callout *s;
enum xprt_stat stat;
/* now receive msgs from xprtprt (support batch calls) */
@@ -831,7 +888,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
break;
case RS_DONE:
SVC_REPLY(xprt, &repmsg, r->rq_addr,
- repbody);
+ repbody, &r->rq_reply_seq);
if (r->rq_addr) {
free(r->rq_addr, M_SONAME);
r->rq_addr = NULL;
@@ -881,6 +938,8 @@ call_done:
r = NULL;
}
if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
+ TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
+ (*s->slc_dispatch)(xprt);
xprt_unregister(xprt);
}
diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h
index 7d9428e..a7f5f51 100644
--- a/sys/rpc/svc.h
+++ b/sys/rpc/svc.h
@@ -103,9 +103,11 @@ struct xp_ops {
struct sockaddr **, struct mbuf **);
/* get transport status */
enum xprt_stat (*xp_stat)(struct __rpc_svcxprt *);
+ /* get transport acknowledge sequence */
+ bool_t (*xp_ack)(struct __rpc_svcxprt *, uint32_t *);
/* send reply */
bool_t (*xp_reply)(struct __rpc_svcxprt *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *);
/* destroy this struct */
void (*xp_destroy)(struct __rpc_svcxprt *);
/* catch-all function */
@@ -166,6 +168,8 @@ typedef struct __rpc_svcxprt {
time_t xp_lastactive; /* time of last RPC */
u_int64_t xp_sockref; /* set by nfsv4 to identify socket */
int xp_upcallset; /* socket upcall is set up */
+ uint32_t xp_snd_cnt; /* # of bytes to send to socket */
+ uint32_t xp_snt_cnt; /* # of bytes sent to socket */
#else
int xp_fd;
u_short xp_port; /* associated port number */
@@ -230,6 +234,17 @@ struct svc_callout {
};
TAILQ_HEAD(svc_callout_list, svc_callout);
+/*
+ * The service connection loss callout list.
+ * The dispatch routine is called for each registered callout when a
+ * transport in the pool is disconnected.
+ */
+struct svc_loss_callout {
+ TAILQ_ENTRY(svc_loss_callout) slc_link;
+ void (*slc_dispatch)(SVCXPRT *);
+};
+TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout);
+
struct __rpc_svcthread;
/*
@@ -253,6 +268,7 @@ struct svc_req {
void *rq_p1; /* application workspace */
int rq_p2; /* application workspace */
uint64_t rq_p3; /* application workspace */
+ uint32_t rq_reply_seq; /* reply socket sequence # */
char rq_credarea[3*MAX_AUTH_BYTES];
};
STAILQ_HEAD(svc_reqlist, svc_req);
@@ -311,13 +327,14 @@ enum svcpool_state {
typedef SVCTHREAD *pool_assign_fn(SVCTHREAD *, struct svc_req *);
typedef void pool_done_fn(SVCTHREAD *, struct svc_req *);
typedef struct __rpc_svcpool {
- struct mtx sp_lock; /* protect the transport lists */
+ struct mtx_padalign sp_lock; /* protect the transport lists */
const char *sp_name; /* pool name (e.g. "nfsd", "NLM" */
enum svcpool_state sp_state; /* current pool state */
struct proc *sp_proc; /* process which is in svc_run */
struct svcxprt_list sp_xlist; /* all transports in the pool */
struct svcxprt_list sp_active; /* transports needing service */
struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */
+ struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */
struct svcthread_list sp_threads; /* service threads */
struct svcthread_list sp_idlethreads; /* idle service threads */
int sp_minthreads; /* minimum service thread count */
@@ -393,8 +410,12 @@ struct svc_req {
#define SVC_STAT(xprt) \
(*(xprt)->xp_ops->xp_stat)(xprt)
-#define SVC_REPLY(xprt, msg, addr, m) \
- (*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m))
+#define SVC_ACK(xprt, ack) \
+ ((xprt)->xp_ops->xp_ack == NULL ? FALSE : \
+ ((ack) == NULL ? TRUE : (*(xprt)->xp_ops->xp_ack)((xprt), (ack))))
+
+#define SVC_REPLY(xprt, msg, addr, m, seq) \
+ (*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m), (seq))
#define SVC_DESTROY(xprt) \
(*(xprt)->xp_ops->xp_destroy)(xprt)
@@ -495,6 +516,32 @@ extern void svc_unreg(const rpcprog_t, const rpcvers_t);
#endif
__END_DECLS
+#ifdef _KERNEL
+/*
+ * Service connection loss registration
+ *
+ * svc_loss_reg(xprt, dispatch)
+ * const SVCXPRT *xprt;
+ * const void (*dispatch)();
+ */
+
+__BEGIN_DECLS
+extern bool_t svc_loss_reg(SVCXPRT *, void (*)(SVCXPRT *));
+__END_DECLS
+
+/*
+ * Service connection loss un-registration
+ *
+ * svc_loss_unreg(xprt, dispatch)
+ * const SVCXPRT *xprt;
+ * const void (*dispatch)();
+ */
+
+__BEGIN_DECLS
+extern void svc_loss_unreg(SVCPOOL *, void (*)(SVCXPRT *));
+__END_DECLS
+#endif
+
/*
* Transport registration.
*
diff --git a/sys/rpc/svc_dg.c b/sys/rpc/svc_dg.c
index 79f8429..1c530bd 100644
--- a/sys/rpc/svc_dg.c
+++ b/sys/rpc/svc_dg.c
@@ -66,7 +66,7 @@ static enum xprt_stat svc_dg_stat(SVCXPRT *);
static bool_t svc_dg_recv(SVCXPRT *, struct rpc_msg *,
struct sockaddr **, struct mbuf **);
static bool_t svc_dg_reply(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *);
static void svc_dg_destroy(SVCXPRT *);
static bool_t svc_dg_control(SVCXPRT *, const u_int, void *);
static int svc_dg_soupcall(struct socket *so, void *arg, int waitflag);
@@ -230,7 +230,7 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_msg *msg,
static bool_t
svc_dg_reply(SVCXPRT *xprt, struct rpc_msg *msg,
- struct sockaddr *addr, struct mbuf *m)
+ struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
XDR xdrs;
struct mbuf *mrep;
diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c
index 3140915..5fe6488 100644
--- a/sys/rpc/svc_vc.c
+++ b/sys/rpc/svc_vc.c
@@ -76,10 +76,11 @@ static void svc_vc_rendezvous_destroy(SVCXPRT *);
static bool_t svc_vc_null(void);
static void svc_vc_destroy(SVCXPRT *);
static enum xprt_stat svc_vc_stat(SVCXPRT *);
+static bool_t svc_vc_ack(SVCXPRT *, uint32_t *);
static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
struct sockaddr **, struct mbuf **);
static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *seq);
static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
void *in);
@@ -88,7 +89,7 @@ static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
struct sockaddr **, struct mbuf **);
static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *);
static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
void *in);
static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
@@ -100,7 +101,7 @@ static struct xp_ops svc_vc_rendezvous_ops = {
.xp_recv = svc_vc_rendezvous_recv,
.xp_stat = svc_vc_rendezvous_stat,
.xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *))svc_vc_null,
+ struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null,
.xp_destroy = svc_vc_rendezvous_destroy,
.xp_control = svc_vc_rendezvous_control
};
@@ -108,6 +109,7 @@ static struct xp_ops svc_vc_rendezvous_ops = {
static struct xp_ops svc_vc_ops = {
.xp_recv = svc_vc_recv,
.xp_stat = svc_vc_stat,
+ .xp_ack = svc_vc_ack,
.xp_reply = svc_vc_reply,
.xp_destroy = svc_vc_destroy,
.xp_control = svc_vc_control
@@ -184,8 +186,10 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
return (xprt);
cleanup_svc_vc_create:
- if (xprt)
+ if (xprt) {
+ sx_destroy(&xprt->xp_lock);
svc_xprt_free(xprt);
+ }
return (NULL);
}
@@ -270,7 +274,8 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
return (xprt);
cleanup_svc_vc_create:
if (xprt) {
- mem_free(xprt, sizeof(*xprt));
+ sx_destroy(&xprt->xp_lock);
+ svc_xprt_free(xprt);
}
if (cd)
mem_free(cd, sizeof(*cd));
@@ -451,7 +456,6 @@ svc_vc_destroy_common(SVCXPRT *xprt)
}
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
- sx_destroy(&xprt->xp_lock);
if (xprt->xp_socket)
(void)soclose(xprt->xp_socket);
@@ -537,6 +541,15 @@ svc_vc_stat(SVCXPRT *xprt)
return (XPRT_IDLE);
}
+static bool_t
+svc_vc_ack(SVCXPRT *xprt, uint32_t *ack)
+{
+
+ *ack = atomic_load_acq_32(&xprt->xp_snt_cnt);
+ *ack -= xprt->xp_socket->so_snd.sb_cc;
+ return (TRUE);
+}
+
static enum xprt_stat
svc_vc_backchannel_stat(SVCXPRT *xprt)
{
@@ -785,12 +798,12 @@ svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
static bool_t
svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
- struct sockaddr *addr, struct mbuf *m)
+ struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
XDR xdrs;
struct mbuf *mrep;
bool_t stat = TRUE;
- int error;
+ int error, len;
/*
* Leave space for record mark.
@@ -817,14 +830,19 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
* Prepend a record marker containing the reply length.
*/
M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
+ len = mrep->m_pkthdr.len;
*mtod(mrep, uint32_t *) =
- htonl(0x80000000 | (mrep->m_pkthdr.len
- - sizeof(uint32_t)));
+ htonl(0x80000000 | (len - sizeof(uint32_t)));
+ atomic_add_acq_32(&xprt->xp_snd_cnt, len);
error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
0, curthread);
if (!error) {
+ atomic_add_rel_32(&xprt->xp_snt_cnt, len);
+ if (seq)
+ *seq = xprt->xp_snd_cnt;
stat = TRUE;
- }
+ } else
+ atomic_subtract_32(&xprt->xp_snd_cnt, len);
} else {
m_freem(mrep);
}
@@ -837,7 +855,7 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
static bool_t
svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
- struct sockaddr *addr, struct mbuf *m)
+ struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
struct ct_data *ct;
XDR xdrs;
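How the NFS duplicate request cache consumes the new acknowledgement sequence
is not shown in this excerpt.  As a sketch only: a cache entry could record
the rq_reply_seq value filled in by SVC_REPLY(), and a later per-transport
trim pass could compare it against SVC_ACK() to tell whether the client's TCP
stack has acknowledged the reply.  The drc_entry structure and
drc_trim_xprt() below are hypothetical names for illustration, not part of
this commit.

    /*
     * Hypothetical trim pass: drop cached replies whose bytes have been
     * acknowledged by the peer.  SVC_ACK() returns FALSE for transports
     * without an xp_ack method (e.g. UDP), in which case nothing is done.
     */
    #include <sys/param.h>
    #include <sys/malloc.h>
    #include <sys/queue.h>
    #include <rpc/rpc.h>
    #include <rpc/svc.h>

    struct drc_entry {
            TAILQ_ENTRY(drc_entry) de_link;
            SVCXPRT         *de_xprt;       /* transport the reply went out on */
            uint32_t        de_reply_seq;   /* rq_reply_seq captured at reply time */
    };
    TAILQ_HEAD(drc_entry_list, drc_entry);

    static void
    drc_trim_xprt(SVCXPRT *xprt, struct drc_entry_list *list)
    {
            struct drc_entry *de, *nde;
            uint32_t ack;

            if (!SVC_ACK(xprt, &ack))
                    return;
            TAILQ_FOREACH_SAFE(de, list, de_link, nde) {
                    /* The byte counter wraps, so use a signed difference. */
                    if (de->de_xprt == xprt &&
                        (int32_t)(ack - de->de_reply_seq) >= 0) {
                            TAILQ_REMOVE(list, de, de_link);
                            free(de, M_RPC);
                    }
            }
    }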