diff options
Diffstat (limited to 'sys/rpc')
-rw-r--r-- | sys/rpc/svc.c | 67 | ||||
-rw-r--r-- | sys/rpc/svc.h | 55 | ||||
-rw-r--r-- | sys/rpc/svc_dg.c | 4 | ||||
-rw-r--r-- | sys/rpc/svc_vc.c | 42 |
4 files changed, 146 insertions, 22 deletions
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c index b42bafb..bc82cd4 100644 --- a/sys/rpc/svc.c +++ b/sys/rpc/svc.c @@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$"); #include <sys/queue.h> #include <sys/socketvar.h> #include <sys/systm.h> +#include <sys/sx.h> #include <sys/ucred.h> #include <rpc/rpc.h> @@ -93,6 +94,7 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) TAILQ_INIT(&pool->sp_xlist); TAILQ_INIT(&pool->sp_active); TAILQ_INIT(&pool->sp_callouts); + TAILQ_INIT(&pool->sp_lcallouts); LIST_INIT(&pool->sp_threads); LIST_INIT(&pool->sp_idlethreads); pool->sp_minthreads = 1; @@ -158,6 +160,7 @@ svcpool_destroy(SVCPOOL *pool) { SVCXPRT *xprt, *nxprt; struct svc_callout *s; + struct svc_loss_callout *sl; struct svcxprt_list cleanup; TAILQ_INIT(&cleanup); @@ -169,12 +172,16 @@ svcpool_destroy(SVCPOOL *pool) TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); } - while (TAILQ_FIRST(&pool->sp_callouts)) { - s = TAILQ_FIRST(&pool->sp_callouts); + while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { mtx_unlock(&pool->sp_lock); svc_unreg(pool, s->sc_prog, s->sc_vers); mtx_lock(&pool->sp_lock); } + while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { + mtx_unlock(&pool->sp_lock); + svc_loss_unreg(pool, sl->slc_dispatch); + mtx_lock(&pool->sp_lock); + } mtx_unlock(&pool->sp_lock); TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { @@ -511,6 +518,55 @@ svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) mtx_unlock(&pool->sp_lock); } +/* + * Add a service connection loss program to the callout list. + * The dispatch routine will be called when some port in ths pool die. + */ +bool_t +svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) +{ + SVCPOOL *pool = xprt->xp_pool; + struct svc_loss_callout *s; + + mtx_lock(&pool->sp_lock); + TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { + if (s->slc_dispatch == dispatch) + break; + } + if (s != NULL) { + mtx_unlock(&pool->sp_lock); + return (TRUE); + } + s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); + if (s == NULL) { + mtx_unlock(&pool->sp_lock); + return (FALSE); + } + s->slc_dispatch = dispatch; + TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); + mtx_unlock(&pool->sp_lock); + return (TRUE); +} + +/* + * Remove a service connection loss program from the callout list. + */ +void +svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) +{ + struct svc_loss_callout *s; + + mtx_lock(&pool->sp_lock); + TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { + if (s->slc_dispatch == dispatch) { + TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); + free(s, M_RPC); + break; + } + } + mtx_unlock(&pool->sp_lock); +} + /* ********************** CALLOUT list related stuff ************* */ /* @@ -554,7 +610,7 @@ svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) return (FALSE); - ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); + ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); if (rqstp->rq_addr) { free(rqstp->rq_addr, M_SONAME); rqstp->rq_addr = NULL; @@ -803,6 +859,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) struct svc_req *r; struct rpc_msg msg; struct mbuf *args; + struct svc_loss_callout *s; enum xprt_stat stat; /* now receive msgs from xprtprt (support batch calls) */ @@ -831,7 +888,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) break; case RS_DONE: SVC_REPLY(xprt, &repmsg, r->rq_addr, - repbody); + repbody, &r->rq_reply_seq); if (r->rq_addr) { free(r->rq_addr, M_SONAME); r->rq_addr = NULL; @@ -881,6 +938,8 @@ call_done: r = NULL; } if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { + TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) + (*s->slc_dispatch)(xprt); xprt_unregister(xprt); } diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h index 7d9428e..a7f5f51 100644 --- a/sys/rpc/svc.h +++ b/sys/rpc/svc.h @@ -103,9 +103,11 @@ struct xp_ops { struct sockaddr **, struct mbuf **); /* get transport status */ enum xprt_stat (*xp_stat)(struct __rpc_svcxprt *); + /* get transport acknowledge sequence */ + bool_t (*xp_ack)(struct __rpc_svcxprt *, uint32_t *); /* send reply */ bool_t (*xp_reply)(struct __rpc_svcxprt *, struct rpc_msg *, - struct sockaddr *, struct mbuf *); + struct sockaddr *, struct mbuf *, uint32_t *); /* destroy this struct */ void (*xp_destroy)(struct __rpc_svcxprt *); /* catch-all function */ @@ -166,6 +168,8 @@ typedef struct __rpc_svcxprt { time_t xp_lastactive; /* time of last RPC */ u_int64_t xp_sockref; /* set by nfsv4 to identify socket */ int xp_upcallset; /* socket upcall is set up */ + uint32_t xp_snd_cnt; /* # of bytes to send to socket */ + uint32_t xp_snt_cnt; /* # of bytes sent to socket */ #else int xp_fd; u_short xp_port; /* associated port number */ @@ -230,6 +234,17 @@ struct svc_callout { }; TAILQ_HEAD(svc_callout_list, svc_callout); +/* + * The services connection loss list + * The dispatch routine takes request structs and runs the + * apropriate procedure. + */ +struct svc_loss_callout { + TAILQ_ENTRY(svc_loss_callout) slc_link; + void (*slc_dispatch)(SVCXPRT *); +}; +TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout); + struct __rpc_svcthread; /* @@ -253,6 +268,7 @@ struct svc_req { void *rq_p1; /* application workspace */ int rq_p2; /* application workspace */ uint64_t rq_p3; /* application workspace */ + uint32_t rq_reply_seq; /* reply socket sequence # */ char rq_credarea[3*MAX_AUTH_BYTES]; }; STAILQ_HEAD(svc_reqlist, svc_req); @@ -311,13 +327,14 @@ enum svcpool_state { typedef SVCTHREAD *pool_assign_fn(SVCTHREAD *, struct svc_req *); typedef void pool_done_fn(SVCTHREAD *, struct svc_req *); typedef struct __rpc_svcpool { - struct mtx sp_lock; /* protect the transport lists */ + struct mtx_padalign sp_lock; /* protect the transport lists */ const char *sp_name; /* pool name (e.g. "nfsd", "NLM" */ enum svcpool_state sp_state; /* current pool state */ struct proc *sp_proc; /* process which is in svc_run */ struct svcxprt_list sp_xlist; /* all transports in the pool */ struct svcxprt_list sp_active; /* transports needing service */ struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */ + struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */ struct svcthread_list sp_threads; /* service threads */ struct svcthread_list sp_idlethreads; /* idle service threads */ int sp_minthreads; /* minimum service thread count */ @@ -393,8 +410,12 @@ struct svc_req { #define SVC_STAT(xprt) \ (*(xprt)->xp_ops->xp_stat)(xprt) -#define SVC_REPLY(xprt, msg, addr, m) \ - (*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m)) +#define SVC_ACK(xprt, ack) \ + ((xprt)->xp_ops->xp_ack == NULL ? FALSE : \ + ((ack) == NULL ? TRUE : (*(xprt)->xp_ops->xp_ack)((xprt), (ack)))) + +#define SVC_REPLY(xprt, msg, addr, m, seq) \ + (*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m), (seq)) #define SVC_DESTROY(xprt) \ (*(xprt)->xp_ops->xp_destroy)(xprt) @@ -495,6 +516,32 @@ extern void svc_unreg(const rpcprog_t, const rpcvers_t); #endif __END_DECLS +#ifdef _KERNEL +/* + * Service connection loss registration + * + * svc_loss_reg(xprt, dispatch) + * const SVCXPRT *xprt; + * const void (*dispatch)(); + */ + +__BEGIN_DECLS +extern bool_t svc_loss_reg(SVCXPRT *, void (*)(SVCXPRT *)); +__END_DECLS + +/* + * Service connection loss un-registration + * + * svc_loss_unreg(xprt, dispatch) + * const SVCXPRT *xprt; + * const void (*dispatch)(); + */ + +__BEGIN_DECLS +extern void svc_loss_unreg(SVCPOOL *, void (*)(SVCXPRT *)); +__END_DECLS +#endif + /* * Transport registration. * diff --git a/sys/rpc/svc_dg.c b/sys/rpc/svc_dg.c index 79f8429..1c530bd 100644 --- a/sys/rpc/svc_dg.c +++ b/sys/rpc/svc_dg.c @@ -66,7 +66,7 @@ static enum xprt_stat svc_dg_stat(SVCXPRT *); static bool_t svc_dg_recv(SVCXPRT *, struct rpc_msg *, struct sockaddr **, struct mbuf **); static bool_t svc_dg_reply(SVCXPRT *, struct rpc_msg *, - struct sockaddr *, struct mbuf *); + struct sockaddr *, struct mbuf *, uint32_t *); static void svc_dg_destroy(SVCXPRT *); static bool_t svc_dg_control(SVCXPRT *, const u_int, void *); static int svc_dg_soupcall(struct socket *so, void *arg, int waitflag); @@ -230,7 +230,7 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_msg *msg, static bool_t svc_dg_reply(SVCXPRT *xprt, struct rpc_msg *msg, - struct sockaddr *addr, struct mbuf *m) + struct sockaddr *addr, struct mbuf *m, uint32_t *seq) { XDR xdrs; struct mbuf *mrep; diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c index 3140915..5fe6488 100644 --- a/sys/rpc/svc_vc.c +++ b/sys/rpc/svc_vc.c @@ -76,10 +76,11 @@ static void svc_vc_rendezvous_destroy(SVCXPRT *); static bool_t svc_vc_null(void); static void svc_vc_destroy(SVCXPRT *); static enum xprt_stat svc_vc_stat(SVCXPRT *); +static bool_t svc_vc_ack(SVCXPRT *, uint32_t *); static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *, struct sockaddr **, struct mbuf **); static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *, - struct sockaddr *, struct mbuf *); + struct sockaddr *, struct mbuf *, uint32_t *seq); static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in); static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq, void *in); @@ -88,7 +89,7 @@ static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *); static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *, struct sockaddr **, struct mbuf **); static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *, - struct sockaddr *, struct mbuf *); + struct sockaddr *, struct mbuf *, uint32_t *); static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in); static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so, @@ -100,7 +101,7 @@ static struct xp_ops svc_vc_rendezvous_ops = { .xp_recv = svc_vc_rendezvous_recv, .xp_stat = svc_vc_rendezvous_stat, .xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *, - struct sockaddr *, struct mbuf *))svc_vc_null, + struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null, .xp_destroy = svc_vc_rendezvous_destroy, .xp_control = svc_vc_rendezvous_control }; @@ -108,6 +109,7 @@ static struct xp_ops svc_vc_rendezvous_ops = { static struct xp_ops svc_vc_ops = { .xp_recv = svc_vc_recv, .xp_stat = svc_vc_stat, + .xp_ack = svc_vc_ack, .xp_reply = svc_vc_reply, .xp_destroy = svc_vc_destroy, .xp_control = svc_vc_control @@ -184,8 +186,10 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, return (xprt); cleanup_svc_vc_create: - if (xprt) + if (xprt) { + sx_destroy(&xprt->xp_lock); svc_xprt_free(xprt); + } return (NULL); } @@ -270,7 +274,8 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr) return (xprt); cleanup_svc_vc_create: if (xprt) { - mem_free(xprt, sizeof(*xprt)); + sx_destroy(&xprt->xp_lock); + svc_xprt_free(xprt); } if (cd) mem_free(cd, sizeof(*cd)); @@ -451,7 +456,6 @@ svc_vc_destroy_common(SVCXPRT *xprt) } SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); - sx_destroy(&xprt->xp_lock); if (xprt->xp_socket) (void)soclose(xprt->xp_socket); @@ -537,6 +541,15 @@ svc_vc_stat(SVCXPRT *xprt) return (XPRT_IDLE); } +static bool_t +svc_vc_ack(SVCXPRT *xprt, uint32_t *ack) +{ + + *ack = atomic_load_acq_32(&xprt->xp_snt_cnt); + *ack -= xprt->xp_socket->so_snd.sb_cc; + return (TRUE); +} + static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *xprt) { @@ -785,12 +798,12 @@ svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg, static bool_t svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, - struct sockaddr *addr, struct mbuf *m) + struct sockaddr *addr, struct mbuf *m, uint32_t *seq) { XDR xdrs; struct mbuf *mrep; bool_t stat = TRUE; - int error; + int error, len; /* * Leave space for record mark. @@ -817,14 +830,19 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, * Prepend a record marker containing the reply length. */ M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); + len = mrep->m_pkthdr.len; *mtod(mrep, uint32_t *) = - htonl(0x80000000 | (mrep->m_pkthdr.len - - sizeof(uint32_t))); + htonl(0x80000000 | (len - sizeof(uint32_t))); + atomic_add_acq_32(&xprt->xp_snd_cnt, len); error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL, 0, curthread); if (!error) { + atomic_add_rel_32(&xprt->xp_snt_cnt, len); + if (seq) + *seq = xprt->xp_snd_cnt; stat = TRUE; - } + } else + atomic_subtract_32(&xprt->xp_snd_cnt, len); } else { m_freem(mrep); } @@ -837,7 +855,7 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, static bool_t svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg, - struct sockaddr *addr, struct mbuf *m) + struct sockaddr *addr, struct mbuf *m, uint32_t *seq) { struct ct_data *ct; XDR xdrs; |