-rw-r--r--   sys/nfs/nfs_fha.c | 138
-rw-r--r--   sys/nfs/nfs_fha.h |  14
-rw-r--r--   sys/rpc/svc.c     | 103
-rw-r--r--   sys/rpc/svc.h     |   4

4 files changed, 130 insertions, 129 deletions
diff --git a/sys/nfs/nfs_fha.c b/sys/nfs/nfs_fha.c
index 1892729..2b29421 100644
--- a/sys/nfs/nfs_fha.c
+++ b/sys/nfs/nfs_fha.c
@@ -52,13 +52,10 @@ void
 fha_init(struct fha_params *softc)
 {
 	char tmpstr[128];
+	int i;
 
-	/*
-	 * A small hash table to map filehandles to fha_hash_entry
-	 * structures.
-	 */
-	softc->g_fha.hashtable = hashinit(256, M_NFS_FHA,
-	    &softc->g_fha.hashmask);
+	for (i = 0; i < FHA_HASH_SIZE; i++)
+		mtx_init(&softc->fha_hash[i].mtx, "fhalock", NULL, MTX_DEF);
 
 	/*
 	 * Set the default tuning parameters.
@@ -117,8 +114,11 @@ fha_init(struct fha_params *softc)
 void
 fha_uninit(struct fha_params *softc)
 {
+	int i;
+
 	sysctl_ctx_free(&softc->sysctl_ctx);
-	hashdestroy(softc->g_fha.hashtable, M_NFS_FHA, softc->g_fha.hashmask);
+	for (i = 0; i < FHA_HASH_SIZE; i++)
+		mtx_destroy(&softc->fha_hash[i].mtx);
 }
 
 /*
@@ -207,8 +207,13 @@ static void
 fha_hash_entry_destroy(struct fha_hash_entry *e)
 {
 
-	if (e->num_rw + e->num_exclusive)
-		panic("nonempty fhe");
+	mtx_assert(e->mtx, MA_OWNED);
+	KASSERT(e->num_rw == 0,
+	    ("%d reqs on destroyed fhe %p", e->num_rw, e));
+	KASSERT(e->num_exclusive == 0,
+	    ("%d exclusive reqs on destroyed fhe %p", e->num_exclusive, e));
+	KASSERT(e->num_threads == 0,
+	    ("%d threads on destroyed fhe %p", e->num_threads, e));
 	free(e, M_NFS_FHA);
 }
 
@@ -216,6 +221,7 @@ static void
 fha_hash_entry_remove(struct fha_hash_entry *e)
 {
 
+	mtx_assert(e->mtx, MA_OWNED);
 	LIST_REMOVE(e, link);
 	fha_hash_entry_destroy(e);
 }
@@ -224,36 +230,22 @@ static struct fha_hash_entry *
 fha_hash_entry_lookup(struct fha_params *softc, u_int64_t fh)
 {
 	SVCPOOL *pool;
-
-	pool = *softc->pool;
-
+	struct fha_hash_slot *fhs;
 	struct fha_hash_entry *fhe, *new_fhe;
 
-	LIST_FOREACH(fhe, &softc->g_fha.hashtable[fh % softc->g_fha.hashmask],
-	    link)
+	pool = *softc->pool;
+	fhs = &softc->fha_hash[fh % FHA_HASH_SIZE];
+	new_fhe = fha_hash_entry_new(fh);
+	new_fhe->mtx = &fhs->mtx;
+	mtx_lock(&fhs->mtx);
+	LIST_FOREACH(fhe, &fhs->list, link)
 		if (fhe->fh == fh)
 			break;
-
 	if (!fhe) {
-		/* Allocate a new entry. */
-		mtx_unlock(&pool->sp_lock);
-		new_fhe = fha_hash_entry_new(fh);
-		mtx_lock(&pool->sp_lock);
-
-		/* Double-check to make sure we still need the new entry. */
-		LIST_FOREACH(fhe,
-		    &softc->g_fha.hashtable[fh % softc->g_fha.hashmask], link)
-			if (fhe->fh == fh)
-				break;
-		if (!fhe) {
-			fhe = new_fhe;
-			LIST_INSERT_HEAD(
-			    &softc->g_fha.hashtable[fh % softc->g_fha.hashmask],
-			    fhe, link);
-		} else
-			fha_hash_entry_destroy(new_fhe);
-	}
-
+		fhe = new_fhe;
+		LIST_INSERT_HEAD(&fhs->list, fhe, link);
+	} else
+		fha_hash_entry_destroy(new_fhe);
 	return (fhe);
 }
 
@@ -261,6 +253,8 @@ static void
 fha_hash_entry_add_thread(struct fha_hash_entry *fhe, SVCTHREAD *thread)
 {
 
+	mtx_assert(fhe->mtx, MA_OWNED);
+	thread->st_p2 = 0;
 	LIST_INSERT_HEAD(&fhe->threads, thread, st_alink);
 	fhe->num_threads++;
 }
@@ -269,6 +263,9 @@ static void
 fha_hash_entry_remove_thread(struct fha_hash_entry *fhe, SVCTHREAD *thread)
 {
 
+	mtx_assert(fhe->mtx, MA_OWNED);
+	KASSERT(thread->st_p2 == 0,
+	    ("%d reqs on removed thread %p", thread->st_p2, thread));
 	LIST_REMOVE(thread, st_alink);
 	fhe->num_threads--;
 }
@@ -280,6 +277,7 @@ static void
 fha_hash_entry_add_op(struct fha_hash_entry *fhe, int locktype, int count)
 {
 
+	mtx_assert(fhe->mtx, MA_OWNED);
 	if (LK_EXCLUSIVE == locktype)
 		fhe->num_exclusive += count;
 	else
@@ -306,7 +304,7 @@ fha_hash_entry_choose_thread(struct fha_params *softc,
 	pool = *softc->pool;
 
 	LIST_FOREACH(thread, &fhe->threads, st_alink) {
-		req_count = thread->st_reqcount;
+		req_count = thread->st_p2;
 
 		/* If there are any writes in progress, use the first thread. */
 		if (fhe->num_exclusive) {
@@ -322,7 +320,7 @@
 		 * exceed our per-thread load limit in the process.
 		 */
 		offset1 = i->offset;
-		offset2 = STAILQ_FIRST(&thread->st_reqs)->rq_p3;
+		offset2 = thread->st_p3;
 
 		if (((offset1 >= offset2) &&
 		     ((offset1 - offset2) < (1 << softc->ctls.bin_shift)))
@@ -360,28 +358,11 @@
 	 */
 	if ((softc->ctls.max_nfsds_per_fh == 0) ||
 	    (fhe->num_threads < softc->ctls.max_nfsds_per_fh)) {
-		/*
-		 * We can add a new thread, so try for an idle thread
-		 * first, and fall back to this_thread if none are idle.
-		 */
-		if (STAILQ_EMPTY(&this_thread->st_reqs)) {
-			thread = this_thread;
+		thread = this_thread;
 #if 0
-			ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
-			    "fha: %p(%d)t", thread, thread->st_reqcount);
-#endif
-		} else if ((thread = LIST_FIRST(&pool->sp_idlethreads))) {
-#if 0
-			ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
-			    "fha: %p(%d)i", thread, thread->st_reqcount);
-#endif
-		} else {
-			thread = this_thread;
-#if 0
-			ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
-			    "fha: %p(%d)b", thread, thread->st_reqcount);
+		ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
+		    "fha: %p(%d)t", thread, thread->st_p2);
 #endif
-		}
 		fha_hash_entry_add_thread(fhe, thread);
 	} else {
 		/*
@@ -411,16 +392,16 @@ fha_assign(SVCTHREAD *this_thread, struct svc_req *req,
 
 	/* Check to see whether we're enabled. */
 	if (softc->ctls.enable == 0)
-		return (this_thread);
+		goto thist;
 
 	/*
 	 * Only do placement if this is an NFS request.
 	 */
 	if (req->rq_prog != NFS_PROG)
-		return (this_thread);
+		goto thist;
 
 	if (req->rq_vers != 2 && req->rq_vers != 3)
-		return (this_thread);
+		goto thist;
 
 	fha_extract_info(req, &i, cb);
 
@@ -440,8 +421,21 @@
 	thread = fha_hash_entry_choose_thread(softc, fhe, &i, this_thread);
 	KASSERT(thread, ("fha_assign: NULL thread!"));
 	fha_hash_entry_add_op(fhe, i.locktype, 1);
+	thread->st_p2++;
+	thread->st_p3 = i.offset;
+
+	/*
+	 * Grab the pool lock here to not let chosen thread go away before
+	 * the new request inserted to its queue while we drop fhe lock.
+	 */
+	mtx_lock(&(*softc->pool)->sp_lock);
+	mtx_unlock(fhe->mtx);
 
 	return (thread);
+thist:
+	req->rq_p1 = NULL;
+	mtx_lock(&(*softc->pool)->sp_lock);
+	return (this_thread);
 }
 
 /*
@@ -452,6 +446,7 @@ void
 fha_nd_complete(SVCTHREAD *thread, struct svc_req *req)
 {
 	struct fha_hash_entry *fhe = req->rq_p1;
+	struct mtx *mtx;
 
 	/*
 	 * This may be called for reqs that didn't go through
@@ -460,13 +455,18 @@ fha_nd_complete(SVCTHREAD *thread, struct svc_req *req)
 	if (!fhe)
 		return;
 
+	mtx = fhe->mtx;
+	mtx_lock(mtx);
 	fha_hash_entry_add_op(fhe, req->rq_p2, -1);
-
-	if (thread->st_reqcount == 0) {
+	thread->st_p2--;
+	KASSERT(thread->st_p2 >= 0, ("Negative request count %d on %p",
+	    thread->st_p2, thread));
+	if (thread->st_p2 == 0) {
 		fha_hash_entry_remove_thread(fhe, thread);
 		if (0 == fhe->num_rw + fhe->num_exclusive)
 			fha_hash_entry_remove(fhe);
 	}
+	mtx_unlock(mtx);
 }
 
 int
@@ -489,10 +489,9 @@ fhe_stats_sysctl(SYSCTL_HANDLER_ARGS, struct fha_params *softc)
 	}
 
 	pool = *softc->pool;
-	mtx_lock(&pool->sp_lock);
 	count = 0;
-	for (i = 0; i <= softc->g_fha.hashmask; i++)
-		if (!LIST_EMPTY(&softc->g_fha.hashtable[i]))
+	for (i = 0; i < FHA_HASH_SIZE; i++)
+		if (!LIST_EMPTY(&softc->fha_hash[i].list))
 			count++;
 
 	if (count == 0) {
@@ -500,8 +499,9 @@
 		goto out;
 	}
 
-	for (i = 0; i <= softc->g_fha.hashmask; i++) {
-		LIST_FOREACH(fhe, &softc->g_fha.hashtable[i], link) {
+	for (i = 0; i < FHA_HASH_SIZE; i++) {
+		mtx_lock(&softc->fha_hash[i].mtx);
+		LIST_FOREACH(fhe, &softc->fha_hash[i].list, link) {
 			sbuf_printf(&sb, "%sfhe %p: {\n", first ? "" : ", ", fhe);
 
 			sbuf_printf(&sb, "    fh: %ju\n", (uintmax_t) fhe->fh);
@@ -512,8 +512,7 @@
 			LIST_FOREACH(thread, &fhe->threads, st_alink) {
 				sbuf_printf(&sb, "    thread %p offset %ju "
 				    "(count %d)\n", thread,
-				    STAILQ_FIRST(&thread->st_reqs)->rq_p3,
-				    thread->st_reqcount);
+				    thread->st_p3, thread->st_p2);
 			}
 
 			sbuf_printf(&sb, "}");
@@ -525,11 +524,10 @@
 				break;
 			}
 		}
+		mtx_unlock(&softc->fha_hash[i].mtx);
 	}
 
 out:
-	if (pool)
-		mtx_unlock(&pool->sp_lock);
 	sbuf_trim(&sb);
 	sbuf_finish(&sb);
 	error = sysctl_handle_string(oidp, sbuf_data(&sb), sbuf_len(&sb), req);
diff --git a/sys/nfs/nfs_fha.h b/sys/nfs/nfs_fha.h
index d3dd039..58aa7fe 100644
--- a/sys/nfs/nfs_fha.h
+++ b/sys/nfs/nfs_fha.h
@@ -35,11 +35,7 @@
 #define FHA_DEF_MAX_NFSDS_PER_FH 8
 #define FHA_DEF_MAX_REQS_PER_NFSD 0 /* Unlimited */
 
-/* This is the global structure that represents the state of the fha system. */
-struct fha_global {
-	struct fha_hash_entry_list *hashtable;
-	u_long hashmask;
-};
+#define FHA_HASH_SIZE 251
 
 struct fha_ctls {
 	int enable;
@@ -62,6 +58,7 @@ struct fha_ctls {
  * avoid contention between threads over single files.
 */
 struct fha_hash_entry {
+	struct mtx *mtx;
 	LIST_ENTRY(fha_hash_entry) link;
 	u_int64_t fh;
 	u_int32_t num_rw;
@@ -72,6 +69,11 @@ struct fha_hash_entry {
 
 LIST_HEAD(fha_hash_entry_list, fha_hash_entry);
 
+struct fha_hash_slot {
+	struct fha_hash_entry_list list;
+	struct mtx mtx;
+};
+
 /* A structure used for passing around data internally. */
 struct fha_info {
 	u_int64_t fh;
@@ -93,7 +95,7 @@ struct fha_callbacks {
 };
 
 struct fha_params {
-	struct fha_global g_fha;
+	struct fha_hash_slot fha_hash[FHA_HASH_SIZE];
 	struct sysctl_ctx_list sysctl_ctx;
 	struct sysctl_oid *sysctl_tree;
 	struct fha_ctls ctls;
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index f63300d..b42bafb 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -71,6 +71,8 @@ static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
     char *);
 static void svc_new_thread(SVCPOOL *pool);
 static void xprt_unregister_locked(SVCXPRT *xprt);
+static void svc_change_space_used(SVCPOOL *pool, int delta);
+static bool_t svc_request_space_available(SVCPOOL *pool);
 
 /* *************** SVCXPRT related stuff **************** */
 
@@ -373,7 +375,8 @@ xprt_active(SVCXPRT *xprt)
 	if (!xprt->xp_active) {
 		xprt->xp_active = TRUE;
 		if (xprt->xp_thread == NULL) {
-			if (!xprt_assignthread(xprt))
+			if (!svc_request_space_available(pool) ||
+			    !xprt_assignthread(xprt))
 				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
 				    xp_alink);
 		}
@@ -965,56 +968,63 @@ svc_assign_waiting_sockets(SVCPOOL *pool)
 {
 	SVCXPRT *xprt;
 
+	mtx_lock(&pool->sp_lock);
 	while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
 		if (xprt_assignthread(xprt))
 			TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
 		else
 			break;
 	}
+	mtx_unlock(&pool->sp_lock);
 }
 
-static bool_t
-svc_request_space_available(SVCPOOL *pool)
+static void
+svc_change_space_used(SVCPOOL *pool, int delta)
 {
+	unsigned int value;
 
-	mtx_assert(&pool->sp_lock, MA_OWNED);
-
-	if (pool->sp_space_throttled) {
-		/*
-		 * Below the low-water yet? If so, assign any waiting sockets.
-		 */
-		if (pool->sp_space_used < pool->sp_space_low) {
-			pool->sp_space_throttled = FALSE;
-			svc_assign_waiting_sockets(pool);
-			return TRUE;
-		}
-
-		return FALSE;
-	} else {
-		if (pool->sp_space_used
-		    >= pool->sp_space_high) {
+	value = atomic_fetchadd_int(&pool->sp_space_used, delta) + delta;
+	if (delta > 0) {
+		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
 			pool->sp_space_throttled = TRUE;
 			pool->sp_space_throttle_count++;
-			return FALSE;
 		}
-
-		return TRUE;
+		if (value > pool->sp_space_used_highest)
+			pool->sp_space_used_highest = value;
+	} else {
+		if (value < pool->sp_space_low && pool->sp_space_throttled) {
+			pool->sp_space_throttled = FALSE;
+			svc_assign_waiting_sockets(pool);
+		}
 	}
 }
 
+static bool_t
+svc_request_space_available(SVCPOOL *pool)
+{
+
+	if (pool->sp_space_throttled)
+		return (FALSE);
+	return (TRUE);
+}
+
 static void
 svc_run_internal(SVCPOOL *pool, bool_t ismaster)
 {
+	struct svc_reqlist reqs;
 	SVCTHREAD *st, *stpref;
 	SVCXPRT *xprt;
 	enum xprt_stat stat;
 	struct svc_req *rqstp;
+	size_t sz;
 	int error;
 
 	st = mem_alloc(sizeof(*st));
+	st->st_pool = pool;
 	st->st_xprt = NULL;
 	STAILQ_INIT(&st->st_reqs);
 	cv_init(&st->st_cond, "rpcsvc");
+	STAILQ_INIT(&reqs);
 
 	mtx_lock(&pool->sp_lock);
 	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
@@ -1108,15 +1118,14 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
 			 * RPCs.
 			 */
 			xprt->xp_lastactive = time_uptime;
-			stat = XPRT_IDLE;
 			do {
+				mtx_unlock(&pool->sp_lock);
 				if (!svc_request_space_available(pool))
 					break;
 				rqstp = NULL;
-				mtx_unlock(&pool->sp_lock);
 				stat = svc_getreq(xprt, &rqstp);
-				mtx_lock(&pool->sp_lock);
 				if (rqstp) {
+					svc_change_space_used(pool, rqstp->rq_size);
 					/*
 					 * See if the application has
 					 * a preference for some other
@@ -1126,17 +1135,12 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
 					if (pool->sp_assign)
 						stpref = pool->sp_assign(st,
 						    rqstp);
+					else
+						mtx_lock(&pool->sp_lock);
 
-					pool->sp_space_used +=
-						rqstp->rq_size;
-					if (pool->sp_space_used
-					    > pool->sp_space_used_highest)
-						pool->sp_space_used_highest =
-							pool->sp_space_used;
 					rqstp->rq_thread = stpref;
 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
 					    rqstp, rq_link);
-					stpref->st_reqcount++;
 
 					/*
 					 * If we assigned the request
@@ -1156,7 +1160,8 @@
 						stpref->st_idle = FALSE;
 						cv_signal(&stpref->st_cond);
 					}
-				}
+				} else
+					mtx_lock(&pool->sp_lock);
 			} while (stat == XPRT_MOREREQS
 			    && pool->sp_state != SVCPOOL_CLOSING);
 
@@ -1171,25 +1176,30 @@
 			xprt->xp_thread = NULL;
 			st->st_xprt = NULL;
 			if (xprt->xp_active) {
-				if (!xprt_assignthread(xprt))
+				if (!svc_request_space_available(pool) ||
+				    !xprt_assignthread(xprt))
 					TAILQ_INSERT_TAIL(&pool->sp_active,
 					    xprt, xp_alink);
 			}
+			STAILQ_CONCAT(&reqs, &st->st_reqs);
 			mtx_unlock(&pool->sp_lock);
 			SVC_RELEASE(xprt);
-			mtx_lock(&pool->sp_lock);
+		} else {
+			STAILQ_CONCAT(&reqs, &st->st_reqs);
+			mtx_unlock(&pool->sp_lock);
 		}
 
 		/*
 		 * Execute what we have queued.
 		 */
-		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
-			size_t sz = rqstp->rq_size;
-			mtx_unlock(&pool->sp_lock);
+		sz = 0;
+		while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) {
+			STAILQ_REMOVE_HEAD(&reqs, rq_link);
+			sz += rqstp->rq_size;
 			svc_executereq(rqstp);
-			mtx_lock(&pool->sp_lock);
-			pool->sp_space_used -= sz;
 		}
+		svc_change_space_used(pool, -sz);
+		mtx_lock(&pool->sp_lock);
 	}
 
 	if (st->st_xprt) {
@@ -1309,24 +1319,13 @@ void
 svc_freereq(struct svc_req *rqstp)
 {
 	SVCTHREAD *st;
-	SVCXPRT *xprt;
 	SVCPOOL *pool;
 
 	st = rqstp->rq_thread;
-	xprt = rqstp->rq_xprt;
-	if (xprt)
-		pool = xprt->xp_pool;
-	else
-		pool = NULL;
 	if (st) {
-		mtx_lock(&pool->sp_lock);
-		KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
-		    ("Freeing request out of order"));
-		STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
-		st->st_reqcount--;
+		pool = st->st_pool;
 		if (pool->sp_done)
 			pool->sp_done(st, rqstp);
-		mtx_unlock(&pool->sp_lock);
 	}
 
 	if (rqstp->rq_auth.svc_ah_ops)
diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h
index 051ebfb..7d9428e 100644
--- a/sys/rpc/svc.h
+++ b/sys/rpc/svc.h
@@ -275,14 +275,16 @@ STAILQ_HEAD(svc_reqlist, svc_req);
  * thread to read and execute pending RPCs.
 */
 typedef struct __rpc_svcthread {
+	struct __rpc_svcpool *st_pool;
 	SVCXPRT *st_xprt;			/* transport we are processing */
 	struct svc_reqlist st_reqs;		/* RPC requests to execute */
-	int st_reqcount;			/* number of queued reqs */
 	int st_idle;				/* thread is on idle list */
 	struct cv st_cond;			/* sleeping for work */
 	LIST_ENTRY(__rpc_svcthread) st_link;	/* all threads list */
 	LIST_ENTRY(__rpc_svcthread) st_ilink;	/* idle threads list */
 	LIST_ENTRY(__rpc_svcthread) st_alink;	/* application thread list */
+	int st_p2;				/* application workspace */
+	uint64_t st_p3;				/* application workspace */
 } SVCTHREAD;
 
 LIST_HEAD(svcthread_list, __rpc_svcthread);
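
Illustrative sketch (not part of the commit): the nfs_fha.c change replaces the single pool-locked hashinit() table with a fixed array of FHA_HASH_SIZE buckets, each guarded by its own mutex, and fha_hash_entry_lookup() now allocates its candidate entry before taking the bucket lock, freeing it if a racing thread already inserted one. The user-space C below is a minimal sketch of that pattern only; the names (fha_table, fha_table_init, fha_lookup, struct slot, struct entry) are made up for illustration and do not belong to the kernel API.

#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>

#define FHA_HASH_SIZE 251			/* prime, as in the header change */

struct entry {
	struct entry *next;
	uint64_t fh;
};

struct slot {
	pthread_mutex_t mtx;			/* one lock per bucket */
	struct entry *head;
};

static struct slot fha_table[FHA_HASH_SIZE];

/* Mirrors the loop added to fha_init(): one mutex per hash slot. */
static void
fha_table_init(void)
{
	int i;

	for (i = 0; i < FHA_HASH_SIZE; i++)
		pthread_mutex_init(&fha_table[i].mtx, NULL);
}

/*
 * Allocate the candidate entry before taking the bucket lock, so no
 * allocation happens under the lock; if another thread raced us and
 * already inserted an entry for this handle, the candidate is freed.
 * On success the bucket lock is returned held, as in the kernel code,
 * and the caller unlocks fha_table[fh % FHA_HASH_SIZE].mtx when done.
 */
static struct entry *
fha_lookup(uint64_t fh)
{
	struct slot *s = &fha_table[fh % FHA_HASH_SIZE];
	struct entry *e, *candidate;

	candidate = calloc(1, sizeof(*candidate));
	if (candidate == NULL)
		return (NULL);
	candidate->fh = fh;

	pthread_mutex_lock(&s->mtx);
	for (e = s->head; e != NULL; e = e->next)
		if (e->fh == fh)
			break;
	if (e == NULL) {
		candidate->next = s->head;
		s->head = candidate;
		e = candidate;
	} else
		free(candidate);
	return (e);
}

Splitting the table into 251 independently locked buckets means two nfsd threads working on different file handles almost never contend on the same lock, whereas the old code serialized every lookup behind the pool-wide sp_lock.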
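On the sys/rpc/svc.c side, the pool-locked space accounting is replaced by svc_change_space_used(), which updates sp_space_used with atomic_fetchadd_int() and applies high/low watermark hysteresis to the throttle flag without holding sp_lock. The C11 fragment below is a rough sketch of that accounting under assumed names (struct pool, change_space_used); as in the kernel version, only the byte counter is atomic, while the throttle flag and high-water statistic are plain fields updated racily but harmlessly.

#include <stdatomic.h>
#include <stdbool.h>

struct pool {
	atomic_uint space_used;		/* bytes of queued request data */
	unsigned int space_high;	/* start throttling at or above this */
	unsigned int space_low;		/* stop throttling below this */
	bool throttled;
	unsigned long throttle_count;
	unsigned int used_highest;
};

static void
change_space_used(struct pool *p, int delta)
{
	unsigned int value;

	/* fetch-add returns the old value; add delta to get the new one. */
	value = atomic_fetch_add(&p->space_used, (unsigned int)delta) + delta;
	if (delta > 0) {
		if (value >= p->space_high && !p->throttled) {
			p->throttled = true;
			p->throttle_count++;
		}
		if (value > p->used_highest)
			p->used_highest = value;
	} else {
		if (value < p->space_low && p->throttled) {
			p->throttled = false;
			/* The kernel code re-queues waiting transports here. */
		}
	}
}

Because crossing the high watermark only flips a flag that svc_request_space_available() reads, the fast path in svc_run_internal() can decide whether to keep draining a socket without touching the pool lock at all.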