summaryrefslogtreecommitdiffstats
path: root/sys/rpc
diff options
context:
space:
mode:
authormav <mav@FreeBSD.org>2014-06-08 09:40:26 +0000
committermav <mav@FreeBSD.org>2014-06-08 09:40:26 +0000
commitaa54e738fe2ad03844beb77d1e655206c829d0f3 (patch)
tree649ba977a4e006579b01fdb75fef67caed842b83 /sys/rpc
parent8269c2a4e56192b60c09e4f1beca74a4d03f56eb (diff)
downloadFreeBSD-src-aa54e738fe2ad03844beb77d1e655206c829d0f3.zip
FreeBSD-src-aa54e738fe2ad03844beb77d1e655206c829d0f3.tar.gz
Introduce new per-thread lock to protect the list of requests.
This allows to slightly simplify svc_run_internal() code: if we processed all the requests in a queue, then we know that new one will not appear. MFC after: 2 weeks
Diffstat (limited to 'sys/rpc')
-rw-r--r--sys/rpc/svc.c131
-rw-r--r--sys/rpc/svc.h1
2 files changed, 54 insertions, 78 deletions
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index 71f9d0a..c62f5e7 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -1070,7 +1070,6 @@ svc_request_space_available(SVCPOOL *pool)
static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
- struct svc_reqlist reqs;
SVCTHREAD *st, *stpref;
SVCXPRT *xprt;
enum xprt_stat stat;
@@ -1079,11 +1078,11 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
int error;
st = mem_alloc(sizeof(*st));
+ mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
st->st_pool = pool;
st->st_xprt = NULL;
STAILQ_INIT(&st->st_reqs);
cv_init(&st->st_cond, "rpcsvc");
- STAILQ_INIT(&reqs);
mtx_lock(&pool->sp_lock);
LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
@@ -1117,7 +1116,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
}
xprt = st->st_xprt;
- if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
+ if (!xprt) {
/*
* Enforce maxthreads count.
*/
@@ -1159,8 +1158,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
if (!ismaster
&& (pool->sp_threadcount
> pool->sp_minthreads)
- && !st->st_xprt
- && STAILQ_EMPTY(&st->st_reqs))
+ && !st->st_xprt)
break;
} else if (error) {
mtx_unlock(&pool->sp_lock);
@@ -1170,93 +1168,69 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
}
continue;
}
+ mtx_unlock(&pool->sp_lock);
- if (xprt) {
- /*
- * Drain the transport socket and queue up any
- * RPCs.
- */
- xprt->xp_lastactive = time_uptime;
- do {
- if (!svc_request_space_available(pool))
- break;
- mtx_unlock(&pool->sp_lock);
- rqstp = NULL;
- stat = svc_getreq(xprt, &rqstp);
- if (rqstp) {
- svc_change_space_used(pool, rqstp->rq_size);
- /*
- * See if the application has
- * a preference for some other
- * thread.
- */
- stpref = st;
- if (pool->sp_assign)
- stpref = pool->sp_assign(st,
- rqstp);
- else
- mtx_lock(&pool->sp_lock);
-
- rqstp->rq_thread = stpref;
+ /*
+ * Drain the transport socket and queue up any RPCs.
+ */
+ xprt->xp_lastactive = time_uptime;
+ do {
+ if (!svc_request_space_available(pool))
+ break;
+ rqstp = NULL;
+ stat = svc_getreq(xprt, &rqstp);
+ if (rqstp) {
+ svc_change_space_used(pool, rqstp->rq_size);
+ /*
+ * See if the application has a preference
+ * for some other thread.
+ */
+ if (pool->sp_assign) {
+ stpref = pool->sp_assign(st, rqstp);
STAILQ_INSERT_TAIL(&stpref->st_reqs,
rqstp, rq_link);
-
- /*
- * If we assigned the request
- * to another thread, make
- * sure its awake and continue
- * reading from the
- * socket. Otherwise, try to
- * find some other thread to
- * read from the socket and
- * execute the request
- * immediately.
- */
- if (stpref == st)
- break;
- if (stpref->st_idle) {
- LIST_REMOVE(stpref, st_ilink);
- stpref->st_idle = FALSE;
- cv_signal(&stpref->st_cond);
- }
+ mtx_unlock(&stpref->st_lock);
+ rqstp->rq_thread = stpref;
+ if (stpref != st)
+ rqstp = NULL;
} else
- mtx_lock(&pool->sp_lock);
- } while (stat == XPRT_MOREREQS
- && pool->sp_state != SVCPOOL_CLOSING);
-
- /*
- * Move this transport to the end of the
- * active list to ensure fairness when
- * multiple transports are active. If this was
- * the last queued request, svc_getreq will
- * end up calling xprt_inactive to remove from
- * the active list.
- */
- xprt->xp_thread = NULL;
- st->st_xprt = NULL;
- if (xprt->xp_active) {
- if (!svc_request_space_available(pool) ||
- !xprt_assignthread(xprt))
- TAILQ_INSERT_TAIL(&pool->sp_active,
- xprt, xp_alink);
+ STAILQ_INSERT_TAIL(&st->st_reqs,
+ rqstp, rq_link);
}
- STAILQ_CONCAT(&reqs, &st->st_reqs);
- mtx_unlock(&pool->sp_lock);
- SVC_RELEASE(xprt);
- } else {
- STAILQ_CONCAT(&reqs, &st->st_reqs);
- mtx_unlock(&pool->sp_lock);
+ } while (rqstp == NULL && stat == XPRT_MOREREQS
+ && pool->sp_state != SVCPOOL_CLOSING);
+
+ /*
+ * Move this transport to the end of the active list to
+ * ensure fairness when multiple transports are active.
+ * If this was the last queued request, svc_getreq will end
+ * up calling xprt_inactive to remove from the active list.
+ */
+ mtx_lock(&pool->sp_lock);
+ xprt->xp_thread = NULL;
+ st->st_xprt = NULL;
+ if (xprt->xp_active) {
+ if (!svc_request_space_available(pool) ||
+ !xprt_assignthread(xprt))
+ TAILQ_INSERT_TAIL(&pool->sp_active,
+ xprt, xp_alink);
}
+ mtx_unlock(&pool->sp_lock);
+ SVC_RELEASE(xprt);
/*
* Execute what we have queued.
*/
sz = 0;
- while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) {
- STAILQ_REMOVE_HEAD(&reqs, rq_link);
+ mtx_lock(&st->st_lock);
+ while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
+ STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
+ mtx_unlock(&st->st_lock);
sz += rqstp->rq_size;
svc_executereq(rqstp);
+ mtx_lock(&st->st_lock);
}
+ mtx_unlock(&st->st_lock);
svc_change_space_used(pool, -sz);
mtx_lock(&pool->sp_lock);
}
@@ -1273,6 +1247,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
mtx_unlock(&pool->sp_lock);
+ mtx_destroy(&st->st_lock);
cv_destroy(&st->st_cond);
mem_free(st, sizeof(*st));
diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h
index a7f5f51..108c3b1 100644
--- a/sys/rpc/svc.h
+++ b/sys/rpc/svc.h
@@ -291,6 +291,7 @@ STAILQ_HEAD(svc_reqlist, svc_req);
* thread to read and execute pending RPCs.
*/
typedef struct __rpc_svcthread {
+ struct mtx_padalign st_lock; /* protects st_reqs field */
struct __rpc_svcpool *st_pool;
SVCXPRT *st_xprt; /* transport we are processing */
struct svc_reqlist st_reqs; /* RPC requests to execute */
OpenPOWER on IntegriCloud