summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sys/nfs/nfs_fha.c138
-rw-r--r--sys/nfs/nfs_fha.h14
-rw-r--r--sys/rpc/svc.c103
-rw-r--r--sys/rpc/svc.h4
4 files changed, 130 insertions, 129 deletions
diff --git a/sys/nfs/nfs_fha.c b/sys/nfs/nfs_fha.c
index 1892729..2b29421 100644
--- a/sys/nfs/nfs_fha.c
+++ b/sys/nfs/nfs_fha.c
@@ -52,13 +52,10 @@ void
fha_init(struct fha_params *softc)
{
char tmpstr[128];
+ int i;
- /*
- * A small hash table to map filehandles to fha_hash_entry
- * structures.
- */
- softc->g_fha.hashtable = hashinit(256, M_NFS_FHA,
- &softc->g_fha.hashmask);
+ for (i = 0; i < FHA_HASH_SIZE; i++)
+ mtx_init(&softc->fha_hash[i].mtx, "fhalock", NULL, MTX_DEF);
/*
* Set the default tuning parameters.
@@ -117,8 +114,11 @@ fha_init(struct fha_params *softc)
void
fha_uninit(struct fha_params *softc)
{
+ int i;
+
sysctl_ctx_free(&softc->sysctl_ctx);
- hashdestroy(softc->g_fha.hashtable, M_NFS_FHA, softc->g_fha.hashmask);
+ for (i = 0; i < FHA_HASH_SIZE; i++)
+ mtx_destroy(&softc->fha_hash[i].mtx);
}
/*
@@ -207,8 +207,13 @@ static void
fha_hash_entry_destroy(struct fha_hash_entry *e)
{
- if (e->num_rw + e->num_exclusive)
- panic("nonempty fhe");
+ mtx_assert(e->mtx, MA_OWNED);
+ KASSERT(e->num_rw == 0,
+ ("%d reqs on destroyed fhe %p", e->num_rw, e));
+ KASSERT(e->num_exclusive == 0,
+ ("%d exclusive reqs on destroyed fhe %p", e->num_exclusive, e));
+ KASSERT(e->num_threads == 0,
+ ("%d threads on destroyed fhe %p", e->num_threads, e));
free(e, M_NFS_FHA);
}
@@ -216,6 +221,7 @@ static void
fha_hash_entry_remove(struct fha_hash_entry *e)
{
+ mtx_assert(e->mtx, MA_OWNED);
LIST_REMOVE(e, link);
fha_hash_entry_destroy(e);
}
@@ -224,36 +230,22 @@ static struct fha_hash_entry *
fha_hash_entry_lookup(struct fha_params *softc, u_int64_t fh)
{
SVCPOOL *pool;
-
- pool = *softc->pool;
-
+ struct fha_hash_slot *fhs;
struct fha_hash_entry *fhe, *new_fhe;
- LIST_FOREACH(fhe, &softc->g_fha.hashtable[fh % softc->g_fha.hashmask],
- link)
+ pool = *softc->pool;
+ fhs = &softc->fha_hash[fh % FHA_HASH_SIZE];
+ new_fhe = fha_hash_entry_new(fh);
+ new_fhe->mtx = &fhs->mtx;
+ mtx_lock(&fhs->mtx);
+ LIST_FOREACH(fhe, &fhs->list, link)
if (fhe->fh == fh)
break;
-
if (!fhe) {
- /* Allocate a new entry. */
- mtx_unlock(&pool->sp_lock);
- new_fhe = fha_hash_entry_new(fh);
- mtx_lock(&pool->sp_lock);
-
- /* Double-check to make sure we still need the new entry. */
- LIST_FOREACH(fhe,
- &softc->g_fha.hashtable[fh % softc->g_fha.hashmask], link)
- if (fhe->fh == fh)
- break;
- if (!fhe) {
- fhe = new_fhe;
- LIST_INSERT_HEAD(
- &softc->g_fha.hashtable[fh % softc->g_fha.hashmask],
- fhe, link);
- } else
- fha_hash_entry_destroy(new_fhe);
- }
-
+ fhe = new_fhe;
+ LIST_INSERT_HEAD(&fhs->list, fhe, link);
+ } else
+ fha_hash_entry_destroy(new_fhe);
return (fhe);
}
@@ -261,6 +253,8 @@ static void
fha_hash_entry_add_thread(struct fha_hash_entry *fhe, SVCTHREAD *thread)
{
+ mtx_assert(fhe->mtx, MA_OWNED);
+ thread->st_p2 = 0;
LIST_INSERT_HEAD(&fhe->threads, thread, st_alink);
fhe->num_threads++;
}
@@ -269,6 +263,9 @@ static void
fha_hash_entry_remove_thread(struct fha_hash_entry *fhe, SVCTHREAD *thread)
{
+ mtx_assert(fhe->mtx, MA_OWNED);
+ KASSERT(thread->st_p2 == 0,
+ ("%d reqs on removed thread %p", thread->st_p2, thread));
LIST_REMOVE(thread, st_alink);
fhe->num_threads--;
}
@@ -280,6 +277,7 @@ static void
fha_hash_entry_add_op(struct fha_hash_entry *fhe, int locktype, int count)
{
+ mtx_assert(fhe->mtx, MA_OWNED);
if (LK_EXCLUSIVE == locktype)
fhe->num_exclusive += count;
else
@@ -306,7 +304,7 @@ fha_hash_entry_choose_thread(struct fha_params *softc,
pool = *softc->pool;
LIST_FOREACH(thread, &fhe->threads, st_alink) {
- req_count = thread->st_reqcount;
+ req_count = thread->st_p2;
/* If there are any writes in progress, use the first thread. */
if (fhe->num_exclusive) {
@@ -322,7 +320,7 @@ fha_hash_entry_choose_thread(struct fha_params *softc,
* exceed our per-thread load limit in the process.
*/
offset1 = i->offset;
- offset2 = STAILQ_FIRST(&thread->st_reqs)->rq_p3;
+ offset2 = thread->st_p3;
if (((offset1 >= offset2)
&& ((offset1 - offset2) < (1 << softc->ctls.bin_shift)))
@@ -360,28 +358,11 @@ fha_hash_entry_choose_thread(struct fha_params *softc,
*/
if ((softc->ctls.max_nfsds_per_fh == 0) ||
(fhe->num_threads < softc->ctls.max_nfsds_per_fh)) {
- /*
- * We can add a new thread, so try for an idle thread
- * first, and fall back to this_thread if none are idle.
- */
- if (STAILQ_EMPTY(&this_thread->st_reqs)) {
- thread = this_thread;
+ thread = this_thread;
#if 0
- ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
- "fha: %p(%d)t", thread, thread->st_reqcount);
-#endif
- } else if ((thread = LIST_FIRST(&pool->sp_idlethreads))) {
-#if 0
- ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
- "fha: %p(%d)i", thread, thread->st_reqcount);
-#endif
- } else {
- thread = this_thread;
-#if 0
- ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
- "fha: %p(%d)b", thread, thread->st_reqcount);
+ ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
+ "fha: %p(%d)t", thread, thread->st_p2);
#endif
- }
fha_hash_entry_add_thread(fhe, thread);
} else {
/*
@@ -411,16 +392,16 @@ fha_assign(SVCTHREAD *this_thread, struct svc_req *req,
/* Check to see whether we're enabled. */
if (softc->ctls.enable == 0)
- return (this_thread);
+ goto thist;
/*
* Only do placement if this is an NFS request.
*/
if (req->rq_prog != NFS_PROG)
- return (this_thread);
+ goto thist;
if (req->rq_vers != 2 && req->rq_vers != 3)
- return (this_thread);
+ goto thist;
fha_extract_info(req, &i, cb);
@@ -440,8 +421,21 @@ fha_assign(SVCTHREAD *this_thread, struct svc_req *req,
thread = fha_hash_entry_choose_thread(softc, fhe, &i, this_thread);
KASSERT(thread, ("fha_assign: NULL thread!"));
fha_hash_entry_add_op(fhe, i.locktype, 1);
+ thread->st_p2++;
+ thread->st_p3 = i.offset;
+
+ /*
+ * Grab the pool lock here to not let chosen thread go away before
+ * the new request inserted to its queue while we drop fhe lock.
+ */
+ mtx_lock(&(*softc->pool)->sp_lock);
+ mtx_unlock(fhe->mtx);
return (thread);
+thist:
+ req->rq_p1 = NULL;
+ mtx_lock(&(*softc->pool)->sp_lock);
+ return (this_thread);
}
/*
@@ -452,6 +446,7 @@ void
fha_nd_complete(SVCTHREAD *thread, struct svc_req *req)
{
struct fha_hash_entry *fhe = req->rq_p1;
+ struct mtx *mtx;
/*
* This may be called for reqs that didn't go through
@@ -460,13 +455,18 @@ fha_nd_complete(SVCTHREAD *thread, struct svc_req *req)
if (!fhe)
return;
+ mtx = fhe->mtx;
+ mtx_lock(mtx);
fha_hash_entry_add_op(fhe, req->rq_p2, -1);
-
- if (thread->st_reqcount == 0) {
+ thread->st_p2--;
+ KASSERT(thread->st_p2 >= 0, ("Negative request count %d on %p",
+ thread->st_p2, thread));
+ if (thread->st_p2 == 0) {
fha_hash_entry_remove_thread(fhe, thread);
if (0 == fhe->num_rw + fhe->num_exclusive)
fha_hash_entry_remove(fhe);
}
+ mtx_unlock(mtx);
}
int
@@ -489,10 +489,9 @@ fhe_stats_sysctl(SYSCTL_HANDLER_ARGS, struct fha_params *softc)
}
pool = *softc->pool;
- mtx_lock(&pool->sp_lock);
count = 0;
- for (i = 0; i <= softc->g_fha.hashmask; i++)
- if (!LIST_EMPTY(&softc->g_fha.hashtable[i]))
+ for (i = 0; i < FHA_HASH_SIZE; i++)
+ if (!LIST_EMPTY(&softc->fha_hash[i].list))
count++;
if (count == 0) {
@@ -500,8 +499,9 @@ fhe_stats_sysctl(SYSCTL_HANDLER_ARGS, struct fha_params *softc)
goto out;
}
- for (i = 0; i <= softc->g_fha.hashmask; i++) {
- LIST_FOREACH(fhe, &softc->g_fha.hashtable[i], link) {
+ for (i = 0; i < FHA_HASH_SIZE; i++) {
+ mtx_lock(&softc->fha_hash[i].mtx);
+ LIST_FOREACH(fhe, &softc->fha_hash[i].list, link) {
sbuf_printf(&sb, "%sfhe %p: {\n", first ? "" : ", ", fhe);
sbuf_printf(&sb, " fh: %ju\n", (uintmax_t) fhe->fh);
@@ -512,8 +512,7 @@ fhe_stats_sysctl(SYSCTL_HANDLER_ARGS, struct fha_params *softc)
LIST_FOREACH(thread, &fhe->threads, st_alink) {
sbuf_printf(&sb, " thread %p offset %ju "
"(count %d)\n", thread,
- STAILQ_FIRST(&thread->st_reqs)->rq_p3,
- thread->st_reqcount);
+ thread->st_p3, thread->st_p2);
}
sbuf_printf(&sb, "}");
@@ -525,11 +524,10 @@ fhe_stats_sysctl(SYSCTL_HANDLER_ARGS, struct fha_params *softc)
break;
}
}
+ mtx_unlock(&softc->fha_hash[i].mtx);
}
out:
- if (pool)
- mtx_unlock(&pool->sp_lock);
sbuf_trim(&sb);
sbuf_finish(&sb);
error = sysctl_handle_string(oidp, sbuf_data(&sb), sbuf_len(&sb), req);
diff --git a/sys/nfs/nfs_fha.h b/sys/nfs/nfs_fha.h
index d3dd039..58aa7fe 100644
--- a/sys/nfs/nfs_fha.h
+++ b/sys/nfs/nfs_fha.h
@@ -35,11 +35,7 @@
#define FHA_DEF_MAX_NFSDS_PER_FH 8
#define FHA_DEF_MAX_REQS_PER_NFSD 0 /* Unlimited */
-/* This is the global structure that represents the state of the fha system. */
-struct fha_global {
- struct fha_hash_entry_list *hashtable;
- u_long hashmask;
-};
+#define FHA_HASH_SIZE 251
struct fha_ctls {
int enable;
@@ -62,6 +58,7 @@ struct fha_ctls {
* avoid contention between threads over single files.
*/
struct fha_hash_entry {
+ struct mtx *mtx;
LIST_ENTRY(fha_hash_entry) link;
u_int64_t fh;
u_int32_t num_rw;
@@ -72,6 +69,11 @@ struct fha_hash_entry {
LIST_HEAD(fha_hash_entry_list, fha_hash_entry);
+struct fha_hash_slot {
+ struct fha_hash_entry_list list;
+ struct mtx mtx;
+};
+
/* A structure used for passing around data internally. */
struct fha_info {
u_int64_t fh;
@@ -93,7 +95,7 @@ struct fha_callbacks {
};
struct fha_params {
- struct fha_global g_fha;
+ struct fha_hash_slot fha_hash[FHA_HASH_SIZE];
struct sysctl_ctx_list sysctl_ctx;
struct sysctl_oid *sysctl_tree;
struct fha_ctls ctls;
diff --git a/sys/rpc/svc.c b/sys/rpc/svc.c
index f63300d..b42bafb 100644
--- a/sys/rpc/svc.c
+++ b/sys/rpc/svc.c
@@ -71,6 +71,8 @@ static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
char *);
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);
+static void svc_change_space_used(SVCPOOL *pool, int delta);
+static bool_t svc_request_space_available(SVCPOOL *pool);
/* *************** SVCXPRT related stuff **************** */
@@ -373,7 +375,8 @@ xprt_active(SVCXPRT *xprt)
if (!xprt->xp_active) {
xprt->xp_active = TRUE;
if (xprt->xp_thread == NULL) {
- if (!xprt_assignthread(xprt))
+ if (!svc_request_space_available(pool) ||
+ !xprt_assignthread(xprt))
TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
xp_alink);
}
@@ -965,56 +968,63 @@ svc_assign_waiting_sockets(SVCPOOL *pool)
{
SVCXPRT *xprt;
+ mtx_lock(&pool->sp_lock);
while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
if (xprt_assignthread(xprt))
TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
else
break;
}
+ mtx_unlock(&pool->sp_lock);
}
-static bool_t
-svc_request_space_available(SVCPOOL *pool)
+static void
+svc_change_space_used(SVCPOOL *pool, int delta)
{
+ unsigned int value;
- mtx_assert(&pool->sp_lock, MA_OWNED);
-
- if (pool->sp_space_throttled) {
- /*
- * Below the low-water yet? If so, assign any waiting sockets.
- */
- if (pool->sp_space_used < pool->sp_space_low) {
- pool->sp_space_throttled = FALSE;
- svc_assign_waiting_sockets(pool);
- return TRUE;
- }
-
- return FALSE;
- } else {
- if (pool->sp_space_used
- >= pool->sp_space_high) {
+ value = atomic_fetchadd_int(&pool->sp_space_used, delta) + delta;
+ if (delta > 0) {
+ if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
pool->sp_space_throttled = TRUE;
pool->sp_space_throttle_count++;
- return FALSE;
}
-
- return TRUE;
+ if (value > pool->sp_space_used_highest)
+ pool->sp_space_used_highest = value;
+ } else {
+ if (value < pool->sp_space_low && pool->sp_space_throttled) {
+ pool->sp_space_throttled = FALSE;
+ svc_assign_waiting_sockets(pool);
+ }
}
}
+static bool_t
+svc_request_space_available(SVCPOOL *pool)
+{
+
+ if (pool->sp_space_throttled)
+ return (FALSE);
+ return (TRUE);
+}
+
static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
+ struct svc_reqlist reqs;
SVCTHREAD *st, *stpref;
SVCXPRT *xprt;
enum xprt_stat stat;
struct svc_req *rqstp;
+ size_t sz;
int error;
st = mem_alloc(sizeof(*st));
+ st->st_pool = pool;
st->st_xprt = NULL;
STAILQ_INIT(&st->st_reqs);
cv_init(&st->st_cond, "rpcsvc");
+ STAILQ_INIT(&reqs);
mtx_lock(&pool->sp_lock);
LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
@@ -1108,15 +1118,14 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
* RPCs.
*/
xprt->xp_lastactive = time_uptime;
- stat = XPRT_IDLE;
do {
+ mtx_unlock(&pool->sp_lock);
if (!svc_request_space_available(pool))
break;
rqstp = NULL;
- mtx_unlock(&pool->sp_lock);
stat = svc_getreq(xprt, &rqstp);
- mtx_lock(&pool->sp_lock);
if (rqstp) {
+ svc_change_space_used(pool, rqstp->rq_size);
/*
* See if the application has
* a preference for some other
@@ -1126,17 +1135,12 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
if (pool->sp_assign)
stpref = pool->sp_assign(st,
rqstp);
+ else
+ mtx_lock(&pool->sp_lock);
- pool->sp_space_used +=
- rqstp->rq_size;
- if (pool->sp_space_used
- > pool->sp_space_used_highest)
- pool->sp_space_used_highest =
- pool->sp_space_used;
rqstp->rq_thread = stpref;
STAILQ_INSERT_TAIL(&stpref->st_reqs,
rqstp, rq_link);
- stpref->st_reqcount++;
/*
* If we assigned the request
@@ -1156,7 +1160,8 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
stpref->st_idle = FALSE;
cv_signal(&stpref->st_cond);
}
- }
+ } else
+ mtx_lock(&pool->sp_lock);
} while (stat == XPRT_MOREREQS
&& pool->sp_state != SVCPOOL_CLOSING);
@@ -1171,25 +1176,30 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
xprt->xp_thread = NULL;
st->st_xprt = NULL;
if (xprt->xp_active) {
- if (!xprt_assignthread(xprt))
+ if (!svc_request_space_available(pool) ||
+ !xprt_assignthread(xprt))
TAILQ_INSERT_TAIL(&pool->sp_active,
xprt, xp_alink);
}
+ STAILQ_CONCAT(&reqs, &st->st_reqs);
mtx_unlock(&pool->sp_lock);
SVC_RELEASE(xprt);
- mtx_lock(&pool->sp_lock);
+ } else {
+ STAILQ_CONCAT(&reqs, &st->st_reqs);
+ mtx_unlock(&pool->sp_lock);
}
/*
* Execute what we have queued.
*/
- while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
- size_t sz = rqstp->rq_size;
- mtx_unlock(&pool->sp_lock);
+ sz = 0;
+ while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) {
+ STAILQ_REMOVE_HEAD(&reqs, rq_link);
+ sz += rqstp->rq_size;
svc_executereq(rqstp);
- mtx_lock(&pool->sp_lock);
- pool->sp_space_used -= sz;
}
+ svc_change_space_used(pool, -sz);
+ mtx_lock(&pool->sp_lock);
}
if (st->st_xprt) {
@@ -1309,24 +1319,13 @@ void
svc_freereq(struct svc_req *rqstp)
{
SVCTHREAD *st;
- SVCXPRT *xprt;
SVCPOOL *pool;
st = rqstp->rq_thread;
- xprt = rqstp->rq_xprt;
- if (xprt)
- pool = xprt->xp_pool;
- else
- pool = NULL;
if (st) {
- mtx_lock(&pool->sp_lock);
- KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
- ("Freeing request out of order"));
- STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
- st->st_reqcount--;
+ pool = st->st_pool;
if (pool->sp_done)
pool->sp_done(st, rqstp);
- mtx_unlock(&pool->sp_lock);
}
if (rqstp->rq_auth.svc_ah_ops)
diff --git a/sys/rpc/svc.h b/sys/rpc/svc.h
index 051ebfb..7d9428e 100644
--- a/sys/rpc/svc.h
+++ b/sys/rpc/svc.h
@@ -275,14 +275,16 @@ STAILQ_HEAD(svc_reqlist, svc_req);
* thread to read and execute pending RPCs.
*/
typedef struct __rpc_svcthread {
+ struct __rpc_svcpool *st_pool;
SVCXPRT *st_xprt; /* transport we are processing */
struct svc_reqlist st_reqs; /* RPC requests to execute */
- int st_reqcount; /* number of queued reqs */
int st_idle; /* thread is on idle list */
struct cv st_cond; /* sleeping for work */
LIST_ENTRY(__rpc_svcthread) st_link; /* all threads list */
LIST_ENTRY(__rpc_svcthread) st_ilink; /* idle threads list */
LIST_ENTRY(__rpc_svcthread) st_alink; /* application thread list */
+ int st_p2; /* application workspace */
+ uint64_t st_p3; /* application workspace */
} SVCTHREAD;
LIST_HEAD(svcthread_list, __rpc_svcthread);
OpenPOWER on IntegriCloud