author	mav <mav@FreeBSD.org>	2014-06-22 18:03:53 +0000
committer	mav <mav@FreeBSD.org>	2014-06-22 18:03:53 +0000
commit	23b32162cef338290cead8bd5a58c7de5978da64 (patch)
tree	85aa30fad6ec700fb429a20dcffa3d20d0099273 /sys/rpc/svc.c
parent	1d8eae9d0a71314b260971ff1d920c6685a0b2a4 (diff)
MFC r267228:
Split RPC pool threads into a number of smaller semi-isolated groups. The old design with a unified thread pool was good from the standpoint of thread utilization, but the single pool-wide mutex became a major contention point on systems with many CPUs. To reduce the contention, create several thread groups within a pool (one group for every 6 CPUs and 12 threads), each group with its own mutex. Each connection is assigned to one of the groups in round-robin fashion during its registration. File affinity code may still move requests between the groups, but otherwise the groups are self-contained.
Diffstat (limited to 'sys/rpc/svc.c')
-rw-r--r--	sys/rpc/svc.c	357
1 file changed, 201 insertions, 156 deletions
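
The heart of the change is visible in xprt_register() in the diff below: instead of taking a single pool-wide lock, each transport is bound to a group at registration time. A minimal sketch of that round-robin selection follows; svc_choose_group is a hypothetical helper name for illustration (the committed code inlines this logic in xprt_register()):

#include <sys/types.h>
#include <machine/atomic.h>

/*
 * Pick a group for a newly registered connection.  The atomic
 * fetch-and-add spreads registrations evenly across the groups
 * without taking any pool-wide lock; the caller then contends
 * only on the chosen group's sg_lock.
 */
static SVCGROUP *
svc_choose_group(SVCPOOL *pool)
{
	int g;

	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) %
	    pool->sp_groupcount;
	return (&pool->sp_groups[g]);
}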
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index 5062a84..95d4c49 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$");
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
+#include <sys/smp.h>
#include <sys/sx.h>
#include <sys/ucred.h>
@@ -70,7 +71,7 @@ __FBSDID("$FreeBSD$");
static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
char *);
-static void svc_new_thread(SVCPOOL *pool);
+static void svc_new_thread(SVCGROUP *grp);
static void xprt_unregister_locked(SVCXPRT *xprt);
static void svc_change_space_used(SVCPOOL *pool, int delta);
static bool_t svc_request_space_available(SVCPOOL *pool);
@@ -79,11 +80,14 @@ static bool_t svc_request_space_available(SVCPOOL *pool);
static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
+static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
SVCPOOL *pool;
+ SVCGROUP *grp;
+ int g;
pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
@@ -91,15 +95,22 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
pool->sp_name = name;
pool->sp_state = SVCPOOL_INIT;
pool->sp_proc = NULL;
- TAILQ_INIT(&pool->sp_xlist);
- TAILQ_INIT(&pool->sp_active);
TAILQ_INIT(&pool->sp_callouts);
TAILQ_INIT(&pool->sp_lcallouts);
- LIST_INIT(&pool->sp_threads);
- LIST_INIT(&pool->sp_idlethreads);
pool->sp_minthreads = 1;
pool->sp_maxthreads = 1;
- pool->sp_threadcount = 0;
+ pool->sp_groupcount = 1;
+ for (g = 0; g < SVC_MAXGROUPS; g++) {
+ grp = &pool->sp_groups[g];
+ mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
+ grp->sg_pool = pool;
+ grp->sg_state = SVCPOOL_ACTIVE;
+ TAILQ_INIT(&grp->sg_xlist);
+ TAILQ_INIT(&grp->sg_active);
+ LIST_INIT(&grp->sg_idlethreads);
+ grp->sg_minthreads = 1;
+ grp->sg_maxthreads = 1;
+ }
/*
* Don't use more than a quarter of mbuf clusters or more than
@@ -114,12 +125,19 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
if (sysctl_base) {
SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
"minthreads", CTLTYPE_INT | CTLFLAG_RW,
- pool, 0, svcpool_minthread_sysctl, "I", "");
+ pool, 0, svcpool_minthread_sysctl, "I",
+ "Minimal number of threads");
SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
"maxthreads", CTLTYPE_INT | CTLFLAG_RW,
- pool, 0, svcpool_maxthread_sysctl, "I", "");
+ pool, 0, svcpool_maxthread_sysctl, "I",
+ "Maximal number of threads");
+ SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
+ "threads", CTLTYPE_INT | CTLFLAG_RD,
+ pool, 0, svcpool_threads_sysctl, "I",
+ "Current number of threads");
SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
- "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
+ "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
+ "Number of thread groups");
SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
"request_space_used", CTLFLAG_RD,
@@ -158,20 +176,29 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
void
svcpool_destroy(SVCPOOL *pool)
{
+ SVCGROUP *grp;
SVCXPRT *xprt, *nxprt;
struct svc_callout *s;
struct svc_loss_callout *sl;
struct svcxprt_list cleanup;
+ int g;
TAILQ_INIT(&cleanup);
- mtx_lock(&pool->sp_lock);
- while (TAILQ_FIRST(&pool->sp_xlist)) {
- xprt = TAILQ_FIRST(&pool->sp_xlist);
- xprt_unregister_locked(xprt);
- TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
+ for (g = 0; g < SVC_MAXGROUPS; g++) {
+ grp = &pool->sp_groups[g];
+ mtx_lock(&grp->sg_lock);
+ while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
+ xprt_unregister_locked(xprt);
+ TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
+ }
+ mtx_unlock(&grp->sg_lock);
+ }
+ TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
+ SVC_RELEASE(xprt);
}
+ mtx_lock(&pool->sp_lock);
while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
mtx_unlock(&pool->sp_lock);
svc_unreg(pool, s->sc_prog, s->sc_vers);
@@ -184,10 +211,10 @@ svcpool_destroy(SVCPOOL *pool)
}
mtx_unlock(&pool->sp_lock);
- TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
- SVC_RELEASE(xprt);
+ for (g = 0; g < SVC_MAXGROUPS; g++) {
+ grp = &pool->sp_groups[g];
+ mtx_destroy(&grp->sg_lock);
}
-
mtx_destroy(&pool->sp_lock);
if (pool->sp_rcache)
@@ -197,14 +224,23 @@ svcpool_destroy(SVCPOOL *pool)
free(pool, M_RPC);
}
-static bool_t
-svcpool_active(SVCPOOL *pool)
+/*
+ * Sysctl handler to get the present thread count on a pool
+ */
+static int
+svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
{
- enum svcpool_state state = pool->sp_state;
+ SVCPOOL *pool;
+ int threads, error, g;
- if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
- return (FALSE);
- return (TRUE);
+ pool = oidp->oid_arg1;
+ threads = 0;
+ mtx_lock(&pool->sp_lock);
+ for (g = 0; g < pool->sp_groupcount; g++)
+ threads += pool->sp_groups[g].sg_threadcount;
+ mtx_unlock(&pool->sp_lock);
+ error = sysctl_handle_int(oidp, &threads, 0, req);
+ return (error);
}
/*
@@ -214,7 +250,7 @@ static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
SVCPOOL *pool;
- int newminthreads, error, n;
+ int newminthreads, error, g;
pool = oidp->oid_arg1;
newminthreads = pool->sp_minthreads;
@@ -223,21 +259,11 @@ svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
if (newminthreads > pool->sp_maxthreads)
return (EINVAL);
mtx_lock(&pool->sp_lock);
- if (newminthreads > pool->sp_minthreads
- && svcpool_active(pool)) {
- /*
- * If the pool is running and we are
- * increasing, create some more threads now.
- */
- n = newminthreads - pool->sp_threadcount;
- if (n > 0) {
- mtx_unlock(&pool->sp_lock);
- while (n--)
- svc_new_thread(pool);
- mtx_lock(&pool->sp_lock);
- }
- }
pool->sp_minthreads = newminthreads;
+ for (g = 0; g < pool->sp_groupcount; g++) {
+ pool->sp_groups[g].sg_minthreads = max(1,
+ pool->sp_minthreads / pool->sp_groupcount);
+ }
mtx_unlock(&pool->sp_lock);
}
return (error);
@@ -250,8 +276,7 @@ static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
SVCPOOL *pool;
- SVCTHREAD *st;
- int newmaxthreads, error;
+ int newmaxthreads, error, g;
pool = oidp->oid_arg1;
newmaxthreads = pool->sp_maxthreads;
@@ -260,17 +285,11 @@ svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
if (newmaxthreads < pool->sp_minthreads)
return (EINVAL);
mtx_lock(&pool->sp_lock);
- if (newmaxthreads < pool->sp_maxthreads
- && svcpool_active(pool)) {
- /*
- * If the pool is running and we are
- * decreasing, wake up some idle threads to
- * encourage them to exit.
- */
- LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
- cv_signal(&st->st_cond);
- }
pool->sp_maxthreads = newmaxthreads;
+ for (g = 0; g < pool->sp_groupcount; g++) {
+ pool->sp_groups[g].sg_maxthreads = max(1,
+ pool->sp_maxthreads / pool->sp_groupcount);
+ }
mtx_unlock(&pool->sp_lock);
}
return (error);
@@ -283,13 +302,17 @@ void
xprt_register(SVCXPRT *xprt)
{
SVCPOOL *pool = xprt->xp_pool;
+ SVCGROUP *grp;
+ int g;
SVC_ACQUIRE(xprt);
- mtx_lock(&pool->sp_lock);
+ g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
+ xprt->xp_group = grp = &pool->sp_groups[g];
+ mtx_lock(&grp->sg_lock);
xprt->xp_registered = TRUE;
xprt->xp_active = FALSE;
- TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
- mtx_unlock(&pool->sp_lock);
+ TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
+ mtx_unlock(&grp->sg_lock);
}
/*
@@ -300,29 +323,29 @@ xprt_register(SVCXPRT *xprt)
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
- SVCPOOL *pool = xprt->xp_pool;
+ SVCGROUP *grp = xprt->xp_group;
- mtx_assert(&pool->sp_lock, MA_OWNED);
+ mtx_assert(&grp->sg_lock, MA_OWNED);
KASSERT(xprt->xp_registered == TRUE,
("xprt_unregister_locked: not registered"));
xprt_inactive_locked(xprt);
- TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
+ TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
xprt->xp_registered = FALSE;
}
void
xprt_unregister(SVCXPRT *xprt)
{
- SVCPOOL *pool = xprt->xp_pool;
+ SVCGROUP *grp = xprt->xp_group;
- mtx_lock(&pool->sp_lock);
+ mtx_lock(&grp->sg_lock);
if (xprt->xp_registered == FALSE) {
/* Already unregistered by another thread */
- mtx_unlock(&pool->sp_lock);
+ mtx_unlock(&grp->sg_lock);
return;
}
xprt_unregister_locked(xprt);
- mtx_unlock(&pool->sp_lock);
+ mtx_unlock(&grp->sg_lock);
SVC_RELEASE(xprt);
}
@@ -333,11 +356,11 @@ xprt_unregister(SVCXPRT *xprt)
static int
xprt_assignthread(SVCXPRT *xprt)
{
- SVCPOOL *pool = xprt->xp_pool;
+ SVCGROUP *grp = xprt->xp_group;
SVCTHREAD *st;
- mtx_assert(&pool->sp_lock, MA_OWNED);
- st = LIST_FIRST(&pool->sp_idlethreads);
+ mtx_assert(&grp->sg_lock, MA_OWNED);
+ st = LIST_FIRST(&grp->sg_idlethreads);
if (st) {
LIST_REMOVE(st, st_ilink);
SVC_ACQUIRE(xprt);
@@ -354,10 +377,10 @@ xprt_assignthread(SVCXPRT *xprt)
* from a socket upcall). Don't create more
* than one thread per second.
*/
- if (pool->sp_state == SVCPOOL_ACTIVE
- && pool->sp_lastcreatetime < time_uptime
- && pool->sp_threadcount < pool->sp_maxthreads) {
- pool->sp_state = SVCPOOL_THREADWANTED;
+ if (grp->sg_state == SVCPOOL_ACTIVE
+ && grp->sg_lastcreatetime < time_uptime
+ && grp->sg_threadcount < grp->sg_maxthreads) {
+ grp->sg_state = SVCPOOL_THREADWANTED;
}
}
return (FALSE);
@@ -366,40 +389,40 @@ xprt_assignthread(SVCXPRT *xprt)
void
xprt_active(SVCXPRT *xprt)
{
- SVCPOOL *pool = xprt->xp_pool;
+ SVCGROUP *grp = xprt->xp_group;
- mtx_lock(&pool->sp_lock);
+ mtx_lock(&grp->sg_lock);
if (!xprt->xp_registered) {
/*
* Race with xprt_unregister - we lose.
*/
- mtx_unlock(&pool->sp_lock);
+ mtx_unlock(&grp->sg_lock);
return;
}
if (!xprt->xp_active) {
xprt->xp_active = TRUE;
if (xprt->xp_thread == NULL) {
- if (!svc_request_space_available(pool) ||
+ if (!svc_request_space_available(xprt->xp_pool) ||
!xprt_assignthread(xprt))
- TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
+ TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
xp_alink);
}
}
- mtx_unlock(&pool->sp_lock);
+ mtx_unlock(&grp->sg_lock);
}
void
xprt_inactive_locked(SVCXPRT *xprt)
{
- SVCPOOL *pool = xprt->xp_pool;
+ SVCGROUP *grp = xprt->xp_group;
- mtx_assert(&pool->sp_lock, MA_OWNED);
+ mtx_assert(&grp->sg_lock, MA_OWNED);
if (xprt->xp_active) {
if (xprt->xp_thread == NULL)
- TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
+ TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
xprt->xp_active = FALSE;
}
}
@@ -407,11 +430,11 @@ xprt_inactive_locked(SVCXPRT *xprt)
void
xprt_inactive(SVCXPRT *xprt)
{
- SVCPOOL *pool = xprt->xp_pool;
+ SVCGROUP *grp = xprt->xp_group;
- mtx_lock(&pool->sp_lock);
+ mtx_lock(&grp->sg_lock);
xprt_inactive_locked(xprt);
- mtx_unlock(&pool->sp_lock);
+ mtx_unlock(&grp->sg_lock);
}
/*
@@ -991,14 +1014,14 @@ svc_executereq(struct svc_req *rqstp)
}
static void
-svc_checkidle(SVCPOOL *pool)
+svc_checkidle(SVCGROUP *grp)
{
SVCXPRT *xprt, *nxprt;
time_t timo;
struct svcxprt_list cleanup;
TAILQ_INIT(&cleanup);
- TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
+ TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
/*
* Only some transports have idle timers. Don't time
* something out which is just waking up.
@@ -1013,27 +1036,31 @@ svc_checkidle(SVCPOOL *pool)
}
}
- mtx_unlock(&pool->sp_lock);
+ mtx_unlock(&grp->sg_lock);
TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
SVC_RELEASE(xprt);
}
- mtx_lock(&pool->sp_lock);
-
+ mtx_lock(&grp->sg_lock);
}
static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
+ SVCGROUP *grp;
SVCXPRT *xprt;
-
- mtx_lock(&pool->sp_lock);
- while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
- if (xprt_assignthread(xprt))
- TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
- else
- break;
+ int g;
+
+ for (g = 0; g < pool->sp_groupcount; g++) {
+ grp = &pool->sp_groups[g];
+ mtx_lock(&grp->sg_lock);
+ while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
+ if (xprt_assignthread(xprt))
+ TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
+ else
+ break;
+ }
+ mtx_unlock(&grp->sg_lock);
}
- mtx_unlock(&pool->sp_lock);
}
static void
@@ -1067,8 +1094,9 @@ svc_request_space_available(SVCPOOL *pool)
}
static void
-svc_run_internal(SVCPOOL *pool, bool_t ismaster)
+svc_run_internal(SVCGROUP *grp, bool_t ismaster)
{
+ SVCPOOL *pool = grp->sg_pool;
SVCTHREAD *st, *stpref;
SVCXPRT *xprt;
enum xprt_stat stat;
@@ -1083,35 +1111,34 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
STAILQ_INIT(&st->st_reqs);
cv_init(&st->st_cond, "rpcsvc");
- mtx_lock(&pool->sp_lock);
- LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
+ mtx_lock(&grp->sg_lock);
/*
* If we are a new thread which was spawned to cope with
* increased load, set the state back to SVCPOOL_ACTIVE.
*/
- if (pool->sp_state == SVCPOOL_THREADSTARTING)
- pool->sp_state = SVCPOOL_ACTIVE;
+ if (grp->sg_state == SVCPOOL_THREADSTARTING)
+ grp->sg_state = SVCPOOL_ACTIVE;
- while (pool->sp_state != SVCPOOL_CLOSING) {
+ while (grp->sg_state != SVCPOOL_CLOSING) {
/*
* Create new thread if requested.
*/
- if (pool->sp_state == SVCPOOL_THREADWANTED) {
- pool->sp_state = SVCPOOL_THREADSTARTING;
- pool->sp_lastcreatetime = time_uptime;
- mtx_unlock(&pool->sp_lock);
- svc_new_thread(pool);
- mtx_lock(&pool->sp_lock);
+ if (grp->sg_state == SVCPOOL_THREADWANTED) {
+ grp->sg_state = SVCPOOL_THREADSTARTING;
+ grp->sg_lastcreatetime = time_uptime;
+ mtx_unlock(&grp->sg_lock);
+ svc_new_thread(grp);
+ mtx_lock(&grp->sg_lock);
continue;
}
/*
* Check for idle transports once per second.
*/
- if (time_uptime > pool->sp_lastidlecheck) {
- pool->sp_lastidlecheck = time_uptime;
- svc_checkidle(pool);
+ if (time_uptime > grp->sg_lastidlecheck) {
+ grp->sg_lastidlecheck = time_uptime;
+ svc_checkidle(grp);
}
xprt = st->st_xprt;
@@ -1119,7 +1146,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
/*
* Enforce maxthreads count.
*/
- if (pool->sp_threadcount > pool->sp_maxthreads)
+ if (grp->sg_threadcount > grp->sg_maxthreads)
break;
/*
@@ -1128,22 +1155,22 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
* by a thread.
*/
if (svc_request_space_available(pool) &&
- (xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
- TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
+ (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
+ TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
SVC_ACQUIRE(xprt);
xprt->xp_thread = st;
st->st_xprt = xprt;
continue;
}
- LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
+ LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
if (ismaster || (!ismaster &&
- pool->sp_threadcount > pool->sp_minthreads))
+ grp->sg_threadcount > grp->sg_minthreads))
error = cv_timedwait_sig(&st->st_cond,
- &pool->sp_lock, 5 * hz);
+ &grp->sg_lock, 5 * hz);
else
error = cv_wait_sig(&st->st_cond,
- &pool->sp_lock);
+ &grp->sg_lock);
if (st->st_xprt == NULL)
LIST_REMOVE(st, st_ilink);
@@ -1152,19 +1179,19 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
*/
if (error == EWOULDBLOCK) {
if (!ismaster
- && (pool->sp_threadcount
- > pool->sp_minthreads)
+ && (grp->sg_threadcount
+ > grp->sg_minthreads)
&& !st->st_xprt)
break;
} else if (error) {
- mtx_unlock(&pool->sp_lock);
+ mtx_unlock(&grp->sg_lock);
svc_exit(pool);
- mtx_lock(&pool->sp_lock);
+ mtx_lock(&grp->sg_lock);
break;
}
continue;
}
- mtx_unlock(&pool->sp_lock);
+ mtx_unlock(&grp->sg_lock);
/*
* Drain the transport socket and queue up any RPCs.
@@ -1196,7 +1223,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
}
}
} while (rqstp == NULL && stat == XPRT_MOREREQS
- && pool->sp_state != SVCPOOL_CLOSING);
+ && grp->sg_state != SVCPOOL_CLOSING);
/*
* Move this transport to the end of the active list to
@@ -1204,16 +1231,16 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
* If this was the last queued request, svc_getreq will end
* up calling xprt_inactive to remove from the active list.
*/
- mtx_lock(&pool->sp_lock);
+ mtx_lock(&grp->sg_lock);
xprt->xp_thread = NULL;
st->st_xprt = NULL;
if (xprt->xp_active) {
if (!svc_request_space_available(pool) ||
!xprt_assignthread(xprt))
- TAILQ_INSERT_TAIL(&pool->sp_active,
+ TAILQ_INSERT_TAIL(&grp->sg_active,
xprt, xp_alink);
}
- mtx_unlock(&pool->sp_lock);
+ mtx_unlock(&grp->sg_lock);
SVC_RELEASE(xprt);
/*
@@ -1230,7 +1257,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
}
mtx_unlock(&st->st_lock);
svc_change_space_used(pool, -sz);
- mtx_lock(&pool->sp_lock);
+ mtx_lock(&grp->sg_lock);
}
if (st->st_xprt) {
@@ -1238,46 +1265,43 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
st->st_xprt = NULL;
SVC_RELEASE(xprt);
}
-
KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
- LIST_REMOVE(st, st_link);
- pool->sp_threadcount--;
-
- mtx_unlock(&pool->sp_lock);
-
mtx_destroy(&st->st_lock);
cv_destroy(&st->st_cond);
mem_free(st, sizeof(*st));
+ grp->sg_threadcount--;
if (!ismaster)
- wakeup(pool);
+ wakeup(grp);
+ mtx_unlock(&grp->sg_lock);
}
static void
svc_thread_start(void *arg)
{
- svc_run_internal((SVCPOOL *) arg, FALSE);
+ svc_run_internal((SVCGROUP *) arg, FALSE);
kthread_exit();
}
static void
-svc_new_thread(SVCPOOL *pool)
+svc_new_thread(SVCGROUP *grp)
{
+ SVCPOOL *pool = grp->sg_pool;
struct thread *td;
- pool->sp_threadcount++;
- kthread_add(svc_thread_start, pool,
- pool->sp_proc, &td, 0, 0,
+ grp->sg_threadcount++;
+ kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
"%s: service", pool->sp_name);
}
void
svc_run(SVCPOOL *pool)
{
- int i;
+ int g, i;
struct proc *p;
struct thread *td;
+ SVCGROUP *grp;
p = curproc;
td = curthread;
@@ -1285,35 +1309,56 @@ svc_run(SVCPOOL *pool)
"%s: master", pool->sp_name);
pool->sp_state = SVCPOOL_ACTIVE;
pool->sp_proc = p;
- pool->sp_lastcreatetime = time_uptime;
- pool->sp_threadcount = 1;
- for (i = 1; i < pool->sp_minthreads; i++) {
- svc_new_thread(pool);
+ /* Choose group count based on number of threads and CPUs. */
+ pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
+ min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
+ for (g = 0; g < pool->sp_groupcount; g++) {
+ grp = &pool->sp_groups[g];
+ grp->sg_minthreads = max(1,
+ pool->sp_minthreads / pool->sp_groupcount);
+ grp->sg_maxthreads = max(1,
+ pool->sp_maxthreads / pool->sp_groupcount);
+ grp->sg_lastcreatetime = time_uptime;
}
- svc_run_internal(pool, TRUE);
-
- mtx_lock(&pool->sp_lock);
- while (pool->sp_threadcount > 0)
- msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
- mtx_unlock(&pool->sp_lock);
+ /* Starting threads */
+ for (g = 0; g < pool->sp_groupcount; g++) {
+ grp = &pool->sp_groups[g];
+ for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
+ svc_new_thread(grp);
+ }
+ pool->sp_groups[0].sg_threadcount++;
+ svc_run_internal(&pool->sp_groups[0], TRUE);
+
+ /* Waiting for threads to stop. */
+ for (g = 0; g < pool->sp_groupcount; g++) {
+ grp = &pool->sp_groups[g];
+ mtx_lock(&grp->sg_lock);
+ while (grp->sg_threadcount > 0)
+ msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
+ mtx_unlock(&grp->sg_lock);
+ }
}
void
svc_exit(SVCPOOL *pool)
{
+ SVCGROUP *grp;
SVCTHREAD *st;
-
- mtx_lock(&pool->sp_lock);
-
- if (pool->sp_state != SVCPOOL_CLOSING) {
- pool->sp_state = SVCPOOL_CLOSING;
- LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
- cv_signal(&st->st_cond);
+ int g;
+
+ pool->sp_state = SVCPOOL_CLOSING;
+ for (g = 0; g < pool->sp_groupcount; g++) {
+ grp = &pool->sp_groups[g];
+ mtx_lock(&grp->sg_lock);
+ if (grp->sg_state != SVCPOOL_CLOSING) {
+ grp->sg_state = SVCPOOL_CLOSING;
+ LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
+ cv_signal(&st->st_cond);
+ }
+ mtx_unlock(&grp->sg_lock);
}
-
- mtx_unlock(&pool->sp_lock);
}
bool_t
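
For reference, the group-count policy chosen in svc_run() above (one group per 6 CPUs, sized so that no group is created with fewer than 12 threads, capped at SVC_MAXGROUPS) can be worked through with assumed numbers. A sketch of the arithmetic, assuming a 64-CPU machine with sp_maxthreads = 96 and SVC_MAXGROUPS not the binding limit:

/* Assumed inputs: mp_ncpus = 64, pool->sp_maxthreads = 96. */
groupcount = max(1, min(SVC_MAXGROUPS,
    min(96 / 2, 64) / 6));		/* min(48, 64) / 6 = 8 groups */
sg_maxthreads = max(1, 96 / 8);		/* 12 threads per group */
sg_minthreads = max(1, pool->sp_minthreads / 8);

When threads are the limiting factor (sp_maxthreads / 2 < mp_ncpus), each group ends up with sp_maxthreads / (sp_maxthreads / 12) = 12 threads, which is where the "6 CPUs and 12 threads" figure in the commit message comes from.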