| author | mav <mav@FreeBSD.org> | 2014-06-08 09:40:26 +0000 |
|---|---|---|
| committer | mav <mav@FreeBSD.org> | 2014-06-08 09:40:26 +0000 |
| commit | aa54e738fe2ad03844beb77d1e655206c829d0f3 (patch) | |
| tree | 649ba977a4e006579b01fdb75fef67caed842b83 /sys/rpc | |
| parent | 8269c2a4e56192b60c09e4f1beca74a4d03f56eb (diff) | |
| download | FreeBSD-src-aa54e738fe2ad03844beb77d1e655206c829d0f3.zip, FreeBSD-src-aa54e738fe2ad03844beb77d1e655206c829d0f3.tar.gz | |
Introduce new per-thread lock to protect the list of requests.
This allows us to slightly simplify the svc_run_internal() code: once we
have processed all the requests in a queue, we know that new ones will
not appear.
MFC after: 2 weeks
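The core of the change is visible in the diff below: each SVCTHREAD now owns a lock that guards only its st_reqs queue, and the execution loop drops that lock around each request. A minimal userland sketch of the same pattern, using pthreads and <sys/queue.h>; the names here (svcthread, request, thread_enqueue, thread_drain) are illustrative only, not the kernel API:

```c
#include <sys/queue.h>
#include <pthread.h>

struct request {
        STAILQ_ENTRY(request) rq_link;
        void (*rq_execute)(struct request *);
};

struct svcthread {
        pthread_mutex_t st_lock;        /* protects st_reqs only */
        STAILQ_HEAD(, request) st_reqs;
};

/* Producer: hand a request to a chosen thread under that thread's lock. */
static void
thread_enqueue(struct svcthread *st, struct request *rq)
{
        pthread_mutex_lock(&st->st_lock);
        STAILQ_INSERT_TAIL(&st->st_reqs, rq, rq_link);
        pthread_mutex_unlock(&st->st_lock);
}

/*
 * Consumer: drop the lock around execution so producers are never
 * blocked behind a long-running request.  When STAILQ_FIRST() returns
 * NULL while st_lock is held, the queue is known to be empty.
 */
static void
thread_drain(struct svcthread *st)
{
        struct request *rq;

        pthread_mutex_lock(&st->st_lock);
        while ((rq = STAILQ_FIRST(&st->st_reqs)) != NULL) {
                STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
                pthread_mutex_unlock(&st->st_lock);
                rq->rq_execute(rq);     /* run without holding st_lock */
                pthread_mutex_lock(&st->st_lock);
        }
        pthread_mutex_unlock(&st->st_lock);
}
```

The drain loop mirrors the new execution loop in svc_run_internal(): emptiness of st_reqs can now be decided under st_lock alone, without holding the pool-wide sp_lock.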
Diffstat (limited to 'sys/rpc')
| -rw-r--r-- | sys/rpc/svc.c | 131 |
| -rw-r--r-- | sys/rpc/svc.h | 1 |
2 files changed, 54 insertions, 78 deletions
```diff
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index 71f9d0a..c62f5e7 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -1070,7 +1070,6 @@ svc_request_space_available(SVCPOOL *pool)
 static void
 svc_run_internal(SVCPOOL *pool, bool_t ismaster)
 {
-        struct svc_reqlist reqs;
         SVCTHREAD *st, *stpref;
         SVCXPRT *xprt;
         enum xprt_stat stat;
@@ -1079,11 +1078,11 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
         int error;
 
         st = mem_alloc(sizeof(*st));
+        mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
         st->st_pool = pool;
         st->st_xprt = NULL;
         STAILQ_INIT(&st->st_reqs);
         cv_init(&st->st_cond, "rpcsvc");
-        STAILQ_INIT(&reqs);
 
         mtx_lock(&pool->sp_lock);
         LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
@@ -1117,7 +1116,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
                 }
 
                 xprt = st->st_xprt;
-                if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
+                if (!xprt) {
                         /*
                          * Enforce maxthreads count.
                          */
@@ -1159,8 +1158,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
                                 if (!ismaster
                                     && (pool->sp_threadcount
                                         > pool->sp_minthreads)
-                                    && !st->st_xprt
-                                    && STAILQ_EMPTY(&st->st_reqs))
+                                    && !st->st_xprt)
                                         break;
                         } else if (error) {
                                 mtx_unlock(&pool->sp_lock);
@@ -1170,93 +1168,69 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
                         }
                         continue;
                 }
+                mtx_unlock(&pool->sp_lock);
 
-                if (xprt) {
-                        /*
-                         * Drain the transport socket and queue up any
-                         * RPCs.
-                         */
-                        xprt->xp_lastactive = time_uptime;
-                        do {
-                                if (!svc_request_space_available(pool))
-                                        break;
-                                mtx_unlock(&pool->sp_lock);
-                                rqstp = NULL;
-                                stat = svc_getreq(xprt, &rqstp);
-                                if (rqstp) {
-                                        svc_change_space_used(pool, rqstp->rq_size);
-                                        /*
-                                         * See if the application has
-                                         * a preference for some other
-                                         * thread.
-                                         */
-                                        stpref = st;
-                                        if (pool->sp_assign)
-                                                stpref = pool->sp_assign(st,
-                                                    rqstp);
-                                        else
-                                                mtx_lock(&pool->sp_lock);
-
-                                        rqstp->rq_thread = stpref;
+                /*
+                 * Drain the transport socket and queue up any RPCs.
+                 */
+                xprt->xp_lastactive = time_uptime;
+                do {
+                        if (!svc_request_space_available(pool))
+                                break;
+                        rqstp = NULL;
+                        stat = svc_getreq(xprt, &rqstp);
+                        if (rqstp) {
+                                svc_change_space_used(pool, rqstp->rq_size);
+                                /*
+                                 * See if the application has a preference
+                                 * for some other thread.
+                                 */
+                                if (pool->sp_assign) {
+                                        stpref = pool->sp_assign(st, rqstp);
                                         STAILQ_INSERT_TAIL(&stpref->st_reqs,
                                             rqstp, rq_link);
-
-                                        /*
-                                         * If we assigned the request
-                                         * to another thread, make
-                                         * sure its awake and continue
-                                         * reading from the
-                                         * socket. Otherwise, try to
-                                         * find some other thread to
-                                         * read from the socket and
-                                         * execute the request
-                                         * immediately.
-                                         */
-                                        if (stpref == st)
-                                                break;
-                                        if (stpref->st_idle) {
-                                                LIST_REMOVE(stpref, st_ilink);
-                                                stpref->st_idle = FALSE;
-                                                cv_signal(&stpref->st_cond);
-                                        }
+                                        mtx_unlock(&stpref->st_lock);
+                                        rqstp->rq_thread = stpref;
+                                        if (stpref != st)
+                                                rqstp = NULL;
                                 } else
-                                        mtx_lock(&pool->sp_lock);
-                        } while (stat == XPRT_MOREREQS
-                            && pool->sp_state != SVCPOOL_CLOSING);
-
-                        /*
-                         * Move this transport to the end of the
-                         * active list to ensure fairness when
-                         * multiple transports are active. If this was
-                         * the last queued request, svc_getreq will
-                         * end up calling xprt_inactive to remove from
-                         * the active list.
-                         */
-                        xprt->xp_thread = NULL;
-                        st->st_xprt = NULL;
-                        if (xprt->xp_active) {
-                                if (!svc_request_space_available(pool) ||
-                                    !xprt_assignthread(xprt))
-                                        TAILQ_INSERT_TAIL(&pool->sp_active,
-                                            xprt, xp_alink);
+                                        STAILQ_INSERT_TAIL(&st->st_reqs,
+                                            rqstp, rq_link);
                         }
-                        STAILQ_CONCAT(&reqs, &st->st_reqs);
-                        mtx_unlock(&pool->sp_lock);
-                        SVC_RELEASE(xprt);
-                } else {
-                        STAILQ_CONCAT(&reqs, &st->st_reqs);
-                        mtx_unlock(&pool->sp_lock);
+                } while (rqstp == NULL && stat == XPRT_MOREREQS
+                    && pool->sp_state != SVCPOOL_CLOSING);
+
+                /*
+                 * Move this transport to the end of the active list to
+                 * ensure fairness when multiple transports are active.
+                 * If this was the last queued request, svc_getreq will end
+                 * up calling xprt_inactive to remove from the active list.
+                 */
+                mtx_lock(&pool->sp_lock);
+                xprt->xp_thread = NULL;
+                st->st_xprt = NULL;
+                if (xprt->xp_active) {
+                        if (!svc_request_space_available(pool) ||
+                            !xprt_assignthread(xprt))
+                                TAILQ_INSERT_TAIL(&pool->sp_active,
+                                    xprt, xp_alink);
                 }
+                mtx_unlock(&pool->sp_lock);
+                SVC_RELEASE(xprt);
 
                 /*
                  * Execute what we have queued.
                  */
                 sz = 0;
-                while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) {
-                        STAILQ_REMOVE_HEAD(&reqs, rq_link);
+                mtx_lock(&st->st_lock);
+                while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
+                        STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
+                        mtx_unlock(&st->st_lock);
                         sz += rqstp->rq_size;
                         svc_executereq(rqstp);
+                        mtx_lock(&st->st_lock);
                 }
+                mtx_unlock(&st->st_lock);
                 svc_change_space_used(pool, -sz);
                 mtx_lock(&pool->sp_lock);
         }
@@ -1273,6 +1247,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
 
         mtx_unlock(&pool->sp_lock);
 
+        mtx_destroy(&st->st_lock);
         cv_destroy(&st->st_cond);
         mem_free(st, sizeof(*st));
 
diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h
index a7f5f51..108c3b1 100644
--- a/sys/rpc/svc.h
+++ b/sys/rpc/svc.h
@@ -291,6 +291,7 @@ STAILQ_HEAD(svc_reqlist, svc_req);
  * thread to read and execute pending RPCs.
  */
 typedef struct __rpc_svcthread {
+        struct mtx_padalign     st_lock; /* protects st_reqs field */
         struct __rpc_svcpool    *st_pool;
         SVCXPRT                 *st_xprt; /* transport we are processing */
         struct svc_reqlist      st_reqs;  /* RPC requests to execute */
```
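One detail worth noting in the svc.h hunk: st_lock is declared as struct mtx_padalign rather than a plain struct mtx, i.e. a mutex padded out to a full cache line. Since sp_assign() lets one thread queue onto (and therefore lock) another thread's st_lock, the padding keeps contended per-thread locks from false-sharing a cache line with adjacent data. A rough userland analogue, assuming a 64-byte cache line (the CACHE_LINE constant and the padded_lock name are assumptions for illustration):

```c
#include <pthread.h>
#include <stdalign.h>

#define CACHE_LINE 64           /* assumed line size for this sketch */

/*
 * Align each mutex to a cache-line boundary; the struct's size is then
 * rounded up to a CACHE_LINE multiple (assuming pthread_mutex_t fits in
 * one line), so an array of padded_lock puts every mutex on its own
 * line and lock traffic on one worker's lock never invalidates a
 * neighbour's.
 */
struct padded_lock {
        alignas(CACHE_LINE) pthread_mutex_t mtx;
};

static struct padded_lock worker_locks[8];      /* one per worker */
```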