diff options
-rw-r--r-- | sys/rpc/svc.c | 357 | ||||
-rw-r--r-- | sys/rpc/svc.h | 55 | ||||
-rw-r--r-- | sys/rpc/svc_generic.c | 20 |
3 files changed, 250 insertions, 182 deletions
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c index b7c8c57..b682b39 100644 --- a/sys/rpc/svc.c +++ b/sys/rpc/svc.c @@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$"); #include <sys/queue.h> #include <sys/socketvar.h> #include <sys/systm.h> +#include <sys/smp.h> #include <sys/sx.h> #include <sys/ucred.h> @@ -70,7 +71,7 @@ __FBSDID("$FreeBSD$"); static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, char *); -static void svc_new_thread(SVCPOOL *pool); +static void svc_new_thread(SVCGROUP *grp); static void xprt_unregister_locked(SVCXPRT *xprt); static void svc_change_space_used(SVCPOOL *pool, int delta); static bool_t svc_request_space_available(SVCPOOL *pool); @@ -79,11 +80,14 @@ static bool_t svc_request_space_available(SVCPOOL *pool); static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); +static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS); SVCPOOL* svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) { SVCPOOL *pool; + SVCGROUP *grp; + int g; pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); @@ -91,15 +95,22 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) pool->sp_name = name; pool->sp_state = SVCPOOL_INIT; pool->sp_proc = NULL; - TAILQ_INIT(&pool->sp_xlist); - TAILQ_INIT(&pool->sp_active); TAILQ_INIT(&pool->sp_callouts); TAILQ_INIT(&pool->sp_lcallouts); - LIST_INIT(&pool->sp_threads); - LIST_INIT(&pool->sp_idlethreads); pool->sp_minthreads = 1; pool->sp_maxthreads = 1; - pool->sp_threadcount = 0; + pool->sp_groupcount = 1; + for (g = 0; g < SVC_MAXGROUPS; g++) { + grp = &pool->sp_groups[g]; + mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF); + grp->sg_pool = pool; + grp->sg_state = SVCPOOL_ACTIVE; + TAILQ_INIT(&grp->sg_xlist); + TAILQ_INIT(&grp->sg_active); + LIST_INIT(&grp->sg_idlethreads); + grp->sg_minthreads = 1; + grp->sg_maxthreads = 1; + } /* * Don't use more than a quarter of mbuf clusters or more than @@ -114,12 +125,19 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) if (sysctl_base) { SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, "minthreads", CTLTYPE_INT | CTLFLAG_RW, - pool, 0, svcpool_minthread_sysctl, "I", ""); + pool, 0, svcpool_minthread_sysctl, "I", + "Minimal number of threads"); SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, "maxthreads", CTLTYPE_INT | CTLFLAG_RW, - pool, 0, svcpool_maxthread_sysctl, "I", ""); + pool, 0, svcpool_maxthread_sysctl, "I", + "Maximal number of threads"); + SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, + "threads", CTLTYPE_INT | CTLFLAG_RD, + pool, 0, svcpool_threads_sysctl, "I", + "Current number of threads"); SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, - "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, ""); + "groups", CTLFLAG_RD, &pool->sp_groupcount, 0, + "Number of thread groups"); SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, "request_space_used", CTLFLAG_RD, @@ -158,20 +176,29 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) void svcpool_destroy(SVCPOOL *pool) { + SVCGROUP *grp; SVCXPRT *xprt, *nxprt; struct svc_callout *s; struct svc_loss_callout *sl; struct svcxprt_list cleanup; + int g; TAILQ_INIT(&cleanup); - mtx_lock(&pool->sp_lock); - while (TAILQ_FIRST(&pool->sp_xlist)) { - xprt = TAILQ_FIRST(&pool->sp_xlist); - xprt_unregister_locked(xprt); - TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); + for (g = 0; g < SVC_MAXGROUPS; g++) { + grp = &pool->sp_groups[g]; + mtx_lock(&grp->sg_lock); + while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) { + xprt_unregister_locked(xprt); + TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); + } + mtx_unlock(&grp->sg_lock); + } + TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { + SVC_RELEASE(xprt); } + mtx_lock(&pool->sp_lock); while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { mtx_unlock(&pool->sp_lock); svc_unreg(pool, s->sc_prog, s->sc_vers); @@ -184,10 +211,10 @@ svcpool_destroy(SVCPOOL *pool) } mtx_unlock(&pool->sp_lock); - TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { - SVC_RELEASE(xprt); + for (g = 0; g < SVC_MAXGROUPS; g++) { + grp = &pool->sp_groups[g]; + mtx_destroy(&grp->sg_lock); } - mtx_destroy(&pool->sp_lock); if (pool->sp_rcache) @@ -197,14 +224,23 @@ svcpool_destroy(SVCPOOL *pool) free(pool, M_RPC); } -static bool_t -svcpool_active(SVCPOOL *pool) +/* + * Sysctl handler to get the present thread count on a pool + */ +static int +svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS) { - enum svcpool_state state = pool->sp_state; + SVCPOOL *pool; + int threads, error, g; - if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING) - return (FALSE); - return (TRUE); + pool = oidp->oid_arg1; + threads = 0; + mtx_lock(&pool->sp_lock); + for (g = 0; g < pool->sp_groupcount; g++) + threads += pool->sp_groups[g].sg_threadcount; + mtx_unlock(&pool->sp_lock); + error = sysctl_handle_int(oidp, &threads, 0, req); + return (error); } /* @@ -214,7 +250,7 @@ static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) { SVCPOOL *pool; - int newminthreads, error, n; + int newminthreads, error, g; pool = oidp->oid_arg1; newminthreads = pool->sp_minthreads; @@ -223,21 +259,11 @@ svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) if (newminthreads > pool->sp_maxthreads) return (EINVAL); mtx_lock(&pool->sp_lock); - if (newminthreads > pool->sp_minthreads - && svcpool_active(pool)) { - /* - * If the pool is running and we are - * increasing, create some more threads now. - */ - n = newminthreads - pool->sp_threadcount; - if (n > 0) { - mtx_unlock(&pool->sp_lock); - while (n--) - svc_new_thread(pool); - mtx_lock(&pool->sp_lock); - } - } pool->sp_minthreads = newminthreads; + for (g = 0; g < pool->sp_groupcount; g++) { + pool->sp_groups[g].sg_minthreads = max(1, + pool->sp_minthreads / pool->sp_groupcount); + } mtx_unlock(&pool->sp_lock); } return (error); @@ -250,8 +276,7 @@ static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) { SVCPOOL *pool; - SVCTHREAD *st; - int newmaxthreads, error; + int newmaxthreads, error, g; pool = oidp->oid_arg1; newmaxthreads = pool->sp_maxthreads; @@ -260,17 +285,11 @@ svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) if (newmaxthreads < pool->sp_minthreads) return (EINVAL); mtx_lock(&pool->sp_lock); - if (newmaxthreads < pool->sp_maxthreads - && svcpool_active(pool)) { - /* - * If the pool is running and we are - * decreasing, wake up some idle threads to - * encourage them to exit. - */ - LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) - cv_signal(&st->st_cond); - } pool->sp_maxthreads = newmaxthreads; + for (g = 0; g < pool->sp_groupcount; g++) { + pool->sp_groups[g].sg_maxthreads = max(1, + pool->sp_maxthreads / pool->sp_groupcount); + } mtx_unlock(&pool->sp_lock); } return (error); @@ -283,13 +302,17 @@ void xprt_register(SVCXPRT *xprt) { SVCPOOL *pool = xprt->xp_pool; + SVCGROUP *grp; + int g; SVC_ACQUIRE(xprt); - mtx_lock(&pool->sp_lock); + g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount; + xprt->xp_group = grp = &pool->sp_groups[g]; + mtx_lock(&grp->sg_lock); xprt->xp_registered = TRUE; xprt->xp_active = FALSE; - TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link); - mtx_unlock(&pool->sp_lock); + TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link); + mtx_unlock(&grp->sg_lock); } /* @@ -300,29 +323,29 @@ xprt_register(SVCXPRT *xprt) static void xprt_unregister_locked(SVCXPRT *xprt) { - SVCPOOL *pool = xprt->xp_pool; + SVCGROUP *grp = xprt->xp_group; - mtx_assert(&pool->sp_lock, MA_OWNED); + mtx_assert(&grp->sg_lock, MA_OWNED); KASSERT(xprt->xp_registered == TRUE, ("xprt_unregister_locked: not registered")); xprt_inactive_locked(xprt); - TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link); + TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); xprt->xp_registered = FALSE; } void xprt_unregister(SVCXPRT *xprt) { - SVCPOOL *pool = xprt->xp_pool; + SVCGROUP *grp = xprt->xp_group; - mtx_lock(&pool->sp_lock); + mtx_lock(&grp->sg_lock); if (xprt->xp_registered == FALSE) { /* Already unregistered by another thread */ - mtx_unlock(&pool->sp_lock); + mtx_unlock(&grp->sg_lock); return; } xprt_unregister_locked(xprt); - mtx_unlock(&pool->sp_lock); + mtx_unlock(&grp->sg_lock); SVC_RELEASE(xprt); } @@ -333,11 +356,11 @@ xprt_unregister(SVCXPRT *xprt) static int xprt_assignthread(SVCXPRT *xprt) { - SVCPOOL *pool = xprt->xp_pool; + SVCGROUP *grp = xprt->xp_group; SVCTHREAD *st; - mtx_assert(&pool->sp_lock, MA_OWNED); - st = LIST_FIRST(&pool->sp_idlethreads); + mtx_assert(&grp->sg_lock, MA_OWNED); + st = LIST_FIRST(&grp->sg_idlethreads); if (st) { LIST_REMOVE(st, st_ilink); SVC_ACQUIRE(xprt); @@ -354,10 +377,10 @@ xprt_assignthread(SVCXPRT *xprt) * from a socket upcall). Don't create more * than one thread per second. */ - if (pool->sp_state == SVCPOOL_ACTIVE - && pool->sp_lastcreatetime < time_uptime - && pool->sp_threadcount < pool->sp_maxthreads) { - pool->sp_state = SVCPOOL_THREADWANTED; + if (grp->sg_state == SVCPOOL_ACTIVE + && grp->sg_lastcreatetime < time_uptime + && grp->sg_threadcount < grp->sg_maxthreads) { + grp->sg_state = SVCPOOL_THREADWANTED; } } return (FALSE); @@ -366,40 +389,40 @@ xprt_assignthread(SVCXPRT *xprt) void xprt_active(SVCXPRT *xprt) { - SVCPOOL *pool = xprt->xp_pool; + SVCGROUP *grp = xprt->xp_group; - mtx_lock(&pool->sp_lock); + mtx_lock(&grp->sg_lock); if (!xprt->xp_registered) { /* * Race with xprt_unregister - we lose. */ - mtx_unlock(&pool->sp_lock); + mtx_unlock(&grp->sg_lock); return; } if (!xprt->xp_active) { xprt->xp_active = TRUE; if (xprt->xp_thread == NULL) { - if (!svc_request_space_available(pool) || + if (!svc_request_space_available(xprt->xp_pool) || !xprt_assignthread(xprt)) - TAILQ_INSERT_TAIL(&pool->sp_active, xprt, + TAILQ_INSERT_TAIL(&grp->sg_active, xprt, xp_alink); } } - mtx_unlock(&pool->sp_lock); + mtx_unlock(&grp->sg_lock); } void xprt_inactive_locked(SVCXPRT *xprt) { - SVCPOOL *pool = xprt->xp_pool; + SVCGROUP *grp = xprt->xp_group; - mtx_assert(&pool->sp_lock, MA_OWNED); + mtx_assert(&grp->sg_lock, MA_OWNED); if (xprt->xp_active) { if (xprt->xp_thread == NULL) - TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); + TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); xprt->xp_active = FALSE; } } @@ -407,11 +430,11 @@ xprt_inactive_locked(SVCXPRT *xprt) void xprt_inactive(SVCXPRT *xprt) { - SVCPOOL *pool = xprt->xp_pool; + SVCGROUP *grp = xprt->xp_group; - mtx_lock(&pool->sp_lock); + mtx_lock(&grp->sg_lock); xprt_inactive_locked(xprt); - mtx_unlock(&pool->sp_lock); + mtx_unlock(&grp->sg_lock); } /* @@ -991,14 +1014,14 @@ svc_executereq(struct svc_req *rqstp) } static void -svc_checkidle(SVCPOOL *pool) +svc_checkidle(SVCGROUP *grp) { SVCXPRT *xprt, *nxprt; time_t timo; struct svcxprt_list cleanup; TAILQ_INIT(&cleanup); - TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) { + TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { /* * Only some transports have idle timers. Don't time * something out which is just waking up. @@ -1013,27 +1036,31 @@ svc_checkidle(SVCPOOL *pool) } } - mtx_unlock(&pool->sp_lock); + mtx_unlock(&grp->sg_lock); TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { SVC_RELEASE(xprt); } - mtx_lock(&pool->sp_lock); - + mtx_lock(&grp->sg_lock); } static void svc_assign_waiting_sockets(SVCPOOL *pool) { + SVCGROUP *grp; SVCXPRT *xprt; - - mtx_lock(&pool->sp_lock); - while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) { - if (xprt_assignthread(xprt)) - TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); - else - break; + int g; + + for (g = 0; g < pool->sp_groupcount; g++) { + grp = &pool->sp_groups[g]; + mtx_lock(&grp->sg_lock); + while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { + if (xprt_assignthread(xprt)) + TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); + else + break; + } + mtx_unlock(&grp->sg_lock); } - mtx_unlock(&pool->sp_lock); } static void @@ -1067,8 +1094,9 @@ svc_request_space_available(SVCPOOL *pool) } static void -svc_run_internal(SVCPOOL *pool, bool_t ismaster) +svc_run_internal(SVCGROUP *grp, bool_t ismaster) { + SVCPOOL *pool = grp->sg_pool; SVCTHREAD *st, *stpref; SVCXPRT *xprt; enum xprt_stat stat; @@ -1083,35 +1111,34 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) STAILQ_INIT(&st->st_reqs); cv_init(&st->st_cond, "rpcsvc"); - mtx_lock(&pool->sp_lock); - LIST_INSERT_HEAD(&pool->sp_threads, st, st_link); + mtx_lock(&grp->sg_lock); /* * If we are a new thread which was spawned to cope with * increased load, set the state back to SVCPOOL_ACTIVE. */ - if (pool->sp_state == SVCPOOL_THREADSTARTING) - pool->sp_state = SVCPOOL_ACTIVE; + if (grp->sg_state == SVCPOOL_THREADSTARTING) + grp->sg_state = SVCPOOL_ACTIVE; - while (pool->sp_state != SVCPOOL_CLOSING) { + while (grp->sg_state != SVCPOOL_CLOSING) { /* * Create new thread if requested. */ - if (pool->sp_state == SVCPOOL_THREADWANTED) { - pool->sp_state = SVCPOOL_THREADSTARTING; - pool->sp_lastcreatetime = time_uptime; - mtx_unlock(&pool->sp_lock); - svc_new_thread(pool); - mtx_lock(&pool->sp_lock); + if (grp->sg_state == SVCPOOL_THREADWANTED) { + grp->sg_state = SVCPOOL_THREADSTARTING; + grp->sg_lastcreatetime = time_uptime; + mtx_unlock(&grp->sg_lock); + svc_new_thread(grp); + mtx_lock(&grp->sg_lock); continue; } /* * Check for idle transports once per second. */ - if (time_uptime > pool->sp_lastidlecheck) { - pool->sp_lastidlecheck = time_uptime; - svc_checkidle(pool); + if (time_uptime > grp->sg_lastidlecheck) { + grp->sg_lastidlecheck = time_uptime; + svc_checkidle(grp); } xprt = st->st_xprt; @@ -1119,7 +1146,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) /* * Enforce maxthreads count. */ - if (pool->sp_threadcount > pool->sp_maxthreads) + if (grp->sg_threadcount > grp->sg_maxthreads) break; /* @@ -1128,22 +1155,22 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) * by a thread. */ if (svc_request_space_available(pool) && - (xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) { - TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); + (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { + TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); SVC_ACQUIRE(xprt); xprt->xp_thread = st; st->st_xprt = xprt; continue; } - LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink); + LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); if (ismaster || (!ismaster && - pool->sp_threadcount > pool->sp_minthreads)) + grp->sg_threadcount > grp->sg_minthreads)) error = cv_timedwait_sig(&st->st_cond, - &pool->sp_lock, 5 * hz); + &grp->sg_lock, 5 * hz); else error = cv_wait_sig(&st->st_cond, - &pool->sp_lock); + &grp->sg_lock); if (st->st_xprt == NULL) LIST_REMOVE(st, st_ilink); @@ -1152,19 +1179,19 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) */ if (error == EWOULDBLOCK) { if (!ismaster - && (pool->sp_threadcount - > pool->sp_minthreads) + && (grp->sg_threadcount + > grp->sg_minthreads) && !st->st_xprt) break; } else if (error) { - mtx_unlock(&pool->sp_lock); + mtx_unlock(&grp->sg_lock); svc_exit(pool); - mtx_lock(&pool->sp_lock); + mtx_lock(&grp->sg_lock); break; } continue; } - mtx_unlock(&pool->sp_lock); + mtx_unlock(&grp->sg_lock); /* * Drain the transport socket and queue up any RPCs. @@ -1194,7 +1221,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) rqstp, rq_link); } } while (rqstp == NULL && stat == XPRT_MOREREQS - && pool->sp_state != SVCPOOL_CLOSING); + && grp->sg_state != SVCPOOL_CLOSING); /* * Move this transport to the end of the active list to @@ -1202,16 +1229,16 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) * If this was the last queued request, svc_getreq will end * up calling xprt_inactive to remove from the active list. */ - mtx_lock(&pool->sp_lock); + mtx_lock(&grp->sg_lock); xprt->xp_thread = NULL; st->st_xprt = NULL; if (xprt->xp_active) { if (!svc_request_space_available(pool) || !xprt_assignthread(xprt)) - TAILQ_INSERT_TAIL(&pool->sp_active, + TAILQ_INSERT_TAIL(&grp->sg_active, xprt, xp_alink); } - mtx_unlock(&pool->sp_lock); + mtx_unlock(&grp->sg_lock); SVC_RELEASE(xprt); /* @@ -1228,7 +1255,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) } mtx_unlock(&st->st_lock); svc_change_space_used(pool, -sz); - mtx_lock(&pool->sp_lock); + mtx_lock(&grp->sg_lock); } if (st->st_xprt) { @@ -1236,46 +1263,43 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) st->st_xprt = NULL; SVC_RELEASE(xprt); } - KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); - LIST_REMOVE(st, st_link); - pool->sp_threadcount--; - - mtx_unlock(&pool->sp_lock); - mtx_destroy(&st->st_lock); cv_destroy(&st->st_cond); mem_free(st, sizeof(*st)); + grp->sg_threadcount--; if (!ismaster) - wakeup(pool); + wakeup(grp); + mtx_unlock(&grp->sg_lock); } static void svc_thread_start(void *arg) { - svc_run_internal((SVCPOOL *) arg, FALSE); + svc_run_internal((SVCGROUP *) arg, FALSE); kthread_exit(); } static void -svc_new_thread(SVCPOOL *pool) +svc_new_thread(SVCGROUP *grp) { + SVCPOOL *pool = grp->sg_pool; struct thread *td; - pool->sp_threadcount++; - kthread_add(svc_thread_start, pool, - pool->sp_proc, &td, 0, 0, + grp->sg_threadcount++; + kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, "%s: service", pool->sp_name); } void svc_run(SVCPOOL *pool) { - int i; + int g, i; struct proc *p; struct thread *td; + SVCGROUP *grp; p = curproc; td = curthread; @@ -1283,35 +1307,56 @@ svc_run(SVCPOOL *pool) "%s: master", pool->sp_name); pool->sp_state = SVCPOOL_ACTIVE; pool->sp_proc = p; - pool->sp_lastcreatetime = time_uptime; - pool->sp_threadcount = 1; - for (i = 1; i < pool->sp_minthreads; i++) { - svc_new_thread(pool); + /* Choose group count based on number of threads and CPUs. */ + pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, + min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); + for (g = 0; g < pool->sp_groupcount; g++) { + grp = &pool->sp_groups[g]; + grp->sg_minthreads = max(1, + pool->sp_minthreads / pool->sp_groupcount); + grp->sg_maxthreads = max(1, + pool->sp_maxthreads / pool->sp_groupcount); + grp->sg_lastcreatetime = time_uptime; } - svc_run_internal(pool, TRUE); - - mtx_lock(&pool->sp_lock); - while (pool->sp_threadcount > 0) - msleep(pool, &pool->sp_lock, 0, "svcexit", 0); - mtx_unlock(&pool->sp_lock); + /* Starting threads */ + for (g = 0; g < pool->sp_groupcount; g++) { + grp = &pool->sp_groups[g]; + for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) + svc_new_thread(grp); + } + pool->sp_groups[0].sg_threadcount++; + svc_run_internal(&pool->sp_groups[0], TRUE); + + /* Waiting for threads to stop. */ + for (g = 0; g < pool->sp_groupcount; g++) { + grp = &pool->sp_groups[g]; + mtx_lock(&grp->sg_lock); + while (grp->sg_threadcount > 0) + msleep(grp, &grp->sg_lock, 0, "svcexit", 0); + mtx_unlock(&grp->sg_lock); + } } void svc_exit(SVCPOOL *pool) { + SVCGROUP *grp; SVCTHREAD *st; - - mtx_lock(&pool->sp_lock); - - if (pool->sp_state != SVCPOOL_CLOSING) { - pool->sp_state = SVCPOOL_CLOSING; - LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) - cv_signal(&st->st_cond); + int g; + + pool->sp_state = SVCPOOL_CLOSING; + for (g = 0; g < pool->sp_groupcount; g++) { + grp = &pool->sp_groups[g]; + mtx_lock(&grp->sg_lock); + if (grp->sg_state != SVCPOOL_CLOSING) { + grp->sg_state = SVCPOOL_CLOSING; + LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) + cv_signal(&st->st_cond); + } + mtx_unlock(&grp->sg_lock); } - - mtx_unlock(&pool->sp_lock); } bool_t diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h index d16cf30..4f2c853 100644 --- a/sys/rpc/svc.h +++ b/sys/rpc/svc.h @@ -137,6 +137,7 @@ struct xp_ops2 { #ifdef _KERNEL struct __rpc_svcpool; +struct __rpc_svcgroup; struct __rpc_svcthread; #endif @@ -150,6 +151,7 @@ typedef struct __rpc_svcxprt { volatile u_int xp_refs; struct sx xp_lock; struct __rpc_svcpool *xp_pool; /* owning pool (see below) */ + struct __rpc_svcgroup *xp_group; /* owning group (see below) */ TAILQ_ENTRY(__rpc_svcxprt) xp_link; TAILQ_ENTRY(__rpc_svcxprt) xp_alink; bool_t xp_registered; /* xprt_register has been called */ @@ -245,8 +247,6 @@ struct svc_loss_callout { }; TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout); -struct __rpc_svcthread; - /* * Service request */ @@ -296,7 +296,6 @@ typedef struct __rpc_svcthread { SVCXPRT *st_xprt; /* transport we are processing */ struct svc_reqlist st_reqs; /* RPC requests to execute */ struct cv st_cond; /* sleeping for work */ - LIST_ENTRY(__rpc_svcthread) st_link; /* all threads list */ LIST_ENTRY(__rpc_svcthread) st_ilink; /* idle threads list */ LIST_ENTRY(__rpc_svcthread) st_alink; /* application thread list */ int st_p2; /* application workspace */ @@ -305,6 +304,36 @@ typedef struct __rpc_svcthread { LIST_HEAD(svcthread_list, __rpc_svcthread); /* + * A thread group contain all information needed to assign subset of + * transports to subset of threads. On systems with many CPUs and many + * threads that allows to reduce lock congestion and improve performance. + * Hundreds of threads on dozens of CPUs sharing the single pool lock do + * not scale well otherwise. + */ +TAILQ_HEAD(svcxprt_list, __rpc_svcxprt); +enum svcpool_state { + SVCPOOL_INIT, /* svc_run not called yet */ + SVCPOOL_ACTIVE, /* normal running state */ + SVCPOOL_THREADWANTED, /* new service thread requested */ + SVCPOOL_THREADSTARTING, /* new service thread started */ + SVCPOOL_CLOSING /* svc_exit called */ +}; +typedef struct __rpc_svcgroup { + struct mtx_padalign sg_lock; /* protect the thread/req lists */ + struct __rpc_svcpool *sg_pool; + enum svcpool_state sg_state; /* current pool state */ + struct svcxprt_list sg_xlist; /* all transports in the group */ + struct svcxprt_list sg_active; /* transports needing service */ + struct svcthread_list sg_idlethreads; /* idle service threads */ + + int sg_minthreads; /* minimum service thread count */ + int sg_maxthreads; /* maximum service thread count */ + int sg_threadcount; /* current service thread count */ + time_t sg_lastcreatetime; /* when we last started a thread */ + time_t sg_lastidlecheck; /* when we last checked idle transports */ +} SVCGROUP; + +/* * In the kernel, we can't use global variables to store lists of * transports etc. since otherwise we could not have two unrelated RPC * services running, each on its own thread. We solve this by @@ -316,32 +345,18 @@ LIST_HEAD(svcthread_list, __rpc_svcthread); * this to support something similar to the Solaris multi-threaded RPC * server. */ -TAILQ_HEAD(svcxprt_list, __rpc_svcxprt); -enum svcpool_state { - SVCPOOL_INIT, /* svc_run not called yet */ - SVCPOOL_ACTIVE, /* normal running state */ - SVCPOOL_THREADWANTED, /* new service thread requested */ - SVCPOOL_THREADSTARTING, /* new service thread started */ - SVCPOOL_CLOSING /* svc_exit called */ -}; typedef SVCTHREAD *pool_assign_fn(SVCTHREAD *, struct svc_req *); typedef void pool_done_fn(SVCTHREAD *, struct svc_req *); +#define SVC_MAXGROUPS 16 typedef struct __rpc_svcpool { struct mtx_padalign sp_lock; /* protect the transport lists */ const char *sp_name; /* pool name (e.g. "nfsd", "NLM" */ enum svcpool_state sp_state; /* current pool state */ struct proc *sp_proc; /* process which is in svc_run */ - struct svcxprt_list sp_xlist; /* all transports in the pool */ - struct svcxprt_list sp_active; /* transports needing service */ struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */ struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */ - struct svcthread_list sp_threads; /* service threads */ - struct svcthread_list sp_idlethreads; /* idle service threads */ int sp_minthreads; /* minimum service thread count */ int sp_maxthreads; /* maximum service thread count */ - int sp_threadcount; /* current service thread count */ - time_t sp_lastcreatetime; /* when we last started a thread */ - time_t sp_lastidlecheck; /* when we last checked idle transports */ /* * Hooks to allow an application to control request to thread @@ -364,6 +379,10 @@ typedef struct __rpc_svcpool { struct replay_cache *sp_rcache; /* optional replay cache */ struct sysctl_ctx_list sp_sysctl; + + int sp_groupcount; /* Number of groups in the pool. */ + int sp_nextgroup; /* Next group to assign port. */ + SVCGROUP sp_groups[SVC_MAXGROUPS]; /* Thread/port groups. */ } SVCPOOL; #else diff --git a/sys/rpc/svc_generic.c b/sys/rpc/svc_generic.c index b0c72da..59e36c2 100644 --- a/sys/rpc/svc_generic.c +++ b/sys/rpc/svc_generic.c @@ -86,7 +86,8 @@ svc_create( rpcvers_t versnum, /* Version number */ const char *nettype) /* Networktype token */ { - int num = 0; + int g, num = 0; + SVCGROUP *grp; SVCXPRT *xprt; struct netconfig *nconf; void *handle; @@ -96,11 +97,14 @@ svc_create( return (0); } while ((nconf = __rpc_getconf(handle)) != NULL) { - mtx_lock(&pool->sp_lock); - TAILQ_FOREACH(xprt, &pool->sp_xlist, xp_link) { - if (strcmp(xprt->xp_netid, nconf->nc_netid) == 0) { + for (g = 0; g < SVC_MAXGROUPS; g++) { + grp = &pool->sp_groups[g]; + mtx_lock(&grp->sg_lock); + TAILQ_FOREACH(xprt, &grp->sg_xlist, xp_link) { + if (strcmp(xprt->xp_netid, nconf->nc_netid)) + continue; /* Found an old one, use it */ - mtx_unlock(&pool->sp_lock); + mtx_unlock(&grp->sg_lock); (void) rpcb_unset(prognum, versnum, nconf); if (svc_reg(xprt, prognum, versnum, dispatch, nconf) == FALSE) { @@ -108,15 +112,15 @@ svc_create( "svc_create: could not register prog %u vers %u on %s\n", (unsigned)prognum, (unsigned)versnum, nconf->nc_netid); - mtx_lock(&pool->sp_lock); + mtx_lock(&grp->sg_lock); } else { num++; - mtx_lock(&pool->sp_lock); + mtx_lock(&grp->sg_lock); break; } } + mtx_unlock(&grp->sg_lock); } - mtx_unlock(&pool->sp_lock); if (xprt == NULL) { /* It was not found. Now create a new one */ xprt = svc_tp_create(pool, dispatch, prognum, versnum, |