author     mav <mav@FreeBSD.org>  2014-06-22 18:01:40 +0000
committer  mav <mav@FreeBSD.org>  2014-06-22 18:01:40 +0000
commit     ad4e5fe06bf26964cbf5e3e5a1d6a6d85c9067c2 (patch)
tree       0c21e786b3088670a512eab9e09a94fb59ed0c7d
parent     6b3b15bb3a423532a03456e659c25bfa47c307ba (diff)
MFC r267221, r267278:
Introduce a new per-thread lock to protect the list of requests. This allows us to slightly simplify the svc_run_internal() code: once we have processed all the requests in a queue, we know that no new ones will appear there.
-rw-r--r--  sys/nfs/nfs_fha.c  |  10
-rw-r--r--  sys/rpc/svc.c      | 133
-rw-r--r--  sys/rpc/svc.h      |   1
3 files changed, 59 insertions, 85 deletions
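
[Editor's note] The core of the change is visible in the sys/rpc/svc.c hunks below: each SVCTHREAD gains its own mutex (st_lock) protecting its st_reqs queue, so requests can be queued to and drained from a thread without bouncing on the pool-wide sp_lock. Below is a minimal userspace sketch of the same pattern, using pthreads and the same <sys/queue.h> STAILQ macros; the worker/request/enqueue/drain names are illustrative, not from the FreeBSD sources.

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <sys/queue.h>

    struct request {
            void (*execute)(struct request *);
            STAILQ_ENTRY(request) link;
    };

    /* Per-thread state; the lock protects only this thread's queue. */
    struct worker {
            pthread_mutex_t lock;                   /* analogue of st_lock */
            STAILQ_HEAD(, request) reqs;            /* analogue of st_reqs */
    };

    /* Producer side: insert under the target worker's own lock. */
    static void
    enqueue(struct worker *w, struct request *rq)
    {
            pthread_mutex_lock(&w->lock);
            STAILQ_INSERT_TAIL(&w->reqs, rq, link);
            pthread_mutex_unlock(&w->lock);
    }

    /*
     * Consumer side, mirroring the rewritten drain loop in
     * svc_run_internal(): pop one request under the lock, drop the
     * lock while executing it, re-take it before the next pop.  Once
     * the queue is observed empty under the lock, no request can be
     * half-inserted, which is what the commit message relies on.
     */
    static void
    drain(struct worker *w)
    {
            struct request *rq;

            pthread_mutex_lock(&w->lock);
            while ((rq = STAILQ_FIRST(&w->reqs)) != NULL) {
                    STAILQ_REMOVE_HEAD(&w->reqs, link);
                    pthread_mutex_unlock(&w->lock);
                    rq->execute(rq);        /* run without the lock held */
                    pthread_mutex_lock(&w->lock);
            }
            pthread_mutex_unlock(&w->lock);
    }

    static void
    hello(struct request *rq)
    {
            printf("executed %p\n", (void *)rq);
            free(rq);
    }

    int
    main(void)
    {
            struct worker w;
            struct request *rq;

            pthread_mutex_init(&w.lock, NULL);
            STAILQ_INIT(&w.reqs);
            rq = malloc(sizeof(*rq));
            rq->execute = hello;
            enqueue(&w, rq);
            drain(&w);
            pthread_mutex_destroy(&w.lock);
            return (0);
    }
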
diff --git a/sys/nfs/nfs_fha.c b/sys/nfs/nfs_fha.c
index 2b29421..86f31ec 100644
--- a/sys/nfs/nfs_fha.c
+++ b/sys/nfs/nfs_fha.c
@@ -288,11 +288,7 @@ fha_hash_entry_add_op(struct fha_hash_entry *fhe, int locktype, int count)
* Get the service thread currently associated with the fhe that is
* appropriate to handle this operation.
*/
-SVCTHREAD *
-fha_hash_entry_choose_thread(struct fha_params *softc,
- struct fha_hash_entry *fhe, struct fha_info *i, SVCTHREAD *this_thread);
-
-SVCTHREAD *
+static SVCTHREAD *
fha_hash_entry_choose_thread(struct fha_params *softc,
struct fha_hash_entry *fhe, struct fha_info *i, SVCTHREAD *this_thread)
{
@@ -428,13 +424,13 @@ fha_assign(SVCTHREAD *this_thread, struct svc_req *req,
* Grab the pool lock here to not let chosen thread go away before
* the new request inserted to its queue while we drop fhe lock.
*/
- mtx_lock(&(*softc->pool)->sp_lock);
+ mtx_lock(&thread->st_lock);
mtx_unlock(fhe->mtx);
return (thread);
thist:
req->rq_p1 = NULL;
- mtx_lock(&(*softc->pool)->sp_lock);
+ mtx_lock(&this_thread->st_lock);
return (this_thread);
}
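
[Editor's note] Note the lock hand-off in fha_assign() above: the chosen thread's st_lock is taken before the fhe mutex is dropped, and the function returns with st_lock still held; the caller in svc_run_internal() inserts the request into stpref->st_reqs and only then releases the lock (the mtx_unlock(&stpref->st_lock) in the svc.c hunk below). Acquiring the new lock before giving up the old one closes the window in which the chosen thread could go away between being selected and receiving the request. A hedged userspace sketch of that hand-off; choose(), table_lock and the worker type are stand-ins, not the FHA API:

    #include <pthread.h>

    struct worker {
            pthread_mutex_t lock;   /* analogue of st_lock */
            /* ... this thread's request queue would live here ... */
    };

    static pthread_mutex_t table_lock = PTHREAD_MUTEX_INITIALIZER;
    static struct worker the_worker = { PTHREAD_MUTEX_INITIALIZER };

    static struct worker *
    choose(void)
    {
            /* Trivial stand-in for FHA's hash-entry thread selection. */
            return (&the_worker);
    }

    /*
     * Returns with the chosen worker's lock HELD, as the rewritten
     * fha_assign() does: hand-over-hand, the target's lock is taken
     * before the lock that made the choice valid is released.
     */
    static struct worker *
    assign(void)
    {
            struct worker *w;

            pthread_mutex_lock(&table_lock);
            w = choose();
            pthread_mutex_lock(&w->lock);
            pthread_mutex_unlock(&table_lock);
            return (w);
    }

    int
    main(void)
    {
            struct worker *w = assign();

            /* ... caller inserts the request here, then unlocks ... */
            pthread_mutex_unlock(&w->lock);
            return (0);
    }
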
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index 71f9d0a..9803e02 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -1070,7 +1070,6 @@ svc_request_space_available(SVCPOOL *pool)
static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
- struct svc_reqlist reqs;
SVCTHREAD *st, *stpref;
SVCXPRT *xprt;
enum xprt_stat stat;
@@ -1079,11 +1078,11 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
int error;
st = mem_alloc(sizeof(*st));
+ mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
st->st_pool = pool;
st->st_xprt = NULL;
STAILQ_INIT(&st->st_reqs);
cv_init(&st->st_cond, "rpcsvc");
- STAILQ_INIT(&reqs);
mtx_lock(&pool->sp_lock);
LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
@@ -1117,7 +1116,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
}
xprt = st->st_xprt;
- if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
+ if (!xprt) {
/*
* Enforce maxthreads count.
*/
@@ -1159,8 +1158,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
if (!ismaster
&& (pool->sp_threadcount
> pool->sp_minthreads)
- && !st->st_xprt
- && STAILQ_EMPTY(&st->st_reqs))
+ && !st->st_xprt)
break;
} else if (error) {
mtx_unlock(&pool->sp_lock);
@@ -1170,93 +1168,71 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
}
continue;
}
+ mtx_unlock(&pool->sp_lock);
- if (xprt) {
- /*
- * Drain the transport socket and queue up any
- * RPCs.
- */
- xprt->xp_lastactive = time_uptime;
- do {
- if (!svc_request_space_available(pool))
- break;
- mtx_unlock(&pool->sp_lock);
- rqstp = NULL;
- stat = svc_getreq(xprt, &rqstp);
- if (rqstp) {
- svc_change_space_used(pool, rqstp->rq_size);
- /*
- * See if the application has
- * a preference for some other
- * thread.
- */
- stpref = st;
- if (pool->sp_assign)
- stpref = pool->sp_assign(st,
- rqstp);
- else
- mtx_lock(&pool->sp_lock);
-
+ /*
+ * Drain the transport socket and queue up any RPCs.
+ */
+ xprt->xp_lastactive = time_uptime;
+ do {
+ if (!svc_request_space_available(pool))
+ break;
+ rqstp = NULL;
+ stat = svc_getreq(xprt, &rqstp);
+ if (rqstp) {
+ svc_change_space_used(pool, rqstp->rq_size);
+ /*
+ * See if the application has a preference
+ * for some other thread.
+ */
+ if (pool->sp_assign) {
+ stpref = pool->sp_assign(st, rqstp);
rqstp->rq_thread = stpref;
STAILQ_INSERT_TAIL(&stpref->st_reqs,
rqstp, rq_link);
-
- /*
- * If we assigned the request
- * to another thread, make
- * sure its awake and continue
- * reading from the
- * socket. Otherwise, try to
- * find some other thread to
- * read from the socket and
- * execute the request
- * immediately.
- */
- if (stpref == st)
- break;
- if (stpref->st_idle) {
- LIST_REMOVE(stpref, st_ilink);
- stpref->st_idle = FALSE;
- cv_signal(&stpref->st_cond);
- }
- } else
- mtx_lock(&pool->sp_lock);
- } while (stat == XPRT_MOREREQS
- && pool->sp_state != SVCPOOL_CLOSING);
-
- /*
- * Move this transport to the end of the
- * active list to ensure fairness when
- * multiple transports are active. If this was
- * the last queued request, svc_getreq will
- * end up calling xprt_inactive to remove from
- * the active list.
- */
- xprt->xp_thread = NULL;
- st->st_xprt = NULL;
- if (xprt->xp_active) {
- if (!svc_request_space_available(pool) ||
- !xprt_assignthread(xprt))
- TAILQ_INSERT_TAIL(&pool->sp_active,
- xprt, xp_alink);
+ mtx_unlock(&stpref->st_lock);
+ if (stpref != st)
+ rqstp = NULL;
+ } else {
+ rqstp->rq_thread = st;
+ STAILQ_INSERT_TAIL(&st->st_reqs,
+ rqstp, rq_link);
+ }
}
- STAILQ_CONCAT(&reqs, &st->st_reqs);
- mtx_unlock(&pool->sp_lock);
- SVC_RELEASE(xprt);
- } else {
- STAILQ_CONCAT(&reqs, &st->st_reqs);
- mtx_unlock(&pool->sp_lock);
+ } while (rqstp == NULL && stat == XPRT_MOREREQS
+ && pool->sp_state != SVCPOOL_CLOSING);
+
+ /*
+ * Move this transport to the end of the active list to
+ * ensure fairness when multiple transports are active.
+ * If this was the last queued request, svc_getreq will end
+ * up calling xprt_inactive to remove from the active list.
+ */
+ mtx_lock(&pool->sp_lock);
+ xprt->xp_thread = NULL;
+ st->st_xprt = NULL;
+ if (xprt->xp_active) {
+ if (!svc_request_space_available(pool) ||
+ !xprt_assignthread(xprt))
+ TAILQ_INSERT_TAIL(&pool->sp_active,
+ xprt, xp_alink);
}
+ mtx_unlock(&pool->sp_lock);
+ SVC_RELEASE(xprt);
/*
* Execute what we have queued.
*/
sz = 0;
- while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) {
- STAILQ_REMOVE_HEAD(&reqs, rq_link);
+ mtx_lock(&st->st_lock);
+ while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
+ STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
+ mtx_unlock(&st->st_lock);
sz += rqstp->rq_size;
svc_executereq(rqstp);
+ mtx_lock(&st->st_lock);
}
+ mtx_unlock(&st->st_lock);
svc_change_space_used(pool, -sz);
mtx_lock(&pool->sp_lock);
}
@@ -1273,6 +1249,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
mtx_unlock(&pool->sp_lock);
+ mtx_destroy(&st->st_lock);
cv_destroy(&st->st_cond);
mem_free(st, sizeof(*st));
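
[Editor's note] Two details of the rewritten loop above are worth spelling out. First, the do/while condition now includes rqstp == NULL: after a request is handed to another thread (stpref != st), rqstp is reset to NULL and the thread keeps reading from the socket; as soon as a request lands on its own queue, the loop exits and the thread goes off to execute it. Second, per-request sizes are summed into sz and returned to the pool in one svc_change_space_used(pool, -sz) call, a single pool-level update per batch rather than one per request. A sketch of the first point's control flow, with read_one() and give_away() as stand-ins for svc_getreq() and the sp_assign hook:

    #include <stdbool.h>
    #include <stddef.h>

    struct request { size_t size; };

    /* Stand-in for svc_getreq(): true while the socket has more data. */
    static bool
    read_one(struct request **out)
    {
            *out = NULL;            /* sketch: the socket is dry */
            return (false);
    }

    /* Stand-in for the sp_assign hook: true if another thread took it. */
    static bool
    give_away(struct request *rq)
    {
            (void)rq;
            return (false);
    }

    static void
    reader_loop(void)
    {
            struct request *rq;
            bool more;

            do {
                    rq = NULL;
                    more = read_one(&rq);
                    if (rq != NULL && give_away(rq))
                            rq = NULL;      /* keep draining the socket */
                    /*
                     * Stop once a request is queued for this thread
                     * (rq != NULL) or the transport is dry, just like
                     * the rewritten do/while in svc_run_internal().
                     */
            } while (rq == NULL && more);
    }

    int
    main(void)
    {
            reader_loop();
            return (0);
    }
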
diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h
index a7f5f51..108c3b1 100644
--- a/sys/rpc/svc.h
+++ b/sys/rpc/svc.h
@@ -291,6 +291,7 @@ STAILQ_HEAD(svc_reqlist, svc_req);
* thread to read and execute pending RPCs.
*/
typedef struct __rpc_svcthread {
+ struct mtx_padalign st_lock; /* protects st_reqs field */
struct __rpc_svcpool *st_pool;
SVCXPRT *st_xprt; /* transport we are processing */
struct svc_reqlist st_reqs; /* RPC requests to execute */
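
[Editor's note] The new field is a struct mtx_padalign rather than a plain struct mtx: FreeBSD pads this mutex type out to a full cache line, so a hot per-thread lock does not share a line with, and thus false-share against, the neighbouring fields it protects. A rough userspace approximation, assuming a 64-byte cache line (typical for x86-64) and GCC/Clang attribute syntax:

    #include <pthread.h>
    #include <stdio.h>

    /*
     * Userspace analogue of FreeBSD's struct mtx_padalign: aligning
     * the struct to the cache-line size also rounds its size up to a
     * whole line, so adjacent objects never share a line with the lock.
     */
    struct padded_mutex {
            pthread_mutex_t mtx;
    } __attribute__((aligned(64)));

    int
    main(void)
    {
            /* Prints a multiple of 64, confirming the padding. */
            printf("sizeof(struct padded_mutex) = %zu\n",
                sizeof(struct padded_mutex));
            return (0);
    }
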