Diffstat (limited to 'sys/rpc/svc.c')
-rw-r--r--	sys/rpc/svc.c	1048
1 file changed, 894 insertions(+), 154 deletions(-)
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index d6d6d78..8af9e80 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -49,37 +49,105 @@ __FBSDID("$FreeBSD$");
#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
+#include <sys/kthread.h>
#include <sys/malloc.h>
+#include <sys/mbuf.h>
#include <sys/mutex.h>
+#include <sys/proc.h>
#include <sys/queue.h>
+#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>
#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
+#include <rpc/replay.h>
#include <rpc/rpc_com.h>
#define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
-#define version_keepquiet(xp) ((u_long)(xp)->xp_p3 & SVC_VERSQUIET)
+#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
char *);
-static void __xprt_do_unregister (SVCXPRT *xprt, bool_t dolock);
+static void svc_new_thread(SVCPOOL *pool);
+static void xprt_unregister_locked(SVCXPRT *xprt);
/* *************** SVCXPRT related stuff **************** */
+static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
+static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
+
SVCPOOL*
-svcpool_create(void)
+svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
SVCPOOL *pool;
pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
+ pool->sp_name = name;
+ pool->sp_state = SVCPOOL_INIT;
+ pool->sp_proc = NULL;
TAILQ_INIT(&pool->sp_xlist);
TAILQ_INIT(&pool->sp_active);
TAILQ_INIT(&pool->sp_callouts);
+ LIST_INIT(&pool->sp_threads);
+ LIST_INIT(&pool->sp_idlethreads);
+ pool->sp_minthreads = 1;
+ pool->sp_maxthreads = 1;
+ pool->sp_threadcount = 0;
+
+ /*
+	 * Don't use more than a quarter of mbuf clusters, or more than
+	 * 45MB, for buffering requests.
+ */
+ pool->sp_space_high = nmbclusters * MCLBYTES / 4;
+ if (pool->sp_space_high > 45 << 20)
+ pool->sp_space_high = 45 << 20;
+ pool->sp_space_low = 2 * pool->sp_space_high / 3;
+
+ sysctl_ctx_init(&pool->sp_sysctl);
+ if (sysctl_base) {
+ SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+ "minthreads", CTLTYPE_INT | CTLFLAG_RW,
+ pool, 0, svcpool_minthread_sysctl, "I", "");
+ SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+ "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
+ pool, 0, svcpool_maxthread_sysctl, "I", "");
+ SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+ "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
+
+ SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+ "request_space_used", CTLFLAG_RD,
+ &pool->sp_space_used, 0,
+ "Space in parsed but not handled requests.");
+
+ SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+ "request_space_used_highest", CTLFLAG_RD,
+ &pool->sp_space_used_highest, 0,
+ "Highest space used since reboot.");
+
+ SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+ "request_space_high", CTLFLAG_RW,
+ &pool->sp_space_high, 0,
+ "Maximum space in parsed but not handled requests.");
+
+ SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+ "request_space_low", CTLFLAG_RW,
+ &pool->sp_space_low, 0,
+ "Low water mark for request space.");
+
+ SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+ "request_space_throttled", CTLFLAG_RD,
+ &pool->sp_space_throttled, 0,
+	    "Whether NFS requests are currently throttled");
+
+ SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+ "request_space_throttle_count", CTLFLAG_RD,
+ &pool->sp_space_throttle_count, 0,
+ "Count of times throttling based on request space has occurred");
+ }
return pool;
}
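
From a caller's point of view, the new constructor takes a name and an
optional sysctl parent node under which the tunables above appear. A
minimal sketch, assuming a hypothetical "myrpc" service hanging its
knobs off the static kern node:

	SVCPOOL *pool;

	/*
	 * minthreads, maxthreads and the request_space_* tunables show
	 * up under the parent node passed here; kern is an assumption,
	 * any sysctl_oid_list the caller owns will do.
	 */
	pool = svcpool_create("myrpc", SYSCTL_STATIC_CHILDREN(_kern));

	/* ... register transports and services, run, then tear down ... */
	svcpool_destroy(pool);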
@@ -87,16 +155,17 @@ svcpool_create(void)
void
svcpool_destroy(SVCPOOL *pool)
{
- SVCXPRT *xprt;
+ SVCXPRT *xprt, *nxprt;
struct svc_callout *s;
+ struct svcxprt_list cleanup;
+ TAILQ_INIT(&cleanup);
mtx_lock(&pool->sp_lock);
while (TAILQ_FIRST(&pool->sp_xlist)) {
xprt = TAILQ_FIRST(&pool->sp_xlist);
- mtx_unlock(&pool->sp_lock);
- SVC_DESTROY(xprt);
- mtx_lock(&pool->sp_lock);
+ xprt_unregister_locked(xprt);
+ TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
}
while (TAILQ_FIRST(&pool->sp_callouts)) {
@@ -107,9 +176,97 @@ svcpool_destroy(SVCPOOL *pool)
}
mtx_destroy(&pool->sp_lock);
+
+ TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
+ SVC_RELEASE(xprt);
+ }
+
+ if (pool->sp_rcache)
+ replay_freecache(pool->sp_rcache);
+
+ sysctl_ctx_free(&pool->sp_sysctl);
free(pool, M_RPC);
}
+static bool_t
+svcpool_active(SVCPOOL *pool)
+{
+ enum svcpool_state state = pool->sp_state;
+
+ if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
+ return (FALSE);
+ return (TRUE);
+}
+
+/*
+ * Sysctl handler to set the minimum thread count on a pool
+ */
+static int
+svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
+{
+ SVCPOOL *pool;
+ int newminthreads, error, n;
+
+ pool = oidp->oid_arg1;
+ newminthreads = pool->sp_minthreads;
+ error = sysctl_handle_int(oidp, &newminthreads, 0, req);
+ if (error == 0 && newminthreads != pool->sp_minthreads) {
+ if (newminthreads > pool->sp_maxthreads)
+ return (EINVAL);
+ mtx_lock(&pool->sp_lock);
+ if (newminthreads > pool->sp_minthreads
+ && svcpool_active(pool)) {
+ /*
+ * If the pool is running and we are
+ * increasing, create some more threads now.
+ */
+ n = newminthreads - pool->sp_threadcount;
+ if (n > 0) {
+ mtx_unlock(&pool->sp_lock);
+ while (n--)
+ svc_new_thread(pool);
+ mtx_lock(&pool->sp_lock);
+ }
+ }
+ pool->sp_minthreads = newminthreads;
+ mtx_unlock(&pool->sp_lock);
+ }
+ return (error);
+}
+
+/*
+ * Sysctl handler to set the maximum thread count on a pool
+ */
+static int
+svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
+{
+ SVCPOOL *pool;
+ SVCTHREAD *st;
+ int newmaxthreads, error;
+
+ pool = oidp->oid_arg1;
+ newmaxthreads = pool->sp_maxthreads;
+ error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
+ if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
+ if (newmaxthreads < pool->sp_minthreads)
+ return (EINVAL);
+ mtx_lock(&pool->sp_lock);
+ if (newmaxthreads < pool->sp_maxthreads
+ && svcpool_active(pool)) {
+ /*
+ * If the pool is running and we are
+ * decreasing, wake up some idle threads to
+ * encourage them to exit.
+ */
+ LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
+ cv_signal(&st->st_cond);
+ }
+ pool->sp_maxthreads = newmaxthreads;
+ mtx_unlock(&pool->sp_lock);
+ }
+ return (error);
+}
+
/*
* Activate a transport handle.
*/
@@ -125,40 +282,70 @@ xprt_register(SVCXPRT *xprt)
mtx_unlock(&pool->sp_lock);
}
-void
-xprt_unregister(SVCXPRT *xprt)
-{
- __xprt_do_unregister(xprt, TRUE);
-}
-
-void
-__xprt_unregister_unlocked(SVCXPRT *xprt)
-{
- __xprt_do_unregister(xprt, FALSE);
-}
-
/*
- * De-activate a transport handle.
+ * De-activate a transport handle. Note: the locked version doesn't
+ * release the transport - caller must do that after dropping the pool
+ * lock.
*/
static void
-__xprt_do_unregister(SVCXPRT *xprt, bool_t dolock)
+xprt_unregister_locked(SVCXPRT *xprt)
{
SVCPOOL *pool = xprt->xp_pool;
- //__svc_generic_cleanup(xprt);
-
- if (dolock)
- mtx_lock(&pool->sp_lock);
-
if (xprt->xp_active) {
TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
xprt->xp_active = FALSE;
}
TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
xprt->xp_registered = FALSE;
+}
- if (dolock)
- mtx_unlock(&pool->sp_lock);
+void
+xprt_unregister(SVCXPRT *xprt)
+{
+ SVCPOOL *pool = xprt->xp_pool;
+
+ mtx_lock(&pool->sp_lock);
+ xprt_unregister_locked(xprt);
+ mtx_unlock(&pool->sp_lock);
+
+ SVC_RELEASE(xprt);
+}
+
+static void
+xprt_assignthread(SVCXPRT *xprt)
+{
+ SVCPOOL *pool = xprt->xp_pool;
+ SVCTHREAD *st;
+
+ /*
+ * Attempt to assign a service thread to this
+ * transport.
+ */
+ LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
+ if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
+ break;
+ }
+ if (st) {
+ SVC_ACQUIRE(xprt);
+ xprt->xp_thread = st;
+ st->st_xprt = xprt;
+ cv_signal(&st->st_cond);
+ } else {
+ /*
+ * See if we can create a new thread. The
+ * actual thread creation happens in
+ * svc_run_internal because our locking state
+ * is poorly defined (we are typically called
+ * from a socket upcall). Don't create more
+ * than one thread per second.
+ */
+ if (pool->sp_state == SVCPOOL_ACTIVE
+ && pool->sp_lastcreatetime < time_uptime
+ && pool->sp_threadcount < pool->sp_maxthreads) {
+ pool->sp_state = SVCPOOL_THREADWANTED;
+ }
+ }
}
void
@@ -166,30 +353,42 @@ xprt_active(SVCXPRT *xprt)
{
SVCPOOL *pool = xprt->xp_pool;
+ if (!xprt->xp_registered) {
+ /*
+ * Race with xprt_unregister - we lose.
+ */
+ return;
+ }
+
mtx_lock(&pool->sp_lock);
if (!xprt->xp_active) {
TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
xprt->xp_active = TRUE;
+ xprt_assignthread(xprt);
}
- wakeup(&pool->sp_active);
mtx_unlock(&pool->sp_lock);
}
void
-xprt_inactive(SVCXPRT *xprt)
+xprt_inactive_locked(SVCXPRT *xprt)
{
SVCPOOL *pool = xprt->xp_pool;
- mtx_lock(&pool->sp_lock);
-
if (xprt->xp_active) {
TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
xprt->xp_active = FALSE;
}
- wakeup(&pool->sp_active);
+}
+
+void
+xprt_inactive(SVCXPRT *xprt)
+{
+ SVCPOOL *pool = xprt->xp_pool;
+ mtx_lock(&pool->sp_lock);
+ xprt_inactive_locked(xprt);
mtx_unlock(&pool->sp_lock);
}
@@ -253,9 +452,11 @@ rpcb_it:
if (nconf) {
bool_t dummy;
struct netconfig tnc;
+ struct netbuf nb;
tnc = *nconf;
- dummy = rpcb_set(prog, vers, &tnc,
- &((SVCXPRT *) xprt)->xp_ltaddr);
+ nb.buf = &xprt->xp_ltaddr;
+ nb.len = xprt->xp_ltaddr.ss_len;
+ dummy = rpcb_set(prog, vers, &tnc, &nb);
return (dummy);
}
return (TRUE);
@@ -305,270 +506,809 @@ svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
/* ******************* REPLY GENERATION ROUTINES ************ */
+static bool_t
+svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
+ struct mbuf *body)
+{
+ SVCXPRT *xprt = rqstp->rq_xprt;
+ bool_t ok;
+
+ if (rqstp->rq_args) {
+ m_freem(rqstp->rq_args);
+ rqstp->rq_args = NULL;
+ }
+
+ if (xprt->xp_pool->sp_rcache)
+ replay_setreply(xprt->xp_pool->sp_rcache,
+ rply, svc_getrpccaller(rqstp), body);
+
+ if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
+ return (FALSE);
+
+ ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
+ if (rqstp->rq_addr) {
+ free(rqstp->rq_addr, M_SONAME);
+ rqstp->rq_addr = NULL;
+ }
+
+ return (ok);
+}
+
/*
* Send a reply to an rpc request
*/
bool_t
-svc_sendreply(SVCXPRT *xprt, xdrproc_t xdr_results, void * xdr_location)
+svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
{
struct rpc_msg rply;
+ struct mbuf *m;
+ XDR xdrs;
+ bool_t ok;
+ rply.rm_xid = rqstp->rq_xid;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
- rply.acpted_rply.ar_verf = xprt->xp_verf;
+ rply.acpted_rply.ar_verf = rqstp->rq_verf;
rply.acpted_rply.ar_stat = SUCCESS;
- rply.acpted_rply.ar_results.where = xdr_location;
- rply.acpted_rply.ar_results.proc = xdr_results;
+ rply.acpted_rply.ar_results.where = NULL;
+ rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
+
+ MGET(m, M_WAIT, MT_DATA);
+ MCLGET(m, M_WAIT);
+ m->m_len = 0;
+ xdrmbuf_create(&xdrs, m, XDR_ENCODE);
+ ok = xdr_results(&xdrs, xdr_location);
+ XDR_DESTROY(&xdrs);
+
+ if (ok) {
+ return (svc_sendreply_common(rqstp, &rply, m));
+ } else {
+ m_freem(m);
+ return (FALSE);
+ }
+}
- return (SVC_REPLY(xprt, &rply));
+bool_t
+svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
+{
+ struct rpc_msg rply;
+
+ rply.rm_xid = rqstp->rq_xid;
+ rply.rm_direction = REPLY;
+ rply.rm_reply.rp_stat = MSG_ACCEPTED;
+ rply.acpted_rply.ar_verf = rqstp->rq_verf;
+ rply.acpted_rply.ar_stat = SUCCESS;
+ rply.acpted_rply.ar_results.where = NULL;
+ rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
+
+ return (svc_sendreply_common(rqstp, &rply, m));
}
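
Together with svc_getargs() and svc_freereq() further down, this gives
dispatch routines a request-centric shape. A hedged sketch of one
(myprog_dispatch, struct my_args/my_res and their XDR routines are
hypothetical; the call pattern mirrors an rpcgen-style dispatcher
written against this interface):

	static void
	myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
	{
		struct my_args args;	/* hypothetical argument type */
		struct my_res res;	/* hypothetical result type */

		memset(&args, 0, sizeof(args));
		if (!svc_getargs(rqstp, (xdrproc_t) xdr_my_args, &args)) {
			svcerr_decode(rqstp);
			svc_freereq(rqstp);
			return;
		}

		/* ... service the call, filling in res ... */

		if (!svc_sendreply(rqstp, (xdrproc_t) xdr_my_res, &res))
			svcerr_systemerr(rqstp);
		svc_freeargs(rqstp, (xdrproc_t) xdr_my_args, &args);
		svc_freereq(rqstp);
	}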
/*
* No procedure error reply
*/
void
-svcerr_noproc(SVCXPRT *xprt)
+svcerr_noproc(struct svc_req *rqstp)
{
+ SVCXPRT *xprt = rqstp->rq_xprt;
struct rpc_msg rply;
+ rply.rm_xid = rqstp->rq_xid;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
- rply.acpted_rply.ar_verf = xprt->xp_verf;
+ rply.acpted_rply.ar_verf = rqstp->rq_verf;
rply.acpted_rply.ar_stat = PROC_UNAVAIL;
- SVC_REPLY(xprt, &rply);
+ if (xprt->xp_pool->sp_rcache)
+ replay_setreply(xprt->xp_pool->sp_rcache,
+ &rply, svc_getrpccaller(rqstp), NULL);
+
+ svc_sendreply_common(rqstp, &rply, NULL);
}
/*
* Can't decode args error reply
*/
void
-svcerr_decode(SVCXPRT *xprt)
+svcerr_decode(struct svc_req *rqstp)
{
+ SVCXPRT *xprt = rqstp->rq_xprt;
struct rpc_msg rply;
+ rply.rm_xid = rqstp->rq_xid;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
- rply.acpted_rply.ar_verf = xprt->xp_verf;
+ rply.acpted_rply.ar_verf = rqstp->rq_verf;
rply.acpted_rply.ar_stat = GARBAGE_ARGS;
- SVC_REPLY(xprt, &rply);
+ if (xprt->xp_pool->sp_rcache)
+ replay_setreply(xprt->xp_pool->sp_rcache,
+ &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
+
+ svc_sendreply_common(rqstp, &rply, NULL);
}
/*
* Some system error
*/
void
-svcerr_systemerr(SVCXPRT *xprt)
+svcerr_systemerr(struct svc_req *rqstp)
{
+ SVCXPRT *xprt = rqstp->rq_xprt;
struct rpc_msg rply;
+ rply.rm_xid = rqstp->rq_xid;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
- rply.acpted_rply.ar_verf = xprt->xp_verf;
+ rply.acpted_rply.ar_verf = rqstp->rq_verf;
rply.acpted_rply.ar_stat = SYSTEM_ERR;
- SVC_REPLY(xprt, &rply);
+ if (xprt->xp_pool->sp_rcache)
+ replay_setreply(xprt->xp_pool->sp_rcache,
+ &rply, svc_getrpccaller(rqstp), NULL);
+
+ svc_sendreply_common(rqstp, &rply, NULL);
}
/*
* Authentication error reply
*/
void
-svcerr_auth(SVCXPRT *xprt, enum auth_stat why)
+svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
+ SVCXPRT *xprt = rqstp->rq_xprt;
struct rpc_msg rply;
+ rply.rm_xid = rqstp->rq_xid;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_DENIED;
rply.rjcted_rply.rj_stat = AUTH_ERROR;
rply.rjcted_rply.rj_why = why;
- SVC_REPLY(xprt, &rply);
+ if (xprt->xp_pool->sp_rcache)
+ replay_setreply(xprt->xp_pool->sp_rcache,
+ &rply, svc_getrpccaller(rqstp), NULL);
+
+ svc_sendreply_common(rqstp, &rply, NULL);
}
/*
* Auth too weak error reply
*/
void
-svcerr_weakauth(SVCXPRT *xprt)
+svcerr_weakauth(struct svc_req *rqstp)
{
- svcerr_auth(xprt, AUTH_TOOWEAK);
+ svcerr_auth(rqstp, AUTH_TOOWEAK);
}
/*
* Program unavailable error reply
*/
void
-svcerr_noprog(SVCXPRT *xprt)
+svcerr_noprog(struct svc_req *rqstp)
{
+ SVCXPRT *xprt = rqstp->rq_xprt;
struct rpc_msg rply;
+ rply.rm_xid = rqstp->rq_xid;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
- rply.acpted_rply.ar_verf = xprt->xp_verf;
+ rply.acpted_rply.ar_verf = rqstp->rq_verf;
rply.acpted_rply.ar_stat = PROG_UNAVAIL;
- SVC_REPLY(xprt, &rply);
+ if (xprt->xp_pool->sp_rcache)
+ replay_setreply(xprt->xp_pool->sp_rcache,
+ &rply, svc_getrpccaller(rqstp), NULL);
+
+ svc_sendreply_common(rqstp, &rply, NULL);
}
/*
* Program version mismatch error reply
*/
void
-svcerr_progvers(SVCXPRT *xprt, rpcvers_t low_vers, rpcvers_t high_vers)
+svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
+ SVCXPRT *xprt = rqstp->rq_xprt;
struct rpc_msg rply;
+ rply.rm_xid = rqstp->rq_xid;
rply.rm_direction = REPLY;
rply.rm_reply.rp_stat = MSG_ACCEPTED;
- rply.acpted_rply.ar_verf = xprt->xp_verf;
+ rply.acpted_rply.ar_verf = rqstp->rq_verf;
rply.acpted_rply.ar_stat = PROG_MISMATCH;
rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
- SVC_REPLY(xprt, &rply);
+ if (xprt->xp_pool->sp_rcache)
+ replay_setreply(xprt->xp_pool->sp_rcache,
+ &rply, svc_getrpccaller(rqstp), NULL);
+
+ svc_sendreply_common(rqstp, &rply, NULL);
}
-/* ******************* SERVER INPUT STUFF ******************* */
+/*
+ * Allocate a new server transport structure. All fields are
+ * initialized to zero and xp_p3 is initialized to point at an
+ * extension structure to hold various flags and authentication
+ * parameters.
+ */
+SVCXPRT *
+svc_xprt_alloc()
+{
+ SVCXPRT *xprt;
+ SVCXPRT_EXT *ext;
+
+ xprt = mem_alloc(sizeof(SVCXPRT));
+ memset(xprt, 0, sizeof(SVCXPRT));
+ ext = mem_alloc(sizeof(SVCXPRT_EXT));
+ memset(ext, 0, sizeof(SVCXPRT_EXT));
+ xprt->xp_p3 = ext;
+ refcount_init(&xprt->xp_refs, 1);
+
+ return (xprt);
+}
/*
- * Get server side input from some transport.
- *
- * Statement of authentication parameters management:
- * This function owns and manages all authentication parameters, specifically
- * the "raw" parameters (msg.rm_call.cb_cred and msg.rm_call.cb_verf) and
- * the "cooked" credentials (rqst->rq_clntcred).
- * In-kernel, we represent non-trivial cooked creds with struct ucred.
- * In all events, all three parameters are freed upon exit from this routine.
- * The storage is trivially management on the call stack in user land, but
- * is mallocated in kernel land.
+ * Free a server transport structure.
*/
+void
+svc_xprt_free(xprt)
+ SVCXPRT *xprt;
+{
-static void
-svc_getreq(SVCXPRT *xprt)
+ mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
+ mem_free(xprt, sizeof(SVCXPRT));
+}
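
A transport back end would pair the allocator with the pool's new
reference counting roughly as follows (a hedged sketch; mysvc_create
and its socket argument are hypothetical, the teardown via
SVC_RELEASE/SVC_DESTROY is this patch's convention):

	SVCXPRT *
	mysvc_create(SVCPOOL *pool, struct socket *so)
	{
		SVCXPRT *xprt;

		xprt = svc_xprt_alloc();	/* refcount starts at 1 */
		xprt->xp_pool = pool;
		/* ... fill in xp_ops, addresses, idle timeout ... */
		xprt_register(xprt);
		return (xprt);
	}

	/* The matching destroy method, reached when the last
	 * SVC_RELEASE() drops the reference, ends with
	 * svc_xprt_free(xprt). */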
+
+/* ******************* SERVER INPUT STUFF ******************* */
+
+/*
+ * Read RPC requests from a transport and queue them to be
+ * executed. We handle authentication and replay cache replies here.
+ * Actually dispatching the RPC is deferred till svc_executereq.
+ */
+static enum xprt_stat
+svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
SVCPOOL *pool = xprt->xp_pool;
- struct svc_req r;
+ struct svc_req *r;
struct rpc_msg msg;
- int prog_found;
- rpcvers_t low_vers;
- rpcvers_t high_vers;
+ struct mbuf *args;
enum xprt_stat stat;
- char cred_area[2*MAX_AUTH_BYTES + sizeof(struct xucred)];
-
- msg.rm_call.cb_cred.oa_base = cred_area;
- msg.rm_call.cb_verf.oa_base = &cred_area[MAX_AUTH_BYTES];
- r.rq_clntcred = &cred_area[2*MAX_AUTH_BYTES];
	/* now receive msgs from xprt (support batch calls) */
- do {
- if (SVC_RECV(xprt, &msg)) {
-
- /* now find the exported program and call it */
- struct svc_callout *s;
- enum auth_stat why;
-
- r.rq_xprt = xprt;
- r.rq_prog = msg.rm_call.cb_prog;
- r.rq_vers = msg.rm_call.cb_vers;
- r.rq_proc = msg.rm_call.cb_proc;
- r.rq_cred = msg.rm_call.cb_cred;
- /* first authenticate the message */
- if ((why = _authenticate(&r, &msg)) != AUTH_OK) {
- svcerr_auth(xprt, why);
+ r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
+
+ msg.rm_call.cb_cred.oa_base = r->rq_credarea;
+ msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
+ r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
+ if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
+ enum auth_stat why;
+
+ /*
+ * Handle replays and authenticate before queuing the
+ * request to be executed.
+ */
+ SVC_ACQUIRE(xprt);
+ r->rq_xprt = xprt;
+ if (pool->sp_rcache) {
+ struct rpc_msg repmsg;
+ struct mbuf *repbody;
+ enum replay_state rs;
+ rs = replay_find(pool->sp_rcache, &msg,
+ svc_getrpccaller(r), &repmsg, &repbody);
+ switch (rs) {
+ case RS_NEW:
+ break;
+ case RS_DONE:
+ SVC_REPLY(xprt, &repmsg, r->rq_addr,
+ repbody);
+ if (r->rq_addr) {
+ free(r->rq_addr, M_SONAME);
+ r->rq_addr = NULL;
+ }
+ goto call_done;
+
+ default:
goto call_done;
}
- /* now match message with a registered service*/
- prog_found = FALSE;
- low_vers = (rpcvers_t) -1L;
- high_vers = (rpcvers_t) 0L;
- TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
- if (s->sc_prog == r.rq_prog) {
- if (s->sc_vers == r.rq_vers) {
- (*s->sc_dispatch)(&r, xprt);
- goto call_done;
- } /* found correct version */
- prog_found = TRUE;
- if (s->sc_vers < low_vers)
- low_vers = s->sc_vers;
- if (s->sc_vers > high_vers)
- high_vers = s->sc_vers;
- } /* found correct program */
- }
+ }
+
+ r->rq_xid = msg.rm_xid;
+ r->rq_prog = msg.rm_call.cb_prog;
+ r->rq_vers = msg.rm_call.cb_vers;
+ r->rq_proc = msg.rm_call.cb_proc;
+ r->rq_size = sizeof(*r) + m_length(args, NULL);
+ r->rq_args = args;
+ if ((why = _authenticate(r, &msg)) != AUTH_OK) {
/*
- * if we got here, the program or version
- * is not served ...
+ * RPCSEC_GSS uses this return code
+ * for requests that form part of its
+ * context establishment protocol and
+ * should not be dispatched to the
+ * application.
*/
- if (prog_found)
- svcerr_progvers(xprt, low_vers, high_vers);
- else
- svcerr_noprog(xprt);
- /* Fall through to ... */
+ if (why != RPCSEC_GSS_NODISPATCH)
+ svcerr_auth(r, why);
+ goto call_done;
}
+
+ if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
+ svcerr_decode(r);
+ goto call_done;
+ }
+
/*
- * Check if the xprt has been disconnected in a
- * recursive call in the service dispatch routine.
- * If so, then break.
+ * Everything checks out, return request to caller.
*/
- mtx_lock(&pool->sp_lock);
- if (!xprt->xp_registered) {
- mtx_unlock(&pool->sp_lock);
- break;
- }
- mtx_unlock(&pool->sp_lock);
+ *rqstp_ret = r;
+ r = NULL;
+ }
call_done:
- if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
- SVC_DESTROY(xprt);
- break;
+ if (r) {
+ svc_freereq(r);
+ r = NULL;
+ }
+ if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
+ xprt_unregister(xprt);
+ }
+
+ return (stat);
+}
+
+static void
+svc_executereq(struct svc_req *rqstp)
+{
+ SVCXPRT *xprt = rqstp->rq_xprt;
+ SVCPOOL *pool = xprt->xp_pool;
+ int prog_found;
+ rpcvers_t low_vers;
+ rpcvers_t high_vers;
+ struct svc_callout *s;
+
+ /* now match message with a registered service*/
+ prog_found = FALSE;
+ low_vers = (rpcvers_t) -1L;
+ high_vers = (rpcvers_t) 0L;
+ TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
+ if (s->sc_prog == rqstp->rq_prog) {
+ if (s->sc_vers == rqstp->rq_vers) {
+ /*
+				 * We hand ownership of rqstp to the
+				 * dispatch method, which must call
+				 * svc_freereq.
+ */
+ (*s->sc_dispatch)(rqstp, xprt);
+ return;
+ } /* found correct version */
+ prog_found = TRUE;
+ if (s->sc_vers < low_vers)
+ low_vers = s->sc_vers;
+ if (s->sc_vers > high_vers)
+ high_vers = s->sc_vers;
+ } /* found correct program */
+ }
+
+ /*
+ * if we got here, the program or version
+ * is not served ...
+ */
+ if (prog_found)
+ svcerr_progvers(rqstp, low_vers, high_vers);
+ else
+ svcerr_noprog(rqstp);
+
+ svc_freereq(rqstp);
+}
+
+static void
+svc_checkidle(SVCPOOL *pool)
+{
+ SVCXPRT *xprt, *nxprt;
+ time_t timo;
+ struct svcxprt_list cleanup;
+
+ TAILQ_INIT(&cleanup);
+ TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
+ /*
+ * Only some transports have idle timers. Don't time
+ * something out which is just waking up.
+ */
+ if (!xprt->xp_idletimeout || xprt->xp_thread)
+ continue;
+
+ timo = xprt->xp_lastactive + xprt->xp_idletimeout;
+ if (time_uptime > timo) {
+ xprt_unregister_locked(xprt);
+ TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
}
- } while (stat == XPRT_MOREREQS);
+ }
+
+ mtx_unlock(&pool->sp_lock);
+ TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
+ SVC_RELEASE(xprt);
+ }
+ mtx_lock(&pool->sp_lock);
+
}
-void
-svc_run(SVCPOOL *pool)
+static void
+svc_assign_waiting_sockets(SVCPOOL *pool)
+{
+ SVCXPRT *xprt;
+
+ TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
+ if (!xprt->xp_thread) {
+ xprt_assignthread(xprt);
+ }
+ }
+}
+
+static bool_t
+svc_request_space_available(SVCPOOL *pool)
+{
+
+ mtx_assert(&pool->sp_lock, MA_OWNED);
+
+ if (pool->sp_space_throttled) {
+ /*
+		 * Below the low-water mark yet? If so, assign any waiting sockets.
+ */
+ if (pool->sp_space_used < pool->sp_space_low) {
+ pool->sp_space_throttled = FALSE;
+ svc_assign_waiting_sockets(pool);
+ return TRUE;
+ }
+
+ return FALSE;
+ } else {
+ if (pool->sp_space_used
+ >= pool->sp_space_high) {
+ pool->sp_space_throttled = TRUE;
+ pool->sp_space_throttle_count++;
+ return FALSE;
+ }
+
+ return TRUE;
+ }
+}
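
For concreteness, a worked instance of the thresholds set up in
svcpool_create() (the nmbclusters value is an assumption):

	/*
	 * With nmbclusters = 65536 and MCLBYTES = 2048:
	 *   sp_space_high = 65536 * 2048 / 4 = 32MB (below the 45MB cap)
	 *   sp_space_low  = 2 * 32MB / 3    ~= 21MB
	 * Reading stops once 32MB of parsed but unexecuted requests are
	 * queued, and resumes, with waiting sockets reassigned, once
	 * the backlog drains below ~21MB.
	 */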
+
+static void
+svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
+ SVCTHREAD *st, *stpref;
SVCXPRT *xprt;
+ enum xprt_stat stat;
+ struct svc_req *rqstp;
int error;
+ st = mem_alloc(sizeof(*st));
+ st->st_xprt = NULL;
+ STAILQ_INIT(&st->st_reqs);
+ cv_init(&st->st_cond, "rpcsvc");
+
mtx_lock(&pool->sp_lock);
+ LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
- pool->sp_exited = FALSE;
+ /*
+ * If we are a new thread which was spawned to cope with
+ * increased load, set the state back to SVCPOOL_ACTIVE.
+ */
+ if (pool->sp_state == SVCPOOL_THREADSTARTING)
+ pool->sp_state = SVCPOOL_ACTIVE;
- while (!pool->sp_exited) {
- xprt = TAILQ_FIRST(&pool->sp_active);
- if (!xprt) {
- error = msleep(&pool->sp_active, &pool->sp_lock, PCATCH,
- "rpcsvc", 0);
- if (error)
+ while (pool->sp_state != SVCPOOL_CLOSING) {
+ /*
+ * Check for idle transports once per second.
+ */
+ if (time_uptime > pool->sp_lastidlecheck) {
+ pool->sp_lastidlecheck = time_uptime;
+ svc_checkidle(pool);
+ }
+
+ xprt = st->st_xprt;
+ if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
+ /*
+ * Enforce maxthreads count.
+ */
+ if (pool->sp_threadcount > pool->sp_maxthreads)
+ break;
+
+ /*
+ * Before sleeping, see if we can find an
+ * active transport which isn't being serviced
+ * by a thread.
+ */
+ if (svc_request_space_available(pool)) {
+ TAILQ_FOREACH(xprt, &pool->sp_active,
+ xp_alink) {
+ if (!xprt->xp_thread) {
+ SVC_ACQUIRE(xprt);
+ xprt->xp_thread = st;
+ st->st_xprt = xprt;
+ break;
+ }
+ }
+ }
+ if (st->st_xprt)
+ continue;
+
+ LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
+ error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock,
+ 5 * hz);
+ LIST_REMOVE(st, st_ilink);
+
+ /*
+ * Reduce worker thread count when idle.
+ */
+ if (error == EWOULDBLOCK) {
+ if (!ismaster
+ && (pool->sp_threadcount
+ > pool->sp_minthreads)
+ && !st->st_xprt
+ && STAILQ_EMPTY(&st->st_reqs))
+ break;
+ }
+ if (error == EWOULDBLOCK)
+ continue;
+ if (error) {
+ if (pool->sp_state != SVCPOOL_CLOSING) {
+ mtx_unlock(&pool->sp_lock);
+ svc_exit(pool);
+ mtx_lock(&pool->sp_lock);
+ }
break;
+ }
+
+ if (pool->sp_state == SVCPOOL_THREADWANTED) {
+ pool->sp_state = SVCPOOL_THREADSTARTING;
+ pool->sp_lastcreatetime = time_uptime;
+ mtx_unlock(&pool->sp_lock);
+ svc_new_thread(pool);
+ mtx_lock(&pool->sp_lock);
+ }
continue;
}
+ if (xprt) {
+ /*
+ * Drain the transport socket and queue up any
+ * RPCs.
+ */
+ xprt->xp_lastactive = time_uptime;
+ stat = XPRT_IDLE;
+ do {
+ if (!svc_request_space_available(pool))
+ break;
+ rqstp = NULL;
+ mtx_unlock(&pool->sp_lock);
+ stat = svc_getreq(xprt, &rqstp);
+ mtx_lock(&pool->sp_lock);
+ if (rqstp) {
+ /*
+ * See if the application has
+ * a preference for some other
+ * thread.
+ */
+ stpref = st;
+ if (pool->sp_assign)
+ stpref = pool->sp_assign(st,
+ rqstp);
+
+ pool->sp_space_used +=
+ rqstp->rq_size;
+ if (pool->sp_space_used
+ > pool->sp_space_used_highest)
+ pool->sp_space_used_highest =
+ pool->sp_space_used;
+ rqstp->rq_thread = stpref;
+ STAILQ_INSERT_TAIL(&stpref->st_reqs,
+ rqstp, rq_link);
+ stpref->st_reqcount++;
+
+ /*
+ * If we assigned the request
+ * to another thread, make
+ * sure its awake and continue
+					 * sure it's awake and continue
+ * socket. Otherwise, try to
+ * find some other thread to
+ * read from the socket and
+ * execute the request
+ * immediately.
+ */
+ if (stpref != st) {
+ cv_signal(&stpref->st_cond);
+ continue;
+ } else {
+ break;
+ }
+ }
+ } while (stat == XPRT_MOREREQS
+ && pool->sp_state != SVCPOOL_CLOSING);
+
+ /*
+ * Move this transport to the end of the
+ * active list to ensure fairness when
+ * multiple transports are active. If this was
+ * the last queued request, svc_getreq will
+ * end up calling xprt_inactive to remove from
+ * the active list.
+ */
+ xprt->xp_thread = NULL;
+ st->st_xprt = NULL;
+ if (xprt->xp_active) {
+ xprt_assignthread(xprt);
+ TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
+ TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
+ xp_alink);
+ }
+ mtx_unlock(&pool->sp_lock);
+ SVC_RELEASE(xprt);
+ mtx_lock(&pool->sp_lock);
+ }
+
/*
- * Move this transport to the end to ensure fairness
- * when multiple transports are active. If this was
- * the last queued request, svc_getreq will end up
- * calling xprt_inactive to remove from the active
- * list.
+ * Execute what we have queued.
*/
- TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
- TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
+ while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
+ size_t sz = rqstp->rq_size;
+ mtx_unlock(&pool->sp_lock);
+ svc_executereq(rqstp);
+ mtx_lock(&pool->sp_lock);
+ pool->sp_space_used -= sz;
+ }
+ }
- mtx_unlock(&pool->sp_lock);
- svc_getreq(xprt);
- mtx_lock(&pool->sp_lock);
+ if (st->st_xprt) {
+ xprt = st->st_xprt;
+ st->st_xprt = NULL;
+ SVC_RELEASE(xprt);
+ }
+
+ KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
+ LIST_REMOVE(st, st_link);
+ pool->sp_threadcount--;
+
+ mtx_unlock(&pool->sp_lock);
+
+ cv_destroy(&st->st_cond);
+ mem_free(st, sizeof(*st));
+
+ if (!ismaster)
+ wakeup(pool);
+}
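
The sp_assign hook consulted above lets an application steer a parsed
request to a particular worker, with sp_done (called from svc_freereq
below) as its bookend. A hedged sketch of trivial implementations; the
names are hypothetical and the signatures are inferred from how the
pool invokes them here:

	static SVCTHREAD *
	my_assign(SVCTHREAD *this_thread, struct svc_req *rqstp)
	{

		/* Trivial policy: keep the request on the receiving thread. */
		return (this_thread);
	}

	static void
	my_done(SVCTHREAD *st, struct svc_req *rqstp)
	{

		/* Runs under the pool lock as each request is freed. */
	}

	...
	pool->sp_assign = my_assign;
	pool->sp_done = my_done;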
+
+static void
+svc_thread_start(void *arg)
+{
+
+ svc_run_internal((SVCPOOL *) arg, FALSE);
+ kthread_exit();
+}
+
+static void
+svc_new_thread(SVCPOOL *pool)
+{
+ struct thread *td;
+
+ pool->sp_threadcount++;
+ kthread_add(svc_thread_start, pool,
+ pool->sp_proc, &td, 0, 0,
+ "%s: service", pool->sp_name);
+}
+
+void
+svc_run(SVCPOOL *pool)
+{
+ int i;
+ struct proc *p;
+ struct thread *td;
+
+ p = curproc;
+ td = curthread;
+ snprintf(td->td_name, sizeof(td->td_name),
+ "%s: master", pool->sp_name);
+ pool->sp_state = SVCPOOL_ACTIVE;
+ pool->sp_proc = p;
+ pool->sp_lastcreatetime = time_uptime;
+ pool->sp_threadcount = 1;
+
+ for (i = 1; i < pool->sp_minthreads; i++) {
+ svc_new_thread(pool);
}
+ svc_run_internal(pool, TRUE);
+
+ mtx_lock(&pool->sp_lock);
+ while (pool->sp_threadcount > 0)
+ msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
mtx_unlock(&pool->sp_lock);
}
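
svc_run() thus turns the calling thread into the pool master and only
returns once every worker has exited. A service's lifecycle, sketched
with hypothetical glue (transport creation and svc_reg() registration
are outside this diff):

	pool = svcpool_create("myrpc", SYSCTL_STATIC_CHILDREN(_kern));
	/* ... create transports, svc_reg() the program ... */
	svc_run(pool);		/* blocks until the pool drains */
	svcpool_destroy(pool);

	/* From another context, e.g. a module unload path: */
	svc_exit(pool);		/* wakes the threads so svc_run returns */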
void
svc_exit(SVCPOOL *pool)
{
+ SVCTHREAD *st;
+
mtx_lock(&pool->sp_lock);
- pool->sp_exited = TRUE;
- wakeup(&pool->sp_active);
+
+ pool->sp_state = SVCPOOL_CLOSING;
+ LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
+ cv_signal(&st->st_cond);
+
mtx_unlock(&pool->sp_lock);
}
+
+bool_t
+svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
+{
+ struct mbuf *m;
+ XDR xdrs;
+ bool_t stat;
+
+ m = rqstp->rq_args;
+ rqstp->rq_args = NULL;
+
+ xdrmbuf_create(&xdrs, m, XDR_DECODE);
+ stat = xargs(&xdrs, args);
+ XDR_DESTROY(&xdrs);
+
+ return (stat);
+}
+
+bool_t
+svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
+{
+ XDR xdrs;
+
+ if (rqstp->rq_addr) {
+ free(rqstp->rq_addr, M_SONAME);
+ rqstp->rq_addr = NULL;
+ }
+
+ xdrs.x_op = XDR_FREE;
+ return (xargs(&xdrs, args));
+}
+
+void
+svc_freereq(struct svc_req *rqstp)
+{
+ SVCTHREAD *st;
+ SVCXPRT *xprt;
+ SVCPOOL *pool;
+
+ st = rqstp->rq_thread;
+ xprt = rqstp->rq_xprt;
+ if (xprt)
+ pool = xprt->xp_pool;
+ else
+ pool = NULL;
+ if (st) {
+ mtx_lock(&pool->sp_lock);
+ KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
+ ("Freeing request out of order"));
+ STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
+ st->st_reqcount--;
+ if (pool->sp_done)
+ pool->sp_done(st, rqstp);
+ mtx_unlock(&pool->sp_lock);
+ }
+
+ if (rqstp->rq_auth.svc_ah_ops)
+ SVCAUTH_RELEASE(&rqstp->rq_auth);
+
+ if (rqstp->rq_xprt) {
+ SVC_RELEASE(rqstp->rq_xprt);
+ }
+
+ if (rqstp->rq_addr)
+ free(rqstp->rq_addr, M_SONAME);
+
+ if (rqstp->rq_args)
+ m_freem(rqstp->rq_args);
+
+ free(rqstp, M_RPC);
+}