diff options
author | mav <mav@FreeBSD.org> | 2014-06-22 18:01:40 +0000 |
---|---|---|
committer | mav <mav@FreeBSD.org> | 2014-06-22 18:01:40 +0000 |
commit | ad4e5fe06bf26964cbf5e3e5a1d6a6d85c9067c2 (patch) | |
tree | 0c21e786b3088670a512eab9e09a94fb59ed0c7d | |
parent | 6b3b15bb3a423532a03456e659c25bfa47c307ba (diff) | |
download | FreeBSD-src-ad4e5fe06bf26964cbf5e3e5a1d6a6d85c9067c2.zip FreeBSD-src-ad4e5fe06bf26964cbf5e3e5a1d6a6d85c9067c2.tar.gz |
MFC r267221, r267278:
Introduce new per-thread lock to protect the list of requests.
This allows to slightly simplify svc_run_internal() code: if we processed
all the requests in a queue, then we know that new one will not appear.
-rw-r--r-- | sys/nfs/nfs_fha.c | 10 | ||||
-rw-r--r-- | sys/rpc/svc.c | 133 | ||||
-rw-r--r-- | sys/rpc/svc.h | 1 |
3 files changed, 59 insertions, 85 deletions
diff --git a/sys/nfs/nfs_fha.c b/sys/nfs/nfs_fha.c index 2b29421..86f31ec 100644 --- a/sys/nfs/nfs_fha.c +++ b/sys/nfs/nfs_fha.c @@ -288,11 +288,7 @@ fha_hash_entry_add_op(struct fha_hash_entry *fhe, int locktype, int count) * Get the service thread currently associated with the fhe that is * appropriate to handle this operation. */ -SVCTHREAD * -fha_hash_entry_choose_thread(struct fha_params *softc, - struct fha_hash_entry *fhe, struct fha_info *i, SVCTHREAD *this_thread); - -SVCTHREAD * +static SVCTHREAD * fha_hash_entry_choose_thread(struct fha_params *softc, struct fha_hash_entry *fhe, struct fha_info *i, SVCTHREAD *this_thread) { @@ -428,13 +424,13 @@ fha_assign(SVCTHREAD *this_thread, struct svc_req *req, * Grab the pool lock here to not let chosen thread go away before * the new request inserted to its queue while we drop fhe lock. */ - mtx_lock(&(*softc->pool)->sp_lock); + mtx_lock(&thread->st_lock); mtx_unlock(fhe->mtx); return (thread); thist: req->rq_p1 = NULL; - mtx_lock(&(*softc->pool)->sp_lock); + mtx_lock(&this_thread->st_lock); return (this_thread); } diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c index 71f9d0a..9803e02 100644 --- a/sys/rpc/svc.c +++ b/sys/rpc/svc.c @@ -1070,7 +1070,6 @@ svc_request_space_available(SVCPOOL *pool) static void svc_run_internal(SVCPOOL *pool, bool_t ismaster) { - struct svc_reqlist reqs; SVCTHREAD *st, *stpref; SVCXPRT *xprt; enum xprt_stat stat; @@ -1079,11 +1078,11 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) int error; st = mem_alloc(sizeof(*st)); + mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); st->st_pool = pool; st->st_xprt = NULL; STAILQ_INIT(&st->st_reqs); cv_init(&st->st_cond, "rpcsvc"); - STAILQ_INIT(&reqs); mtx_lock(&pool->sp_lock); LIST_INSERT_HEAD(&pool->sp_threads, st, st_link); @@ -1117,7 +1116,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) } xprt = st->st_xprt; - if (!xprt && STAILQ_EMPTY(&st->st_reqs)) { + if (!xprt) { /* * Enforce maxthreads count. */ @@ -1159,8 +1158,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) if (!ismaster && (pool->sp_threadcount > pool->sp_minthreads) - && !st->st_xprt - && STAILQ_EMPTY(&st->st_reqs)) + && !st->st_xprt) break; } else if (error) { mtx_unlock(&pool->sp_lock); @@ -1170,93 +1168,71 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) } continue; } + mtx_unlock(&pool->sp_lock); - if (xprt) { - /* - * Drain the transport socket and queue up any - * RPCs. - */ - xprt->xp_lastactive = time_uptime; - do { - if (!svc_request_space_available(pool)) - break; - mtx_unlock(&pool->sp_lock); - rqstp = NULL; - stat = svc_getreq(xprt, &rqstp); - if (rqstp) { - svc_change_space_used(pool, rqstp->rq_size); - /* - * See if the application has - * a preference for some other - * thread. - */ - stpref = st; - if (pool->sp_assign) - stpref = pool->sp_assign(st, - rqstp); - else - mtx_lock(&pool->sp_lock); - + /* + * Drain the transport socket and queue up any RPCs. + */ + xprt->xp_lastactive = time_uptime; + do { + if (!svc_request_space_available(pool)) + break; + rqstp = NULL; + stat = svc_getreq(xprt, &rqstp); + if (rqstp) { + svc_change_space_used(pool, rqstp->rq_size); + /* + * See if the application has a preference + * for some other thread. + */ + if (pool->sp_assign) { + stpref = pool->sp_assign(st, rqstp); rqstp->rq_thread = stpref; STAILQ_INSERT_TAIL(&stpref->st_reqs, rqstp, rq_link); - - /* - * If we assigned the request - * to another thread, make - * sure its awake and continue - * reading from the - * socket. Otherwise, try to - * find some other thread to - * read from the socket and - * execute the request - * immediately. - */ - if (stpref == st) - break; - if (stpref->st_idle) { - LIST_REMOVE(stpref, st_ilink); - stpref->st_idle = FALSE; - cv_signal(&stpref->st_cond); - } - } else - mtx_lock(&pool->sp_lock); - } while (stat == XPRT_MOREREQS - && pool->sp_state != SVCPOOL_CLOSING); - - /* - * Move this transport to the end of the - * active list to ensure fairness when - * multiple transports are active. If this was - * the last queued request, svc_getreq will - * end up calling xprt_inactive to remove from - * the active list. - */ - xprt->xp_thread = NULL; - st->st_xprt = NULL; - if (xprt->xp_active) { - if (!svc_request_space_available(pool) || - !xprt_assignthread(xprt)) - TAILQ_INSERT_TAIL(&pool->sp_active, - xprt, xp_alink); + mtx_unlock(&stpref->st_lock); + if (stpref != st) + rqstp = NULL; + } else { + rqstp->rq_thread = st; + STAILQ_INSERT_TAIL(&st->st_reqs, + rqstp, rq_link); + } } - STAILQ_CONCAT(&reqs, &st->st_reqs); - mtx_unlock(&pool->sp_lock); - SVC_RELEASE(xprt); - } else { - STAILQ_CONCAT(&reqs, &st->st_reqs); - mtx_unlock(&pool->sp_lock); + } while (rqstp == NULL && stat == XPRT_MOREREQS + && pool->sp_state != SVCPOOL_CLOSING); + + /* + * Move this transport to the end of the active list to + * ensure fairness when multiple transports are active. + * If this was the last queued request, svc_getreq will end + * up calling xprt_inactive to remove from the active list. + */ + mtx_lock(&pool->sp_lock); + xprt->xp_thread = NULL; + st->st_xprt = NULL; + if (xprt->xp_active) { + if (!svc_request_space_available(pool) || + !xprt_assignthread(xprt)) + TAILQ_INSERT_TAIL(&pool->sp_active, + xprt, xp_alink); } + mtx_unlock(&pool->sp_lock); + SVC_RELEASE(xprt); /* * Execute what we have queued. */ sz = 0; - while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) { - STAILQ_REMOVE_HEAD(&reqs, rq_link); + mtx_lock(&st->st_lock); + while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { + STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); + mtx_unlock(&st->st_lock); sz += rqstp->rq_size; svc_executereq(rqstp); + mtx_lock(&st->st_lock); } + mtx_unlock(&st->st_lock); svc_change_space_used(pool, -sz); mtx_lock(&pool->sp_lock); } @@ -1273,6 +1249,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster) mtx_unlock(&pool->sp_lock); + mtx_destroy(&st->st_lock); cv_destroy(&st->st_cond); mem_free(st, sizeof(*st)); diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h index a7f5f51..108c3b1 100644 --- a/sys/rpc/svc.h +++ b/sys/rpc/svc.h @@ -291,6 +291,7 @@ STAILQ_HEAD(svc_reqlist, svc_req); * thread to read and execute pending RPCs. */ typedef struct __rpc_svcthread { + struct mtx_padalign st_lock; /* protects st_reqs field */ struct __rpc_svcpool *st_pool; SVCXPRT *st_xprt; /* transport we are processing */ struct svc_reqlist st_reqs; /* RPC requests to execute */ |