summaryrefslogtreecommitdiffstats
path: root/sys/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'sys/rpc')
-rw-r--r--sys/rpc/svc.c86
-rw-r--r--sys/rpc/svc.h1
2 files changed, 46 insertions, 41 deletions
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index 5b48086..96d011a 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -293,12 +293,10 @@ xprt_unregister_locked(SVCXPRT *xprt)
{
SVCPOOL *pool = xprt->xp_pool;
+ mtx_assert(&pool->sp_lock, MA_OWNED);
KASSERT(xprt->xp_registered == TRUE,
("xprt_unregister_locked: not registered"));
- if (xprt->xp_active) {
- TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
- xprt->xp_active = FALSE;
- }
+ xprt_inactive_locked(xprt);
TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
xprt->xp_registered = FALSE;
}
@@ -320,25 +318,25 @@ xprt_unregister(SVCXPRT *xprt)
SVC_RELEASE(xprt);
}
-static void
+/*
+ * Attempt to assign a service thread to this transport.
+ */
+static int
xprt_assignthread(SVCXPRT *xprt)
{
SVCPOOL *pool = xprt->xp_pool;
SVCTHREAD *st;
- /*
- * Attempt to assign a service thread to this
- * transport.
- */
- LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
- if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
- break;
- }
+ mtx_assert(&pool->sp_lock, MA_OWNED);
+ st = LIST_FIRST(&pool->sp_idlethreads);
if (st) {
+ LIST_REMOVE(st, st_ilink);
+ st->st_idle = FALSE;
SVC_ACQUIRE(xprt);
xprt->xp_thread = st;
st->st_xprt = xprt;
cv_signal(&st->st_cond);
+ return (TRUE);
} else {
/*
* See if we can create a new thread. The
@@ -354,6 +352,7 @@ xprt_assignthread(SVCXPRT *xprt)
pool->sp_state = SVCPOOL_THREADWANTED;
}
}
+ return (FALSE);
}
void
@@ -372,9 +371,12 @@ xprt_active(SVCXPRT *xprt)
}
if (!xprt->xp_active) {
- TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
xprt->xp_active = TRUE;
- xprt_assignthread(xprt);
+ if (xprt->xp_thread == NULL) {
+ if (!xprt_assignthread(xprt))
+ TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
+ xp_alink);
+ }
}
mtx_unlock(&pool->sp_lock);
@@ -385,8 +387,10 @@ xprt_inactive_locked(SVCXPRT *xprt)
{
SVCPOOL *pool = xprt->xp_pool;
+ mtx_assert(&pool->sp_lock, MA_OWNED);
if (xprt->xp_active) {
- TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
+ if (xprt->xp_thread == NULL)
+ TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
xprt->xp_active = FALSE;
}
}
@@ -948,10 +952,11 @@ svc_assign_waiting_sockets(SVCPOOL *pool)
{
SVCXPRT *xprt;
- TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
- if (!xprt->xp_thread) {
- xprt_assignthread(xprt);
- }
+ while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
+ if (xprt_assignthread(xprt))
+ TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
+ else
+ break;
}
}
@@ -1042,21 +1047,17 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
* active transport which isn't being serviced
* by a thread.
*/
- if (svc_request_space_available(pool)) {
- TAILQ_FOREACH(xprt, &pool->sp_active,
- xp_alink) {
- if (!xprt->xp_thread) {
- SVC_ACQUIRE(xprt);
- xprt->xp_thread = st;
- st->st_xprt = xprt;
- break;
- }
- }
- }
- if (st->st_xprt)
+ if (svc_request_space_available(pool) &&
+ (xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
+ TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
+ SVC_ACQUIRE(xprt);
+ xprt->xp_thread = st;
+ st->st_xprt = xprt;
continue;
+ }
LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
+ st->st_idle = TRUE;
if (ismaster || (!ismaster &&
pool->sp_threadcount > pool->sp_minthreads))
error = cv_timedwait_sig(&st->st_cond,
@@ -1064,7 +1065,10 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
else
error = cv_wait_sig(&st->st_cond,
&pool->sp_lock);
- LIST_REMOVE(st, st_ilink);
+ if (st->st_idle) {
+ LIST_REMOVE(st, st_ilink);
+ st->st_idle = FALSE;
+ }
/*
* Reduce worker thread count when idle.
@@ -1132,11 +1136,12 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
* execute the request
* immediately.
*/
- if (stpref != st) {
- cv_signal(&stpref->st_cond);
- continue;
- } else {
+ if (stpref == st)
break;
+ if (stpref->st_idle) {
+ LIST_REMOVE(stpref, st_ilink);
+ stpref->st_idle = FALSE;
+ cv_signal(&stpref->st_cond);
}
}
} while (stat == XPRT_MOREREQS
@@ -1153,10 +1158,9 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
xprt->xp_thread = NULL;
st->st_xprt = NULL;
if (xprt->xp_active) {
- xprt_assignthread(xprt);
- TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
- TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
- xp_alink);
+ if (!xprt_assignthread(xprt))
+ TAILQ_INSERT_TAIL(&pool->sp_active,
+ xprt, xp_alink);
}
mtx_unlock(&pool->sp_lock);
SVC_RELEASE(xprt);
diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h
index 28a3c5d..0b2884f 100644
--- a/sys/rpc/svc.h
+++ b/sys/rpc/svc.h
@@ -278,6 +278,7 @@ typedef struct __rpc_svcthread {
SVCXPRT *st_xprt; /* transport we are processing */
struct svc_reqlist st_reqs; /* RPC requests to execute */
int st_reqcount; /* number of queued reqs */
+ int st_idle; /* thread is on idle list */
struct cv st_cond; /* sleeping for work */
LIST_ENTRY(__rpc_svcthread) st_link; /* all threads list */
LIST_ENTRY(__rpc_svcthread) st_ilink; /* idle threads list */
OpenPOWER on IntegriCloud