summaryrefslogtreecommitdiffstats
path: root/sys/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'sys/rpc')
-rw-r--r--sys/rpc/svc.c67
-rw-r--r--sys/rpc/svc.h55
-rw-r--r--sys/rpc/svc_dg.c4
-rw-r--r--sys/rpc/svc_vc.c42
4 files changed, 146 insertions, 22 deletions
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index b42bafb..bc82cd4 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$");
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
+#include <sys/sx.h>
#include <sys/ucred.h>
#include <rpc/rpc.h>
@@ -93,6 +94,7 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
TAILQ_INIT(&pool->sp_xlist);
TAILQ_INIT(&pool->sp_active);
TAILQ_INIT(&pool->sp_callouts);
+ TAILQ_INIT(&pool->sp_lcallouts);
LIST_INIT(&pool->sp_threads);
LIST_INIT(&pool->sp_idlethreads);
pool->sp_minthreads = 1;
@@ -158,6 +160,7 @@ svcpool_destroy(SVCPOOL *pool)
{
SVCXPRT *xprt, *nxprt;
struct svc_callout *s;
+ struct svc_loss_callout *sl;
struct svcxprt_list cleanup;
TAILQ_INIT(&cleanup);
@@ -169,12 +172,16 @@ svcpool_destroy(SVCPOOL *pool)
TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
}
- while (TAILQ_FIRST(&pool->sp_callouts)) {
- s = TAILQ_FIRST(&pool->sp_callouts);
+ while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
mtx_unlock(&pool->sp_lock);
svc_unreg(pool, s->sc_prog, s->sc_vers);
mtx_lock(&pool->sp_lock);
}
+ while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
+ mtx_unlock(&pool->sp_lock);
+ svc_loss_unreg(pool, sl->slc_dispatch);
+ mtx_lock(&pool->sp_lock);
+ }
mtx_unlock(&pool->sp_lock);
TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
@@ -511,6 +518,55 @@ svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
mtx_unlock(&pool->sp_lock);
}
+/*
+ * Add a service connection loss program to the callout list.
+ * The dispatch routine will be called when some port in ths pool die.
+ */
+bool_t
+svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
+{
+ SVCPOOL *pool = xprt->xp_pool;
+ struct svc_loss_callout *s;
+
+ mtx_lock(&pool->sp_lock);
+ TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
+ if (s->slc_dispatch == dispatch)
+ break;
+ }
+ if (s != NULL) {
+ mtx_unlock(&pool->sp_lock);
+ return (TRUE);
+ }
+ s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
+ if (s == NULL) {
+ mtx_unlock(&pool->sp_lock);
+ return (FALSE);
+ }
+ s->slc_dispatch = dispatch;
+ TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
+ mtx_unlock(&pool->sp_lock);
+ return (TRUE);
+}
+
+/*
+ * Remove a service connection loss program from the callout list.
+ */
+void
+svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
+{
+ struct svc_loss_callout *s;
+
+ mtx_lock(&pool->sp_lock);
+ TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
+ if (s->slc_dispatch == dispatch) {
+ TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
+ free(s, M_RPC);
+ break;
+ }
+ }
+ mtx_unlock(&pool->sp_lock);
+}
+
/* ********************** CALLOUT list related stuff ************* */
/*
@@ -554,7 +610,7 @@ svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
return (FALSE);
- ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
+ ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
if (rqstp->rq_addr) {
free(rqstp->rq_addr, M_SONAME);
rqstp->rq_addr = NULL;
@@ -803,6 +859,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
struct svc_req *r;
struct rpc_msg msg;
struct mbuf *args;
+ struct svc_loss_callout *s;
enum xprt_stat stat;
/* now receive msgs from xprtprt (support batch calls) */
@@ -831,7 +888,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
break;
case RS_DONE:
SVC_REPLY(xprt, &repmsg, r->rq_addr,
- repbody);
+ repbody, &r->rq_reply_seq);
if (r->rq_addr) {
free(r->rq_addr, M_SONAME);
r->rq_addr = NULL;
@@ -881,6 +938,8 @@ call_done:
r = NULL;
}
if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
+ TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
+ (*s->slc_dispatch)(xprt);
xprt_unregister(xprt);
}
diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h
index 7d9428e..a7f5f51 100644
--- a/sys/rpc/svc.h
+++ b/sys/rpc/svc.h
@@ -103,9 +103,11 @@ struct xp_ops {
struct sockaddr **, struct mbuf **);
/* get transport status */
enum xprt_stat (*xp_stat)(struct __rpc_svcxprt *);
+ /* get transport acknowledge sequence */
+ bool_t (*xp_ack)(struct __rpc_svcxprt *, uint32_t *);
/* send reply */
bool_t (*xp_reply)(struct __rpc_svcxprt *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *);
/* destroy this struct */
void (*xp_destroy)(struct __rpc_svcxprt *);
/* catch-all function */
@@ -166,6 +168,8 @@ typedef struct __rpc_svcxprt {
time_t xp_lastactive; /* time of last RPC */
u_int64_t xp_sockref; /* set by nfsv4 to identify socket */
int xp_upcallset; /* socket upcall is set up */
+ uint32_t xp_snd_cnt; /* # of bytes to send to socket */
+ uint32_t xp_snt_cnt; /* # of bytes sent to socket */
#else
int xp_fd;
u_short xp_port; /* associated port number */
@@ -230,6 +234,17 @@ struct svc_callout {
};
TAILQ_HEAD(svc_callout_list, svc_callout);
+/*
+ * The services connection loss list
+ * The dispatch routine takes request structs and runs the
+ * apropriate procedure.
+ */
+struct svc_loss_callout {
+ TAILQ_ENTRY(svc_loss_callout) slc_link;
+ void (*slc_dispatch)(SVCXPRT *);
+};
+TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout);
+
struct __rpc_svcthread;
/*
@@ -253,6 +268,7 @@ struct svc_req {
void *rq_p1; /* application workspace */
int rq_p2; /* application workspace */
uint64_t rq_p3; /* application workspace */
+ uint32_t rq_reply_seq; /* reply socket sequence # */
char rq_credarea[3*MAX_AUTH_BYTES];
};
STAILQ_HEAD(svc_reqlist, svc_req);
@@ -311,13 +327,14 @@ enum svcpool_state {
typedef SVCTHREAD *pool_assign_fn(SVCTHREAD *, struct svc_req *);
typedef void pool_done_fn(SVCTHREAD *, struct svc_req *);
typedef struct __rpc_svcpool {
- struct mtx sp_lock; /* protect the transport lists */
+ struct mtx_padalign sp_lock; /* protect the transport lists */
const char *sp_name; /* pool name (e.g. "nfsd", "NLM" */
enum svcpool_state sp_state; /* current pool state */
struct proc *sp_proc; /* process which is in svc_run */
struct svcxprt_list sp_xlist; /* all transports in the pool */
struct svcxprt_list sp_active; /* transports needing service */
struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */
+ struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */
struct svcthread_list sp_threads; /* service threads */
struct svcthread_list sp_idlethreads; /* idle service threads */
int sp_minthreads; /* minimum service thread count */
@@ -393,8 +410,12 @@ struct svc_req {
#define SVC_STAT(xprt) \
(*(xprt)->xp_ops->xp_stat)(xprt)
-#define SVC_REPLY(xprt, msg, addr, m) \
- (*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m))
+#define SVC_ACK(xprt, ack) \
+ ((xprt)->xp_ops->xp_ack == NULL ? FALSE : \
+ ((ack) == NULL ? TRUE : (*(xprt)->xp_ops->xp_ack)((xprt), (ack))))
+
+#define SVC_REPLY(xprt, msg, addr, m, seq) \
+ (*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m), (seq))
#define SVC_DESTROY(xprt) \
(*(xprt)->xp_ops->xp_destroy)(xprt)
@@ -495,6 +516,32 @@ extern void svc_unreg(const rpcprog_t, const rpcvers_t);
#endif
__END_DECLS
+#ifdef _KERNEL
+/*
+ * Service connection loss registration
+ *
+ * svc_loss_reg(xprt, dispatch)
+ * const SVCXPRT *xprt;
+ * const void (*dispatch)();
+ */
+
+__BEGIN_DECLS
+extern bool_t svc_loss_reg(SVCXPRT *, void (*)(SVCXPRT *));
+__END_DECLS
+
+/*
+ * Service connection loss un-registration
+ *
+ * svc_loss_unreg(xprt, dispatch)
+ * const SVCXPRT *xprt;
+ * const void (*dispatch)();
+ */
+
+__BEGIN_DECLS
+extern void svc_loss_unreg(SVCPOOL *, void (*)(SVCXPRT *));
+__END_DECLS
+#endif
+
/*
* Transport registration.
*
diff --git a/sys/rpc/svc_dg.c b/sys/rpc/svc_dg.c
index 79f8429..1c530bd 100644
--- a/sys/rpc/svc_dg.c
+++ b/sys/rpc/svc_dg.c
@@ -66,7 +66,7 @@ static enum xprt_stat svc_dg_stat(SVCXPRT *);
static bool_t svc_dg_recv(SVCXPRT *, struct rpc_msg *,
struct sockaddr **, struct mbuf **);
static bool_t svc_dg_reply(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *);
static void svc_dg_destroy(SVCXPRT *);
static bool_t svc_dg_control(SVCXPRT *, const u_int, void *);
static int svc_dg_soupcall(struct socket *so, void *arg, int waitflag);
@@ -230,7 +230,7 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_msg *msg,
static bool_t
svc_dg_reply(SVCXPRT *xprt, struct rpc_msg *msg,
- struct sockaddr *addr, struct mbuf *m)
+ struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
XDR xdrs;
struct mbuf *mrep;
diff --git a/sys/rpc/svc_vc.c b/sys/rpc/svc_vc.c
index 3140915..5fe6488 100644
--- a/sys/rpc/svc_vc.c
+++ b/sys/rpc/svc_vc.c
@@ -76,10 +76,11 @@ static void svc_vc_rendezvous_destroy(SVCXPRT *);
static bool_t svc_vc_null(void);
static void svc_vc_destroy(SVCXPRT *);
static enum xprt_stat svc_vc_stat(SVCXPRT *);
+static bool_t svc_vc_ack(SVCXPRT *, uint32_t *);
static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
struct sockaddr **, struct mbuf **);
static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *seq);
static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
void *in);
@@ -88,7 +89,7 @@ static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
struct sockaddr **, struct mbuf **);
static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *);
+ struct sockaddr *, struct mbuf *, uint32_t *);
static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
void *in);
static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
@@ -100,7 +101,7 @@ static struct xp_ops svc_vc_rendezvous_ops = {
.xp_recv = svc_vc_rendezvous_recv,
.xp_stat = svc_vc_rendezvous_stat,
.xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *,
- struct sockaddr *, struct mbuf *))svc_vc_null,
+ struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null,
.xp_destroy = svc_vc_rendezvous_destroy,
.xp_control = svc_vc_rendezvous_control
};
@@ -108,6 +109,7 @@ static struct xp_ops svc_vc_rendezvous_ops = {
static struct xp_ops svc_vc_ops = {
.xp_recv = svc_vc_recv,
.xp_stat = svc_vc_stat,
+ .xp_ack = svc_vc_ack,
.xp_reply = svc_vc_reply,
.xp_destroy = svc_vc_destroy,
.xp_control = svc_vc_control
@@ -184,8 +186,10 @@ svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
return (xprt);
cleanup_svc_vc_create:
- if (xprt)
+ if (xprt) {
+ sx_destroy(&xprt->xp_lock);
svc_xprt_free(xprt);
+ }
return (NULL);
}
@@ -270,7 +274,8 @@ svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
return (xprt);
cleanup_svc_vc_create:
if (xprt) {
- mem_free(xprt, sizeof(*xprt));
+ sx_destroy(&xprt->xp_lock);
+ svc_xprt_free(xprt);
}
if (cd)
mem_free(cd, sizeof(*cd));
@@ -451,7 +456,6 @@ svc_vc_destroy_common(SVCXPRT *xprt)
}
SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
- sx_destroy(&xprt->xp_lock);
if (xprt->xp_socket)
(void)soclose(xprt->xp_socket);
@@ -537,6 +541,15 @@ svc_vc_stat(SVCXPRT *xprt)
return (XPRT_IDLE);
}
+static bool_t
+svc_vc_ack(SVCXPRT *xprt, uint32_t *ack)
+{
+
+ *ack = atomic_load_acq_32(&xprt->xp_snt_cnt);
+ *ack -= xprt->xp_socket->so_snd.sb_cc;
+ return (TRUE);
+}
+
static enum xprt_stat
svc_vc_backchannel_stat(SVCXPRT *xprt)
{
@@ -785,12 +798,12 @@ svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
static bool_t
svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
- struct sockaddr *addr, struct mbuf *m)
+ struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
XDR xdrs;
struct mbuf *mrep;
bool_t stat = TRUE;
- int error;
+ int error, len;
/*
* Leave space for record mark.
@@ -817,14 +830,19 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
* Prepend a record marker containing the reply length.
*/
M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
+ len = mrep->m_pkthdr.len;
*mtod(mrep, uint32_t *) =
- htonl(0x80000000 | (mrep->m_pkthdr.len
- - sizeof(uint32_t)));
+ htonl(0x80000000 | (len - sizeof(uint32_t)));
+ atomic_add_acq_32(&xprt->xp_snd_cnt, len);
error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
0, curthread);
if (!error) {
+ atomic_add_rel_32(&xprt->xp_snt_cnt, len);
+ if (seq)
+ *seq = xprt->xp_snd_cnt;
stat = TRUE;
- }
+ } else
+ atomic_subtract_32(&xprt->xp_snd_cnt, len);
} else {
m_freem(mrep);
}
@@ -837,7 +855,7 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
static bool_t
svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
- struct sockaddr *addr, struct mbuf *m)
+ struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
struct ct_data *ct;
XDR xdrs;
OpenPOWER on IntegriCloud