diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2016-03-22 13:16:21 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2016-03-22 13:16:21 -0700 |
commit | 01cde1538e1dff4254e340f606177a870131a01f (patch) | |
tree | 474c049c444e885ff83e808c36a94cceab5bf8a3 | |
parent | 243d50678583100855862bc084b8b307eea67f68 (diff) | |
parent | f35592a97460f57d1089fd674176e9f34ba311f2 (diff) | |
download | op-kernel-dev-01cde1538e1dff4254e340f606177a870131a01f.zip op-kernel-dev-01cde1538e1dff4254e340f606177a870131a01f.tar.gz |
Merge tag 'nfs-for-4.6-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
"Highlights include:
Features:
- Add support for multiple NFSv4.1 callbacks in flight
- Initial patchset for RPC multipath support
- Adapt RPC/RDMA to use the new completion queue API
Bugfixes and cleanups:
- nfs4: nfs4_ff_layout_prepare_ds should return NULL if connection failed
- Cleanups to remove nfs_inode_dio_wait and nfs4_file_fsync
- Fix RPC/RDMA credit accounting
- Properly handle RDMA_ERROR replies
- xprtrdma: Do not wait if ib_post_send() fails
- xprtrdma: Segment head and tail XDR buffers on page boundaries
- xprtrdma cleanups for dprintk, physical_op_map and unused macros"
* tag 'nfs-for-4.6-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (35 commits)
nfs/blocklayout: make sure making a aligned read request
nfs4: nfs4_ff_layout_prepare_ds should return NULL if connection failed
nfs: remove nfs_inode_dio_wait
nfs: remove nfs4_file_fsync
xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs
xprtrdma: Use an anonymous union in struct rpcrdma_mw
xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs
xprtrdma: Serialize credit accounting again
xprtrdma: Properly handle RDMA_ERROR replies
rpcrdma: Add RPCRDMA_HDRLEN_ERR
xprtrdma: Do not wait if ib_post_send() fails
xprtrdma: Segment head and tail XDR buffers on page boundaries
xprtrdma: Clean up dprintk format string containing a newline
xprtrdma: Clean up physical_op_map()
xprtrdma: Clean up unused RPCRDMA_INLINE_PAD_THRESH macro
NFS add callback_ops to nfs4_proc_bind_conn_to_session_callback
pnfs/NFSv4.1: Add multipath capabilities to pNFS flexfiles servers over NFSv3
SUNRPC: Allow addition of new transports to a struct rpc_clnt
NFSv4.1: nfs4_proc_bind_conn_to_session must iterate over all connections
SUNRPC: Make NFS swap work with multipath
...
33 files changed, 1327 insertions, 487 deletions
diff --git a/fs/nfs/blocklayout/blocklayout.c b/fs/nfs/blocklayout/blocklayout.c index ddd0138..8bc870e 100644 --- a/fs/nfs/blocklayout/blocklayout.c +++ b/fs/nfs/blocklayout/blocklayout.c @@ -743,7 +743,7 @@ bl_set_layoutdriver(struct nfs_server *server, const struct nfs_fh *fh) static bool is_aligned_req(struct nfs_pageio_descriptor *pgio, - struct nfs_page *req, unsigned int alignment) + struct nfs_page *req, unsigned int alignment, bool is_write) { /* * Always accept buffered writes, higher layers take care of the @@ -758,7 +758,8 @@ is_aligned_req(struct nfs_pageio_descriptor *pgio, if (IS_ALIGNED(req->wb_bytes, alignment)) return true; - if (req_offset(req) + req->wb_bytes == i_size_read(pgio->pg_inode)) { + if (is_write && + (req_offset(req) + req->wb_bytes == i_size_read(pgio->pg_inode))) { /* * If the write goes up to the inode size, just write * the full page. Data past the inode size is @@ -775,7 +776,7 @@ is_aligned_req(struct nfs_pageio_descriptor *pgio, static void bl_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *req) { - if (!is_aligned_req(pgio, req, SECTOR_SIZE)) { + if (!is_aligned_req(pgio, req, SECTOR_SIZE, false)) { nfs_pageio_reset_read_mds(pgio); return; } @@ -791,7 +792,7 @@ static size_t bl_pg_test_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev, struct nfs_page *req) { - if (!is_aligned_req(pgio, req, SECTOR_SIZE)) + if (!is_aligned_req(pgio, req, SECTOR_SIZE, false)) return 0; return pnfs_generic_pg_test(pgio, prev, req); } @@ -824,7 +825,7 @@ bl_pg_init_write(struct nfs_pageio_descriptor *pgio, struct nfs_page *req) { u64 wb_size; - if (!is_aligned_req(pgio, req, PAGE_SIZE)) { + if (!is_aligned_req(pgio, req, PAGE_SIZE, true)) { nfs_pageio_reset_write_mds(pgio); return; } @@ -846,7 +847,7 @@ static size_t bl_pg_test_write(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev, struct nfs_page *req) { - if (!is_aligned_req(pgio, req, PAGE_SIZE)) + if (!is_aligned_req(pgio, req, PAGE_SIZE, true)) return 0; return pnfs_generic_pg_test(pgio, prev, req); } diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h index ff8195b..5fe1cec 100644 --- a/fs/nfs/callback.h +++ b/fs/nfs/callback.h @@ -37,10 +37,11 @@ enum nfs4_callback_opnum { OP_CB_ILLEGAL = 10044, }; +struct nfs4_slot; struct cb_process_state { __be32 drc_status; struct nfs_client *clp; - u32 slotid; + struct nfs4_slot *slot; u32 minorversion; struct net *net; }; diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c index f0939d0..618ced3 100644 --- a/fs/nfs/callback_proc.c +++ b/fs/nfs/callback_proc.c @@ -354,47 +354,38 @@ out: * a single outstanding callback request at a time. */ static __be32 -validate_seqid(struct nfs4_slot_table *tbl, struct cb_sequenceargs * args) +validate_seqid(const struct nfs4_slot_table *tbl, const struct nfs4_slot *slot, + const struct cb_sequenceargs * args) { - struct nfs4_slot *slot; - - dprintk("%s enter. slotid %u seqid %u\n", - __func__, args->csa_slotid, args->csa_sequenceid); + dprintk("%s enter. slotid %u seqid %u, slot table seqid: %u\n", + __func__, args->csa_slotid, args->csa_sequenceid, slot->seq_nr); - if (args->csa_slotid >= NFS41_BC_MAX_CALLBACKS) + if (args->csa_slotid > tbl->server_highest_slotid) return htonl(NFS4ERR_BADSLOT); - slot = tbl->slots + args->csa_slotid; - dprintk("%s slot table seqid: %u\n", __func__, slot->seq_nr); - - /* Normal */ - if (likely(args->csa_sequenceid == slot->seq_nr + 1)) - goto out_ok; - /* Replay */ if (args->csa_sequenceid == slot->seq_nr) { dprintk("%s seqid %u is a replay\n", __func__, args->csa_sequenceid); + if (nfs4_test_locked_slot(tbl, slot->slot_nr)) + return htonl(NFS4ERR_DELAY); /* Signal process_op to set this error on next op */ if (args->csa_cachethis == 0) return htonl(NFS4ERR_RETRY_UNCACHED_REP); - /* The ca_maxresponsesize_cached is 0 with no DRC */ - else if (args->csa_cachethis == 1) - return htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE); + /* Liar! We never allowed you to set csa_cachethis != 0 */ + return htonl(NFS4ERR_SEQ_FALSE_RETRY); } /* Wraparound */ - if (args->csa_sequenceid == 1 && (slot->seq_nr + 1) == 0) { - slot->seq_nr = 1; - goto out_ok; - } + if (unlikely(slot->seq_nr == 0xFFFFFFFFU)) { + if (args->csa_sequenceid == 1) + return htonl(NFS4_OK); + } else if (likely(args->csa_sequenceid == slot->seq_nr + 1)) + return htonl(NFS4_OK); /* Misordered request */ return htonl(NFS4ERR_SEQ_MISORDERED); -out_ok: - tbl->highest_used_slotid = args->csa_slotid; - return htonl(NFS4_OK); } /* @@ -473,6 +464,12 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args, tbl = &clp->cl_session->bc_slot_table; slot = tbl->slots + args->csa_slotid; + /* Set up res before grabbing the spinlock */ + memcpy(&res->csr_sessionid, &args->csa_sessionid, + sizeof(res->csr_sessionid)); + res->csr_sequenceid = args->csa_sequenceid; + res->csr_slotid = args->csa_slotid; + spin_lock(&tbl->slot_tbl_lock); /* state manager is resetting the session */ if (test_bit(NFS4_SLOT_TBL_DRAINING, &tbl->slot_tbl_state)) { @@ -485,18 +482,26 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args, goto out_unlock; } - memcpy(&res->csr_sessionid, &args->csa_sessionid, - sizeof(res->csr_sessionid)); - res->csr_sequenceid = args->csa_sequenceid; - res->csr_slotid = args->csa_slotid; - res->csr_highestslotid = NFS41_BC_MAX_CALLBACKS - 1; - res->csr_target_highestslotid = NFS41_BC_MAX_CALLBACKS - 1; + status = htonl(NFS4ERR_BADSLOT); + slot = nfs4_lookup_slot(tbl, args->csa_slotid); + if (IS_ERR(slot)) + goto out_unlock; + + res->csr_highestslotid = tbl->server_highest_slotid; + res->csr_target_highestslotid = tbl->target_highest_slotid; - status = validate_seqid(tbl, args); + status = validate_seqid(tbl, slot, args); if (status) goto out_unlock; + if (!nfs4_try_to_lock_slot(tbl, slot)) { + status = htonl(NFS4ERR_DELAY); + goto out_unlock; + } + cps->slot = slot; - cps->slotid = args->csa_slotid; + /* The ca_maxresponsesize_cached is 0 with no DRC */ + if (args->csa_cachethis != 0) + return htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE); /* * Check for pending referring calls. If a match is found, a @@ -513,7 +518,7 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args, * If CB_SEQUENCE returns an error, then the state of the slot * (sequence ID, cached reply) MUST NOT change. */ - slot->seq_nr++; + slot->seq_nr = args->csa_sequenceid; out_unlock: spin_unlock(&tbl->slot_tbl_lock); diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c index 646cdac..976c906 100644 --- a/fs/nfs/callback_xdr.c +++ b/fs/nfs/callback_xdr.c @@ -752,7 +752,8 @@ preprocess_nfs41_op(int nop, unsigned int op_nr, struct callback_op **op) return htonl(NFS_OK); } -static void nfs4_callback_free_slot(struct nfs4_session *session) +static void nfs4_callback_free_slot(struct nfs4_session *session, + struct nfs4_slot *slot) { struct nfs4_slot_table *tbl = &session->bc_slot_table; @@ -761,15 +762,17 @@ static void nfs4_callback_free_slot(struct nfs4_session *session) * Let the state manager know callback processing done. * A single slot, so highest used slotid is either 0 or -1 */ - tbl->highest_used_slotid = NFS4_NO_SLOT; + nfs4_free_slot(tbl, slot); nfs4_slot_tbl_drain_complete(tbl); spin_unlock(&tbl->slot_tbl_lock); } static void nfs4_cb_free_slot(struct cb_process_state *cps) { - if (cps->slotid != NFS4_NO_SLOT) - nfs4_callback_free_slot(cps->clp->cl_session); + if (cps->slot) { + nfs4_callback_free_slot(cps->clp->cl_session, cps->slot); + cps->slot = NULL; + } } #else /* CONFIG_NFS_V4_1 */ @@ -893,7 +896,6 @@ static __be32 nfs4_callback_compound(struct svc_rqst *rqstp, void *argp, void *r struct cb_process_state cps = { .drc_status = 0, .clp = NULL, - .slotid = NFS4_NO_SLOT, .net = SVC_NET(rqstp), }; unsigned int nops = 0; diff --git a/fs/nfs/file.c b/fs/nfs/file.c index 748bb81..89bf093 100644 --- a/fs/nfs/file.c +++ b/fs/nfs/file.c @@ -233,7 +233,7 @@ EXPORT_SYMBOL_GPL(nfs_file_mmap); * nfs_file_write() that a write error occurred, and hence cause it to * fall back to doing a synchronous write. */ -int +static int nfs_file_fsync_commit(struct file *file, loff_t start, loff_t end, int datasync) { struct nfs_open_context *ctx = nfs_file_open_context(file); @@ -263,9 +263,8 @@ nfs_file_fsync_commit(struct file *file, loff_t start, loff_t end, int datasync) out: return ret; } -EXPORT_SYMBOL_GPL(nfs_file_fsync_commit); -static int +int nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync) { int ret; @@ -273,13 +272,15 @@ nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync) trace_nfs_fsync_enter(inode); - nfs_inode_dio_wait(inode); + inode_dio_wait(inode); do { ret = filemap_write_and_wait_range(inode->i_mapping, start, end); if (ret != 0) break; inode_lock(inode); ret = nfs_file_fsync_commit(file, start, end, datasync); + if (!ret) + ret = pnfs_sync_inode(inode, !!datasync); inode_unlock(inode); /* * If nfs_file_fsync_commit detected a server reboot, then @@ -293,6 +294,7 @@ nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync) trace_nfs_fsync_exit(inode, ret); return ret; } +EXPORT_SYMBOL_GPL(nfs_file_fsync); /* * Decide whether a read/modify/write cycle may be more efficient @@ -368,7 +370,7 @@ start: /* * Wait for O_DIRECT to complete */ - nfs_inode_dio_wait(mapping->host); + inode_dio_wait(mapping->host); page = grab_cache_page_write_begin(mapping, index, flags); if (!page) diff --git a/fs/nfs/flexfilelayout/flexfilelayoutdev.c b/fs/nfs/flexfilelayout/flexfilelayoutdev.c index eb37046..add0e5a7 100644 --- a/fs/nfs/flexfilelayout/flexfilelayoutdev.c +++ b/fs/nfs/flexfilelayout/flexfilelayoutdev.c @@ -418,6 +418,8 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx, pnfs_error_mark_layout_for_return(ino, lseg); } else pnfs_error_mark_layout_for_return(ino, lseg); + ds = NULL; + goto out; } out_update_creds: if (ff_layout_update_mirror_cred(mirror, ds)) diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c index 86faecf..33d18c4 100644 --- a/fs/nfs/inode.c +++ b/fs/nfs/inode.c @@ -141,7 +141,7 @@ void nfs_evict_inode(struct inode *inode) int nfs_sync_inode(struct inode *inode) { - nfs_inode_dio_wait(inode); + inode_dio_wait(inode); return nfs_wb_all(inode); } EXPORT_SYMBOL_GPL(nfs_sync_inode); diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h index 9a547aa..565f813 100644 --- a/fs/nfs/internal.h +++ b/fs/nfs/internal.h @@ -358,7 +358,7 @@ int nfs_mknod(struct inode *, struct dentry *, umode_t, dev_t); int nfs_rename(struct inode *, struct dentry *, struct inode *, struct dentry *); /* file.c */ -int nfs_file_fsync_commit(struct file *, loff_t, loff_t, int); +int nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync); loff_t nfs_file_llseek(struct file *, loff_t, int); ssize_t nfs_file_read(struct kiocb *, struct iov_iter *); ssize_t nfs_file_splice_read(struct file *, loff_t *, struct pipe_inode_info *, @@ -515,10 +515,6 @@ extern int nfs_sillyrename(struct inode *dir, struct dentry *dentry); /* direct.c */ void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo, struct nfs_direct_req *dreq); -static inline void nfs_inode_dio_wait(struct inode *inode) -{ - inode_dio_wait(inode); -} extern ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq); /* nfs4proc.c */ diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c index 57ca1c8..22c35ab 100644 --- a/fs/nfs/nfs4file.c +++ b/fs/nfs/nfs4file.c @@ -128,37 +128,6 @@ nfs4_file_flush(struct file *file, fl_owner_t id) return vfs_fsync(file, 0); } -static int -nfs4_file_fsync(struct file *file, loff_t start, loff_t end, int datasync) -{ - int ret; - struct inode *inode = file_inode(file); - - trace_nfs_fsync_enter(inode); - - nfs_inode_dio_wait(inode); - do { - ret = filemap_write_and_wait_range(inode->i_mapping, start, end); - if (ret != 0) - break; - inode_lock(inode); - ret = nfs_file_fsync_commit(file, start, end, datasync); - if (!ret) - ret = pnfs_sync_inode(inode, !!datasync); - inode_unlock(inode); - /* - * If nfs_file_fsync_commit detected a server reboot, then - * resend all dirty pages that might have been covered by - * the NFS_CONTEXT_RESEND_WRITES flag - */ - start = 0; - end = LLONG_MAX; - } while (ret == -EAGAIN); - - trace_nfs_fsync_exit(inode, ret); - return ret; -} - #ifdef CONFIG_NFS_V4_2 static loff_t nfs4_file_llseek(struct file *filep, loff_t offset, int whence) { @@ -266,7 +235,7 @@ const struct file_operations nfs4_file_operations = { .open = nfs4_file_open, .flush = nfs4_file_flush, .release = nfs_file_release, - .fsync = nfs4_file_fsync, + .fsync = nfs_file_fsync, .lock = nfs_lock, .flock = nfs_flock, .splice_read = nfs_file_splice_read, diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index 400a70b..327b8c3 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -6783,13 +6783,26 @@ nfs41_same_server_scope(struct nfs41_server_scope *a, return false; } +static void +nfs4_bind_one_conn_to_session_done(struct rpc_task *task, void *calldata) +{ +} + +static const struct rpc_call_ops nfs4_bind_one_conn_to_session_ops = { + .rpc_call_done = &nfs4_bind_one_conn_to_session_done, +}; + /* - * nfs4_proc_bind_conn_to_session() + * nfs4_proc_bind_one_conn_to_session() * * The 4.1 client currently uses the same TCP connection for the * fore and backchannel. */ -int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred) +static +int nfs4_proc_bind_one_conn_to_session(struct rpc_clnt *clnt, + struct rpc_xprt *xprt, + struct nfs_client *clp, + struct rpc_cred *cred) { int status; struct nfs41_bind_conn_to_session_args args = { @@ -6804,6 +6817,14 @@ int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred .rpc_resp = &res, .rpc_cred = cred, }; + struct rpc_task_setup task_setup_data = { + .rpc_client = clnt, + .rpc_xprt = xprt, + .callback_ops = &nfs4_bind_one_conn_to_session_ops, + .rpc_message = &msg, + .flags = RPC_TASK_TIMEOUT, + }; + struct rpc_task *task; dprintk("--> %s\n", __func__); @@ -6811,7 +6832,16 @@ int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred if (!(clp->cl_session->flags & SESSION4_BACK_CHAN)) args.dir = NFS4_CDFC4_FORE; - status = rpc_call_sync(clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT); + /* Do not set the backchannel flag unless this is clnt->cl_xprt */ + if (xprt != rcu_access_pointer(clnt->cl_xprt)) + args.dir = NFS4_CDFC4_FORE; + + task = rpc_run_task(&task_setup_data); + if (!IS_ERR(task)) { + status = task->tk_status; + rpc_put_task(task); + } else + status = PTR_ERR(task); trace_nfs4_bind_conn_to_session(clp, status); if (status == 0) { if (memcmp(res.sessionid.data, @@ -6838,6 +6868,31 @@ out: return status; } +struct rpc_bind_conn_calldata { + struct nfs_client *clp; + struct rpc_cred *cred; +}; + +static int +nfs4_proc_bind_conn_to_session_callback(struct rpc_clnt *clnt, + struct rpc_xprt *xprt, + void *calldata) +{ + struct rpc_bind_conn_calldata *p = calldata; + + return nfs4_proc_bind_one_conn_to_session(clnt, xprt, p->clp, p->cred); +} + +int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred) +{ + struct rpc_bind_conn_calldata data = { + .clp = clp, + .cred = cred, + }; + return rpc_clnt_iterate_for_each_xprt(clp->cl_rpcclient, + nfs4_proc_bind_conn_to_session_callback, &data); +} + /* * Minimum set of SP4_MACH_CRED operations from RFC 5661 in the enforce map * and operations we'd like to see to enable certain features in the allow map @@ -7320,7 +7375,7 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args) args->bc_attrs.max_resp_sz = PAGE_SIZE; args->bc_attrs.max_resp_sz_cached = 0; args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS; - args->bc_attrs.max_reqs = 1; + args->bc_attrs.max_reqs = NFS41_BC_MAX_CALLBACKS; dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u " "max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n", diff --git a/fs/nfs/nfs4session.c b/fs/nfs/nfs4session.c index e23366e..332d06e 100644 --- a/fs/nfs/nfs4session.c +++ b/fs/nfs/nfs4session.c @@ -135,6 +135,43 @@ static struct nfs4_slot *nfs4_find_or_create_slot(struct nfs4_slot_table *tbl, return ERR_PTR(-ENOMEM); } +static void nfs4_lock_slot(struct nfs4_slot_table *tbl, + struct nfs4_slot *slot) +{ + u32 slotid = slot->slot_nr; + + __set_bit(slotid, tbl->used_slots); + if (slotid > tbl->highest_used_slotid || + tbl->highest_used_slotid == NFS4_NO_SLOT) + tbl->highest_used_slotid = slotid; + slot->generation = tbl->generation; +} + +/* + * nfs4_try_to_lock_slot - Given a slot try to allocate it + * + * Note: must be called with the slot_tbl_lock held. + */ +bool nfs4_try_to_lock_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot) +{ + if (nfs4_test_locked_slot(tbl, slot->slot_nr)) + return false; + nfs4_lock_slot(tbl, slot); + return true; +} + +/* + * nfs4_lookup_slot - Find a slot but don't allocate it + * + * Note: must be called with the slot_tbl_lock held. + */ +struct nfs4_slot *nfs4_lookup_slot(struct nfs4_slot_table *tbl, u32 slotid) +{ + if (slotid <= tbl->max_slotid) + return nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT); + return ERR_PTR(-E2BIG); +} + /* * nfs4_alloc_slot - efficiently look for a free slot * @@ -153,18 +190,11 @@ struct nfs4_slot *nfs4_alloc_slot(struct nfs4_slot_table *tbl) __func__, tbl->used_slots[0], tbl->highest_used_slotid, tbl->max_slotid + 1); slotid = find_first_zero_bit(tbl->used_slots, tbl->max_slotid + 1); - if (slotid > tbl->max_slotid) - goto out; - ret = nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT); - if (IS_ERR(ret)) - goto out; - __set_bit(slotid, tbl->used_slots); - if (slotid > tbl->highest_used_slotid || - tbl->highest_used_slotid == NFS4_NO_SLOT) - tbl->highest_used_slotid = slotid; - ret->generation = tbl->generation; - -out: + if (slotid <= tbl->max_slotid) { + ret = nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT); + if (!IS_ERR(ret)) + nfs4_lock_slot(tbl, ret); + } dprintk("<-- %s used_slots=%04lx highest_used=%u slotid=%u\n", __func__, tbl->used_slots[0], tbl->highest_used_slotid, !IS_ERR(ret) ? ret->slot_nr : NFS4_NO_SLOT); diff --git a/fs/nfs/nfs4session.h b/fs/nfs/nfs4session.h index e3ea2c5..5b51298 100644 --- a/fs/nfs/nfs4session.h +++ b/fs/nfs/nfs4session.h @@ -77,6 +77,8 @@ extern int nfs4_setup_slot_table(struct nfs4_slot_table *tbl, unsigned int max_reqs, const char *queue); extern void nfs4_shutdown_slot_table(struct nfs4_slot_table *tbl); extern struct nfs4_slot *nfs4_alloc_slot(struct nfs4_slot_table *tbl); +extern struct nfs4_slot *nfs4_lookup_slot(struct nfs4_slot_table *tbl, u32 slotid); +extern bool nfs4_try_to_lock_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot); extern void nfs4_free_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot); extern void nfs4_slot_tbl_drain_complete(struct nfs4_slot_table *tbl); bool nfs41_wake_and_assign_slot(struct nfs4_slot_table *tbl, @@ -88,6 +90,12 @@ static inline bool nfs4_slot_tbl_draining(struct nfs4_slot_table *tbl) return !!test_bit(NFS4_SLOT_TBL_DRAINING, &tbl->slot_tbl_state); } +static inline bool nfs4_test_locked_slot(const struct nfs4_slot_table *tbl, + u32 slotid) +{ + return !!test_bit(slotid, tbl->used_slots); +} + #if defined(CONFIG_NFS_V4_1) extern void nfs41_set_target_slotid(struct nfs4_slot_table *tbl, u32 target_highest_slotid); diff --git a/fs/nfs/pnfs_nfs.c b/fs/nfs/pnfs_nfs.c index 81ac648..4aaed89 100644 --- a/fs/nfs/pnfs_nfs.c +++ b/fs/nfs/pnfs_nfs.c @@ -606,12 +606,22 @@ static int _nfs4_pnfs_v3_ds_connect(struct nfs_server *mds_srv, dprintk("%s: DS %s: trying address %s\n", __func__, ds->ds_remotestr, da->da_remotestr); - clp = get_v3_ds_connect(mds_srv->nfs_client, + if (!IS_ERR(clp)) { + struct xprt_create xprt_args = { + .ident = XPRT_TRANSPORT_TCP, + .net = clp->cl_net, + .dstaddr = (struct sockaddr *)&da->da_addr, + .addrlen = da->da_addrlen, + .servername = clp->cl_hostname, + }; + /* Add this address as an alias */ + rpc_clnt_add_xprt(clp->cl_rpcclient, &xprt_args, + rpc_clnt_test_and_add_xprt, NULL); + } else + clp = get_v3_ds_connect(mds_srv->nfs_client, (struct sockaddr *)&da->da_addr, da->da_addrlen, IPPROTO_TCP, timeo, retrans, au_flavor); - if (!IS_ERR(clp)) - break; } if (IS_ERR(clp)) { diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h index 131032f..9a7ddba 100644 --- a/include/linux/sunrpc/clnt.h +++ b/include/linux/sunrpc/clnt.h @@ -25,6 +25,7 @@ #include <asm/signal.h> #include <linux/path.h> #include <net/ipv6.h> +#include <linux/sunrpc/xprtmultipath.h> struct rpc_inode; @@ -67,6 +68,7 @@ struct rpc_clnt { #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) struct dentry *cl_debugfs; /* debugfs directory */ #endif + struct rpc_xprt_iter cl_xpi; }; /* @@ -139,7 +141,6 @@ struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args, struct rpc_xprt *xprt); struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *, const struct rpc_program *, u32); -void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt); struct rpc_clnt *rpc_clone_client(struct rpc_clnt *); struct rpc_clnt *rpc_clone_client_set_auth(struct rpc_clnt *, rpc_authflavor_t); @@ -181,6 +182,21 @@ size_t rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t); const char *rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t); int rpc_localaddr(struct rpc_clnt *, struct sockaddr *, size_t); +int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt, + int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *), + void *data); + +int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt, + struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt, + void *dummy); +int rpc_clnt_add_xprt(struct rpc_clnt *, struct xprt_create *, + int (*setup)(struct rpc_clnt *, + struct rpc_xprt_switch *, + struct rpc_xprt *, + void *), + void *data); + const char *rpc_proc_name(const struct rpc_task *task); #endif /* __KERNEL__ */ #endif /* _LINUX_SUNRPC_CLNT_H */ diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h index f33c5a4..3b1ff38 100644 --- a/include/linux/sunrpc/rpc_rdma.h +++ b/include/linux/sunrpc/rpc_rdma.h @@ -93,6 +93,12 @@ struct rpcrdma_msg { __be32 rm_pempty[3]; /* 3 empty chunk lists */ } rm_padded; + struct { + __be32 rm_err; + __be32 rm_vers_low; + __be32 rm_vers_high; + } rm_error; + __be32 rm_chunks[0]; /* read, write and reply chunks */ } rm_body; @@ -102,17 +108,13 @@ struct rpcrdma_msg { * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks */ #define RPCRDMA_HDRLEN_MIN (sizeof(__be32) * 7) +#define RPCRDMA_HDRLEN_ERR (sizeof(__be32) * 5) enum rpcrdma_errcode { ERR_VERS = 1, ERR_CHUNK = 2 }; -struct rpcrdma_err_vers { - uint32_t rdma_vers_low; /* Version range supported by peer */ - uint32_t rdma_vers_high; -}; - enum rpcrdma_proc { RDMA_MSG = 0, /* An RPC call or reply msg */ RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */ diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h index d703f0e..05a1809 100644 --- a/include/linux/sunrpc/sched.h +++ b/include/linux/sunrpc/sched.h @@ -42,40 +42,43 @@ struct rpc_wait { */ struct rpc_task { atomic_t tk_count; /* Reference count */ + int tk_status; /* result of last operation */ struct list_head tk_task; /* global list of tasks */ - struct rpc_clnt * tk_client; /* RPC client */ - struct rpc_rqst * tk_rqstp; /* RPC request */ - - /* - * RPC call state - */ - struct rpc_message tk_msg; /* RPC call info */ /* * callback to be executed after waking up * action next procedure for async tasks - * tk_ops caller callbacks */ void (*tk_callback)(struct rpc_task *); void (*tk_action)(struct rpc_task *); - const struct rpc_call_ops *tk_ops; - void * tk_calldata; unsigned long tk_timeout; /* timeout for rpc_sleep() */ unsigned long tk_runstate; /* Task run status */ - struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could - * be any workqueue - */ + struct rpc_wait_queue *tk_waitqueue; /* RPC wait queue we're on */ union { struct work_struct tk_work; /* Async task work queue */ struct rpc_wait tk_wait; /* RPC wait */ } u; + /* + * RPC call state + */ + struct rpc_message tk_msg; /* RPC call info */ + void * tk_calldata; /* Caller private data */ + const struct rpc_call_ops *tk_ops; /* Caller callbacks */ + + struct rpc_clnt * tk_client; /* RPC client */ + struct rpc_xprt * tk_xprt; /* Transport */ + + struct rpc_rqst * tk_rqstp; /* RPC request */ + + struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could + * be any workqueue + */ ktime_t tk_start; /* RPC task init timestamp */ pid_t tk_owner; /* Process id for batching tasks */ - int tk_status; /* result of last operation */ unsigned short tk_flags; /* misc flags */ unsigned short tk_timeouts; /* maj timeouts */ @@ -100,6 +103,7 @@ struct rpc_call_ops { struct rpc_task_setup { struct rpc_task *task; struct rpc_clnt *rpc_client; + struct rpc_xprt *rpc_xprt; const struct rpc_message *rpc_message; const struct rpc_call_ops *callback_ops; void *callback_data; diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 69ef5b3..fb0d212 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -13,6 +13,7 @@ #include <linux/socket.h> #include <linux/in.h> #include <linux/ktime.h> +#include <linux/kref.h> #include <linux/sunrpc/sched.h> #include <linux/sunrpc/xdr.h> #include <linux/sunrpc/msg_prot.h> @@ -166,7 +167,7 @@ enum xprt_transports { }; struct rpc_xprt { - atomic_t count; /* Reference count */ + struct kref kref; /* Reference count */ struct rpc_xprt_ops * ops; /* transport methods */ const struct rpc_timeout *timeout; /* timeout parms */ @@ -197,6 +198,11 @@ struct rpc_xprt { unsigned int bind_index; /* bind function index */ /* + * Multipath + */ + struct list_head xprt_switch; + + /* * Connection of transports */ unsigned long bind_timeout, @@ -256,6 +262,7 @@ struct rpc_xprt { struct dentry *debugfs; /* debugfs directory */ atomic_t inject_disconnect; #endif + struct rcu_head rcu; }; #if defined(CONFIG_SUNRPC_BACKCHANNEL) @@ -318,24 +325,13 @@ int xprt_adjust_timeout(struct rpc_rqst *req); void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task); void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task); void xprt_release(struct rpc_task *task); +struct rpc_xprt * xprt_get(struct rpc_xprt *xprt); void xprt_put(struct rpc_xprt *xprt); struct rpc_xprt * xprt_alloc(struct net *net, size_t size, unsigned int num_prealloc, unsigned int max_req); void xprt_free(struct rpc_xprt *); -/** - * xprt_get - return a reference to an RPC transport. - * @xprt: pointer to the transport - * - */ -static inline struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) -{ - if (atomic_inc_not_zero(&xprt->count)) - return xprt; - return NULL; -} - static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p) { return p + xprt->tsh_size; diff --git a/include/linux/sunrpc/xprtmultipath.h b/include/linux/sunrpc/xprtmultipath.h new file mode 100644 index 0000000..5a9acff --- /dev/null +++ b/include/linux/sunrpc/xprtmultipath.h @@ -0,0 +1,69 @@ +/* + * RPC client multipathing definitions + * + * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. + * + * Trond Myklebust <trond.myklebust@primarydata.com> + */ +#ifndef _NET_SUNRPC_XPRTMULTIPATH_H +#define _NET_SUNRPC_XPRTMULTIPATH_H + +struct rpc_xprt_iter_ops; +struct rpc_xprt_switch { + spinlock_t xps_lock; + struct kref xps_kref; + + unsigned int xps_nxprts; + struct list_head xps_xprt_list; + + struct net * xps_net; + + const struct rpc_xprt_iter_ops *xps_iter_ops; + + struct rcu_head xps_rcu; +}; + +struct rpc_xprt_iter { + struct rpc_xprt_switch __rcu *xpi_xpswitch; + struct rpc_xprt * xpi_cursor; + + const struct rpc_xprt_iter_ops *xpi_ops; +}; + + +struct rpc_xprt_iter_ops { + void (*xpi_rewind)(struct rpc_xprt_iter *); + struct rpc_xprt *(*xpi_xprt)(struct rpc_xprt_iter *); + struct rpc_xprt *(*xpi_next)(struct rpc_xprt_iter *); +}; + +extern struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, + gfp_t gfp_flags); + +extern struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps); +extern void xprt_switch_put(struct rpc_xprt_switch *xps); + +extern void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps); + +extern void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt); +extern void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt); + +extern void xprt_iter_init(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *xps); + +extern void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *xps); + +extern void xprt_iter_destroy(struct rpc_xprt_iter *xpi); + +extern struct rpc_xprt_switch *xprt_iter_xchg_switch( + struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *newswitch); + +extern struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi); +extern struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi); +extern struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi); + +#endif diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h index b7b279b..767190b 100644 --- a/include/linux/sunrpc/xprtrdma.h +++ b/include/linux/sunrpc/xprtrdma.h @@ -54,8 +54,6 @@ #define RPCRDMA_DEF_INLINE (1024) /* default inline max */ -#define RPCRDMA_INLINE_PAD_THRESH (512)/* payload threshold to pad (bytes) */ - /* Memory registration strategies, by number. * This is part of a kernel / user space API. Do not remove. */ enum rpcrdma_memreg { diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile index b512fbd..ea7ffa1 100644 --- a/net/sunrpc/Makefile +++ b/net/sunrpc/Makefile @@ -12,7 +12,8 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \ svc.o svcsock.o svcauth.o svcauth_unix.o \ addr.o rpcb_clnt.o timer.o xdr.o \ sunrpc_syms.o cache.o rpc_pipe.o \ - svc_xprt.o + svc_xprt.o \ + xprtmultipath.o sunrpc-$(CONFIG_SUNRPC_DEBUG) += debugfs.o sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o sunrpc-$(CONFIG_PROC_FS) += stats.o diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index cabf586..8c6bc79 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1181,12 +1181,12 @@ static struct rpc_auth * gss_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt) { struct gss_auth *gss_auth; - struct rpc_xprt *xprt = rcu_access_pointer(clnt->cl_xprt); + struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch); while (clnt != clnt->cl_parent) { struct rpc_clnt *parent = clnt->cl_parent; /* Find the original parent for this transport */ - if (rcu_access_pointer(parent->cl_xprt) != xprt) + if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps) break; clnt = parent; } diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index b7f2104..7e0c9bf 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -354,6 +354,7 @@ static void rpc_free_clid(struct rpc_clnt *clnt) } static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, + struct rpc_xprt_switch *xps, struct rpc_xprt *xprt, struct rpc_clnt *parent) { @@ -411,6 +412,8 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, } rpc_clnt_set_transport(clnt, xprt, timeout); + xprt_iter_init(&clnt->cl_xpi, xps); + xprt_switch_put(xps); clnt->cl_rtt = &clnt->cl_rtt_default; rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval); @@ -438,6 +441,7 @@ out_no_clid: out_err: rpciod_down(); out_no_rpciod: + xprt_switch_put(xps); xprt_put(xprt); return ERR_PTR(err); } @@ -446,8 +450,13 @@ struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args, struct rpc_xprt *xprt) { struct rpc_clnt *clnt = NULL; + struct rpc_xprt_switch *xps; - clnt = rpc_new_client(args, xprt, NULL); + xps = xprt_switch_alloc(xprt, GFP_KERNEL); + if (xps == NULL) + return ERR_PTR(-ENOMEM); + + clnt = rpc_new_client(args, xps, xprt, NULL); if (IS_ERR(clnt)) return clnt; @@ -564,6 +573,7 @@ EXPORT_SYMBOL_GPL(rpc_create); static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args, struct rpc_clnt *clnt) { + struct rpc_xprt_switch *xps; struct rpc_xprt *xprt; struct rpc_clnt *new; int err; @@ -571,13 +581,17 @@ static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args, err = -ENOMEM; rcu_read_lock(); xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); + xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch)); rcu_read_unlock(); - if (xprt == NULL) + if (xprt == NULL || xps == NULL) { + xprt_put(xprt); + xprt_switch_put(xps); goto out_err; + } args->servername = xprt->servername; args->nodename = clnt->cl_nodename; - new = rpc_new_client(args, xprt, clnt); + new = rpc_new_client(args, xps, xprt, clnt); if (IS_ERR(new)) { err = PTR_ERR(new); goto out_err; @@ -657,6 +671,7 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt, { const struct rpc_timeout *old_timeo; rpc_authflavor_t pseudoflavor; + struct rpc_xprt_switch *xps, *oldxps; struct rpc_xprt *xprt, *old; struct rpc_clnt *parent; int err; @@ -668,10 +683,17 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt, return PTR_ERR(xprt); } + xps = xprt_switch_alloc(xprt, GFP_KERNEL); + if (xps == NULL) { + xprt_put(xprt); + return -ENOMEM; + } + pseudoflavor = clnt->cl_auth->au_flavor; old_timeo = clnt->cl_timeout; old = rpc_clnt_set_transport(clnt, xprt, timeout); + oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps); rpc_unregister_client(clnt); __rpc_clnt_remove_pipedir(clnt); @@ -697,20 +719,74 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt, synchronize_rcu(); if (parent != clnt) rpc_release_client(parent); + xprt_switch_put(oldxps); xprt_put(old); dprintk("RPC: replaced xprt for clnt %p\n", clnt); return 0; out_revert: + xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps); rpc_clnt_set_transport(clnt, old, old_timeo); clnt->cl_parent = parent; rpc_client_register(clnt, pseudoflavor, NULL); + xprt_switch_put(xps); xprt_put(xprt); dprintk("RPC: failed to switch xprt for clnt %p\n", clnt); return err; } EXPORT_SYMBOL_GPL(rpc_switch_client_transport); +static +int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi) +{ + struct rpc_xprt_switch *xps; + + rcu_read_lock(); + xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch)); + rcu_read_unlock(); + if (xps == NULL) + return -EAGAIN; + xprt_iter_init_listall(xpi, xps); + xprt_switch_put(xps); + return 0; +} + +/** + * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports + * @clnt: pointer to client + * @fn: function to apply + * @data: void pointer to function data + * + * Iterates through the list of RPC transports currently attached to the + * client and applies the function fn(clnt, xprt, data). + * + * On error, the iteration stops, and the function returns the error value. + */ +int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt, + int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *), + void *data) +{ + struct rpc_xprt_iter xpi; + int ret; + + ret = rpc_clnt_xprt_iter_init(clnt, &xpi); + if (ret) + return ret; + for (;;) { + struct rpc_xprt *xprt = xprt_iter_get_next(&xpi); + + if (!xprt) + break; + ret = fn(clnt, xprt, data); + xprt_put(xprt); + if (ret < 0) + break; + } + xprt_iter_destroy(&xpi); + return ret; +} +EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt); + /* * Kill all tasks for the given client. * XXX: kill their descendants as well? @@ -783,6 +859,7 @@ rpc_free_client(struct rpc_clnt *clnt) rpc_free_iostats(clnt->cl_metrics); clnt->cl_metrics = NULL; xprt_put(rcu_dereference_raw(clnt->cl_xprt)); + xprt_iter_destroy(&clnt->cl_xpi); rpciod_down(); rpc_free_clid(clnt); kfree(clnt); @@ -868,6 +945,7 @@ EXPORT_SYMBOL_GPL(rpc_bind_new_program); void rpc_task_release_client(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; + struct rpc_xprt *xprt = task->tk_xprt; if (clnt != NULL) { /* Remove from client task list */ @@ -878,13 +956,22 @@ void rpc_task_release_client(struct rpc_task *task) rpc_release_client(clnt); } + + if (xprt != NULL) { + task->tk_xprt = NULL; + + xprt_put(xprt); + } } static void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt) { + if (clnt != NULL) { rpc_task_release_client(task); + if (task->tk_xprt == NULL) + task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi); task->tk_client = clnt; atomic_inc(&clnt->cl_count); if (clnt->cl_softrtry) @@ -900,14 +987,6 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt) } } -void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt) -{ - rpc_task_release_client(task); - rpc_task_set_client(task, clnt); -} -EXPORT_SYMBOL_GPL(rpc_task_reset_client); - - static void rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg) { @@ -2104,11 +2183,9 @@ call_timeout(struct rpc_task *task) } if (RPC_IS_SOFT(task)) { if (clnt->cl_chatty) { - rcu_read_lock(); printk(KERN_NOTICE "%s: server %s not responding, timed out\n", clnt->cl_program->name, - rcu_dereference(clnt->cl_xprt)->servername); - rcu_read_unlock(); + task->tk_xprt->servername); } if (task->tk_flags & RPC_TASK_TIMEOUT) rpc_exit(task, -ETIMEDOUT); @@ -2120,11 +2197,9 @@ call_timeout(struct rpc_task *task) if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) { task->tk_flags |= RPC_CALL_MAJORSEEN; if (clnt->cl_chatty) { - rcu_read_lock(); printk(KERN_NOTICE "%s: server %s not responding, still trying\n", clnt->cl_program->name, - rcu_dereference(clnt->cl_xprt)->servername); - rcu_read_unlock(); + task->tk_xprt->servername); } } rpc_force_rebind(clnt); @@ -2154,11 +2229,9 @@ call_decode(struct rpc_task *task) if (task->tk_flags & RPC_CALL_MAJORSEEN) { if (clnt->cl_chatty) { - rcu_read_lock(); printk(KERN_NOTICE "%s: server %s OK\n", clnt->cl_program->name, - rcu_dereference(clnt->cl_xprt)->servername); - rcu_read_unlock(); + task->tk_xprt->servername); } task->tk_flags &= ~RPC_CALL_MAJORSEEN; } @@ -2312,11 +2385,9 @@ rpc_verify_header(struct rpc_task *task) task->tk_action = call_bind; goto out_retry; case RPC_AUTH_TOOWEAK: - rcu_read_lock(); printk(KERN_NOTICE "RPC: server %s requires stronger " "authentication.\n", - rcu_dereference(clnt->cl_xprt)->servername); - rcu_read_unlock(); + task->tk_xprt->servername); break; default: dprintk("RPC: %5u %s: unknown auth error: %x\n", @@ -2341,27 +2412,27 @@ rpc_verify_header(struct rpc_task *task) case RPC_SUCCESS: return p; case RPC_PROG_UNAVAIL: - dprintk_rcu("RPC: %5u %s: program %u is unsupported " + dprintk("RPC: %5u %s: program %u is unsupported " "by server %s\n", task->tk_pid, __func__, (unsigned int)clnt->cl_prog, - rcu_dereference(clnt->cl_xprt)->servername); + task->tk_xprt->servername); error = -EPFNOSUPPORT; goto out_err; case RPC_PROG_MISMATCH: - dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported " + dprintk("RPC: %5u %s: program %u, version %u unsupported " "by server %s\n", task->tk_pid, __func__, (unsigned int)clnt->cl_prog, (unsigned int)clnt->cl_vers, - rcu_dereference(clnt->cl_xprt)->servername); + task->tk_xprt->servername); error = -EPROTONOSUPPORT; goto out_err; case RPC_PROC_UNAVAIL: - dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, " + dprintk("RPC: %5u %s: proc %s unsupported by program %u, " "version %u on server %s\n", task->tk_pid, __func__, rpc_proc_name(task), clnt->cl_prog, clnt->cl_vers, - rcu_dereference(clnt->cl_xprt)->servername); + task->tk_xprt->servername); error = -EOPNOTSUPP; goto out_err; case RPC_GARBAGE_ARGS: @@ -2421,7 +2492,10 @@ static int rpc_ping(struct rpc_clnt *clnt) return err; } -struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags) +static +struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt, + struct rpc_xprt *xprt, struct rpc_cred *cred, int flags, + const struct rpc_call_ops *ops, void *data) { struct rpc_message msg = { .rpc_proc = &rpcproc_null, @@ -2429,14 +2503,140 @@ struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int }; struct rpc_task_setup task_setup_data = { .rpc_client = clnt, + .rpc_xprt = xprt, .rpc_message = &msg, - .callback_ops = &rpc_default_ops, + .callback_ops = (ops != NULL) ? ops : &rpc_default_ops, + .callback_data = data, .flags = flags, }; + return rpc_run_task(&task_setup_data); } + +struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags) +{ + return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL); +} EXPORT_SYMBOL_GPL(rpc_call_null); +struct rpc_cb_add_xprt_calldata { + struct rpc_xprt_switch *xps; + struct rpc_xprt *xprt; +}; + +static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata) +{ + struct rpc_cb_add_xprt_calldata *data = calldata; + + if (task->tk_status == 0) + rpc_xprt_switch_add_xprt(data->xps, data->xprt); +} + +static void rpc_cb_add_xprt_release(void *calldata) +{ + struct rpc_cb_add_xprt_calldata *data = calldata; + + xprt_put(data->xprt); + xprt_switch_put(data->xps); + kfree(data); +} + +const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = { + .rpc_call_done = rpc_cb_add_xprt_done, + .rpc_release = rpc_cb_add_xprt_release, +}; + +/** + * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt + * @clnt: pointer to struct rpc_clnt + * @xps: pointer to struct rpc_xprt_switch, + * @xprt: pointer struct rpc_xprt + * @dummy: unused + */ +int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt, + struct rpc_xprt_switch *xps, struct rpc_xprt *xprt, + void *dummy) +{ + struct rpc_cb_add_xprt_calldata *data; + struct rpc_cred *cred; + struct rpc_task *task; + + data = kmalloc(sizeof(*data), GFP_NOFS); + if (!data) + return -ENOMEM; + data->xps = xprt_switch_get(xps); + data->xprt = xprt_get(xprt); + + cred = authnull_ops.lookup_cred(NULL, NULL, 0); + task = rpc_call_null_helper(clnt, xprt, cred, + RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC, + &rpc_cb_add_xprt_call_ops, data); + put_rpccred(cred); + if (IS_ERR(task)) + return PTR_ERR(task); + rpc_put_task(task); + return 1; +} +EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt); + +/** + * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt + * @clnt: pointer to struct rpc_clnt + * @xprtargs: pointer to struct xprt_create + * @setup: callback to test and/or set up the connection + * @data: pointer to setup function data + * + * Creates a new transport using the parameters set in args and + * adds it to clnt. + * If ping is set, then test that connectivity succeeds before + * adding the new transport. + * + */ +int rpc_clnt_add_xprt(struct rpc_clnt *clnt, + struct xprt_create *xprtargs, + int (*setup)(struct rpc_clnt *, + struct rpc_xprt_switch *, + struct rpc_xprt *, + void *), + void *data) +{ + struct rpc_xprt_switch *xps; + struct rpc_xprt *xprt; + unsigned char resvport; + int ret = 0; + + rcu_read_lock(); + xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch)); + xprt = xprt_iter_xprt(&clnt->cl_xpi); + if (xps == NULL || xprt == NULL) { + rcu_read_unlock(); + return -EAGAIN; + } + resvport = xprt->resvport; + rcu_read_unlock(); + + xprt = xprt_create_transport(xprtargs); + if (IS_ERR(xprt)) { + ret = PTR_ERR(xprt); + goto out_put_switch; + } + xprt->resvport = resvport; + + rpc_xprt_switch_set_roundrobin(xps); + if (setup) { + ret = setup(clnt, xps, xprt, data); + if (ret != 0) + goto out_put_xprt; + } + rpc_xprt_switch_add_xprt(xps, xprt); +out_put_xprt: + xprt_put(xprt); +out_put_switch: + xprt_switch_put(xps); + return ret; +} +EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt); + #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) static void rpc_show_header(void) { @@ -2483,57 +2683,39 @@ void rpc_show_tasks(struct net *net) #endif #if IS_ENABLED(CONFIG_SUNRPC_SWAP) +static int +rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt, + struct rpc_xprt *xprt, + void *dummy) +{ + return xprt_enable_swap(xprt); +} + int rpc_clnt_swap_activate(struct rpc_clnt *clnt) { - int ret = 0; - struct rpc_xprt *xprt; - - if (atomic_inc_return(&clnt->cl_swapper) == 1) { -retry: - rcu_read_lock(); - xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); - rcu_read_unlock(); - if (!xprt) { - /* - * If we didn't get a reference, then we likely are - * racing with a migration event. Wait for a grace - * period and try again. - */ - synchronize_rcu(); - goto retry; - } - - ret = xprt_enable_swap(xprt); - xprt_put(xprt); - } - return ret; + if (atomic_inc_return(&clnt->cl_swapper) == 1) + return rpc_clnt_iterate_for_each_xprt(clnt, + rpc_clnt_swap_activate_callback, NULL); + return 0; } EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate); +static int +rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt, + struct rpc_xprt *xprt, + void *dummy) +{ + xprt_disable_swap(xprt); + return 0; +} + void rpc_clnt_swap_deactivate(struct rpc_clnt *clnt) { - struct rpc_xprt *xprt; - - if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) { -retry: - rcu_read_lock(); - xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); - rcu_read_unlock(); - if (!xprt) { - /* - * If we didn't get a reference, then we likely are - * racing with a migration event. Wait for a grace - * period and try again. - */ - synchronize_rcu(); - goto retry; - } - - xprt_disable_swap(xprt); - xprt_put(xprt); - } + if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) + rpc_clnt_iterate_for_each_xprt(clnt, + rpc_clnt_swap_deactivate_callback, NULL); } EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate); #endif /* CONFIG_SUNRPC_SWAP */ diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c index cf5770d..5b30603 100644 --- a/net/sunrpc/rpcb_clnt.c +++ b/net/sunrpc/rpcb_clnt.c @@ -648,10 +648,10 @@ static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt, struct rpcbi static struct rpc_clnt *rpcb_find_transport_owner(struct rpc_clnt *clnt) { struct rpc_clnt *parent = clnt->cl_parent; - struct rpc_xprt *xprt = rcu_dereference(clnt->cl_xprt); + struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch); while (parent != clnt) { - if (rcu_dereference(parent->cl_xprt) != xprt) + if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps) break; if (clnt->cl_autobind) break; @@ -683,11 +683,9 @@ void rpcb_getport_async(struct rpc_task *task) int status; rcu_read_lock(); - do { - clnt = rpcb_find_transport_owner(task->tk_client); - xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); - } while (xprt == NULL); + clnt = rpcb_find_transport_owner(task->tk_client); rcu_read_unlock(); + xprt = xprt_get(task->tk_xprt); dprintk("RPC: %5u %s(%s, %u, %u, %d)\n", task->tk_pid, __func__, diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 73ad57a..fcfd48d 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -909,6 +909,8 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta /* Initialize workqueue for async tasks */ task->tk_workqueue = task_setup_data->workqueue; + task->tk_xprt = xprt_get(task_setup_data->rpc_xprt); + if (task->tk_ops->rpc_call_prepare != NULL) task->tk_action = rpc_prepare_task; diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 37edea6..216a138 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -48,6 +48,7 @@ #include <linux/sunrpc/clnt.h> #include <linux/sunrpc/metrics.h> #include <linux/sunrpc/bc_xprt.h> +#include <linux/rcupdate.h> #include <trace/events/sunrpc.h> @@ -1166,7 +1167,7 @@ void xprt_free(struct rpc_xprt *xprt) { put_net(xprt->xprt_net); xprt_free_all_slots(xprt); - kfree(xprt); + kfree_rcu(xprt, rcu); } EXPORT_SYMBOL_GPL(xprt_free); @@ -1180,7 +1181,7 @@ EXPORT_SYMBOL_GPL(xprt_free); */ void xprt_reserve(struct rpc_task *task) { - struct rpc_xprt *xprt; + struct rpc_xprt *xprt = task->tk_xprt; task->tk_status = 0; if (task->tk_rqstp != NULL) @@ -1188,11 +1189,8 @@ void xprt_reserve(struct rpc_task *task) task->tk_timeout = 0; task->tk_status = -EAGAIN; - rcu_read_lock(); - xprt = rcu_dereference(task->tk_client->cl_xprt); if (!xprt_throttle_congested(xprt, task)) xprt->ops->alloc_slot(xprt, task); - rcu_read_unlock(); } /** @@ -1206,7 +1204,7 @@ void xprt_reserve(struct rpc_task *task) */ void xprt_retry_reserve(struct rpc_task *task) { - struct rpc_xprt *xprt; + struct rpc_xprt *xprt = task->tk_xprt; task->tk_status = 0; if (task->tk_rqstp != NULL) @@ -1214,10 +1212,7 @@ void xprt_retry_reserve(struct rpc_task *task) task->tk_timeout = 0; task->tk_status = -EAGAIN; - rcu_read_lock(); - xprt = rcu_dereference(task->tk_client->cl_xprt); xprt->ops->alloc_slot(xprt, task); - rcu_read_unlock(); } static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) @@ -1264,11 +1259,9 @@ void xprt_release(struct rpc_task *task) if (req == NULL) { if (task->tk_client) { - rcu_read_lock(); - xprt = rcu_dereference(task->tk_client->cl_xprt); + xprt = task->tk_xprt; if (xprt->snd_task == task) xprt_release_write(xprt, task); - rcu_read_unlock(); } return; } @@ -1307,7 +1300,7 @@ void xprt_release(struct rpc_task *task) static void xprt_init(struct rpc_xprt *xprt, struct net *net) { - atomic_set(&xprt->count, 1); + kref_init(&xprt->kref); spin_lock_init(&xprt->transport_lock); spin_lock_init(&xprt->reserve_lock); @@ -1318,6 +1311,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net) spin_lock_init(&xprt->bc_pa_lock); INIT_LIST_HEAD(&xprt->bc_pa_list); #endif /* CONFIG_SUNRPC_BACKCHANNEL */ + INIT_LIST_HEAD(&xprt->xprt_switch); xprt->last_used = jiffies; xprt->cwnd = RPC_INITCWND; @@ -1415,6 +1409,24 @@ static void xprt_destroy(struct rpc_xprt *xprt) xprt->ops->destroy(xprt); } +static void xprt_destroy_kref(struct kref *kref) +{ + xprt_destroy(container_of(kref, struct rpc_xprt, kref)); +} + +/** + * xprt_get - return a reference to an RPC transport. + * @xprt: pointer to the transport + * + */ +struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) +{ + if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) + return xprt; + return NULL; +} +EXPORT_SYMBOL_GPL(xprt_get); + /** * xprt_put - release a reference to an RPC transport. * @xprt: pointer to the transport @@ -1422,7 +1434,7 @@ static void xprt_destroy(struct rpc_xprt *xprt) */ void xprt_put(struct rpc_xprt *xprt) { - if (atomic_dec_and_test(&xprt->count)) - xprt_destroy(xprt); + if (xprt != NULL) + kref_put(&xprt->kref, xprt_destroy_kref); } EXPORT_SYMBOL_GPL(xprt_put); diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c new file mode 100644 index 0000000..e7fd769 --- /dev/null +++ b/net/sunrpc/xprtmultipath.c @@ -0,0 +1,475 @@ +/* + * Multipath support for RPC + * + * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. + * + * Trond Myklebust <trond.myklebust@primarydata.com> + * + */ +#include <linux/types.h> +#include <linux/kref.h> +#include <linux/list.h> +#include <linux/rcupdate.h> +#include <linux/rculist.h> +#include <linux/slab.h> +#include <asm/cmpxchg.h> +#include <linux/spinlock.h> +#include <linux/sunrpc/xprt.h> +#include <linux/sunrpc/xprtmultipath.h> + +typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head, + const struct rpc_xprt *cur); + +static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; +static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; +static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; + +static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt) +{ + if (unlikely(xprt_get(xprt) == NULL)) + return; + list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); + smp_wmb(); + if (xps->xps_nxprts == 0) + xps->xps_net = xprt->xprt_net; + xps->xps_nxprts++; +} + +/** + * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch + * @xps: pointer to struct rpc_xprt_switch + * @xprt: pointer to struct rpc_xprt + * + * Adds xprt to the end of the list of struct rpc_xprt in xps. + */ +void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt) +{ + if (xprt == NULL) + return; + spin_lock(&xps->xps_lock); + if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) + xprt_switch_add_xprt_locked(xps, xprt); + spin_unlock(&xps->xps_lock); +} + +static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt) +{ + if (unlikely(xprt == NULL)) + return; + xps->xps_nxprts--; + if (xps->xps_nxprts == 0) + xps->xps_net = NULL; + smp_wmb(); + list_del_rcu(&xprt->xprt_switch); +} + +/** + * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch + * @xps: pointer to struct rpc_xprt_switch + * @xprt: pointer to struct rpc_xprt + * + * Removes xprt from the list of struct rpc_xprt in xps. + */ +void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt) +{ + spin_lock(&xps->xps_lock); + xprt_switch_remove_xprt_locked(xps, xprt); + spin_unlock(&xps->xps_lock); + xprt_put(xprt); +} + +/** + * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch + * @xprt: pointer to struct rpc_xprt + * @gfp_flags: allocation flags + * + * On success, returns an initialised struct rpc_xprt_switch, containing + * the entry xprt. Returns NULL on failure. + */ +struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, + gfp_t gfp_flags) +{ + struct rpc_xprt_switch *xps; + + xps = kmalloc(sizeof(*xps), gfp_flags); + if (xps != NULL) { + spin_lock_init(&xps->xps_lock); + kref_init(&xps->xps_kref); + xps->xps_nxprts = 0; + INIT_LIST_HEAD(&xps->xps_xprt_list); + xps->xps_iter_ops = &rpc_xprt_iter_singular; + xprt_switch_add_xprt_locked(xps, xprt); + } + + return xps; +} + +static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) +{ + spin_lock(&xps->xps_lock); + while (!list_empty(&xps->xps_xprt_list)) { + struct rpc_xprt *xprt; + + xprt = list_first_entry(&xps->xps_xprt_list, + struct rpc_xprt, xprt_switch); + xprt_switch_remove_xprt_locked(xps, xprt); + spin_unlock(&xps->xps_lock); + xprt_put(xprt); + spin_lock(&xps->xps_lock); + } + spin_unlock(&xps->xps_lock); +} + +static void xprt_switch_free(struct kref *kref) +{ + struct rpc_xprt_switch *xps = container_of(kref, + struct rpc_xprt_switch, xps_kref); + + xprt_switch_free_entries(xps); + kfree_rcu(xps, xps_rcu); +} + +/** + * xprt_switch_get - Return a reference to a rpc_xprt_switch + * @xps: pointer to struct rpc_xprt_switch + * + * Returns a reference to xps unless the refcount is already zero. + */ +struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) +{ + if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) + return xps; + return NULL; +} + +/** + * xprt_switch_put - Release a reference to a rpc_xprt_switch + * @xps: pointer to struct rpc_xprt_switch + * + * Release the reference to xps, and free it once the refcount is zero. + */ +void xprt_switch_put(struct rpc_xprt_switch *xps) +{ + if (xps != NULL) + kref_put(&xps->xps_kref, xprt_switch_free); +} + +/** + * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch + * @xps: pointer to struct rpc_xprt_switch + * + * Sets a round-robin default policy for iterators acting on xps. + */ +void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) +{ + if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) + WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); +} + +static +const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) +{ + if (xpi->xpi_ops != NULL) + return xpi->xpi_ops; + return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; +} + +static +void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) +{ +} + +static +void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) +{ + WRITE_ONCE(xpi->xpi_cursor, NULL); +} + +static +struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) +{ + return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch); +} + +static +struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) +{ + struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); + + if (xps == NULL) + return NULL; + return xprt_switch_find_first_entry(&xps->xps_xprt_list); +} + +static +struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, + const struct rpc_xprt *cur) +{ + struct rpc_xprt *pos; + + list_for_each_entry_rcu(pos, head, xprt_switch) { + if (cur == pos) + return pos; + } + return NULL; +} + +static +struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) +{ + struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); + struct list_head *head; + + if (xps == NULL) + return NULL; + head = &xps->xps_xprt_list; + if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) + return xprt_switch_find_first_entry(head); + return xprt_switch_find_current_entry(head, xpi->xpi_cursor); +} + +static +struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, + const struct rpc_xprt *cur) +{ + struct rpc_xprt *pos, *prev = NULL; + + list_for_each_entry_rcu(pos, head, xprt_switch) { + if (cur == prev) + return pos; + prev = pos; + } + return NULL; +} + +static +struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head, + struct rpc_xprt **cursor, + xprt_switch_find_xprt_t find_next) +{ + struct rpc_xprt *cur, *pos, *old; + + cur = READ_ONCE(*cursor); + for (;;) { + old = cur; + pos = find_next(head, old); + if (pos == NULL) + break; + cur = cmpxchg_relaxed(cursor, old, pos); + if (cur == old) + break; + } + return pos; +} + +static +struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, + xprt_switch_find_xprt_t find_next) +{ + struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); + struct list_head *head; + + if (xps == NULL) + return NULL; + head = &xps->xps_xprt_list; + if (xps->xps_nxprts < 2) + return xprt_switch_find_first_entry(head); + return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next); +} + +static +struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, + const struct rpc_xprt *cur) +{ + struct rpc_xprt *ret; + + ret = xprt_switch_find_next_entry(head, cur); + if (ret != NULL) + return ret; + return xprt_switch_find_first_entry(head); +} + +static +struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) +{ + return xprt_iter_next_entry_multiple(xpi, + xprt_switch_find_next_entry_roundrobin); +} + +static +struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) +{ + return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry); +} + +/* + * xprt_iter_rewind - Resets the xprt iterator + * @xpi: pointer to rpc_xprt_iter + * + * Resets xpi to ensure that it points to the first entry in the list + * of transports. + */ +static +void xprt_iter_rewind(struct rpc_xprt_iter *xpi) +{ + rcu_read_lock(); + xprt_iter_ops(xpi)->xpi_rewind(xpi); + rcu_read_unlock(); +} + +static void __xprt_iter_init(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *xps, + const struct rpc_xprt_iter_ops *ops) +{ + rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); + xpi->xpi_cursor = NULL; + xpi->xpi_ops = ops; +} + +/** + * xprt_iter_init - Initialise an xprt iterator + * @xpi: pointer to rpc_xprt_iter + * @xps: pointer to rpc_xprt_switch + * + * Initialises the iterator to use the default iterator ops + * as set in xps. This function is mainly intended for internal + * use in the rpc_client. + */ +void xprt_iter_init(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *xps) +{ + __xprt_iter_init(xpi, xps, NULL); +} + +/** + * xprt_iter_init_listall - Initialise an xprt iterator + * @xpi: pointer to rpc_xprt_iter + * @xps: pointer to rpc_xprt_switch + * + * Initialises the iterator to iterate once through the entire list + * of entries in xps. + */ +void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *xps) +{ + __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); +} + +/** + * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch + * @xpi: pointer to rpc_xprt_iter + * @xps: pointer to a new rpc_xprt_switch or NULL + * + * Swaps out the existing xpi->xpi_xpswitch with a new value. + */ +struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *newswitch) +{ + struct rpc_xprt_switch __rcu *oldswitch; + + /* Atomically swap out the old xpswitch */ + oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); + if (newswitch != NULL) + xprt_iter_rewind(xpi); + return rcu_dereference_protected(oldswitch, true); +} + +/** + * xprt_iter_destroy - Destroys the xprt iterator + * @xpi pointer to rpc_xprt_iter + */ +void xprt_iter_destroy(struct rpc_xprt_iter *xpi) +{ + xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); +} + +/** + * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor + * @xpi: pointer to rpc_xprt_iter + * + * Returns a pointer to the struct rpc_xprt that is currently + * pointed to by the cursor. + * Caller must be holding rcu_read_lock(). + */ +struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) +{ + WARN_ON_ONCE(!rcu_read_lock_held()); + return xprt_iter_ops(xpi)->xpi_xprt(xpi); +} + +static +struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, + struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) +{ + struct rpc_xprt *ret; + + do { + ret = fn(xpi); + if (ret == NULL) + break; + ret = xprt_get(ret); + } while (ret == NULL); + return ret; +} + +/** + * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor + * @xpi: pointer to rpc_xprt_iter + * + * Returns a reference to the struct rpc_xprt that is currently + * pointed to by the cursor. + */ +struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) +{ + struct rpc_xprt *xprt; + + rcu_read_lock(); + xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); + rcu_read_unlock(); + return xprt; +} + +/** + * xprt_iter_get_next - Returns the next rpc_xprt following the cursor + * @xpi: pointer to rpc_xprt_iter + * + * Returns a reference to the struct rpc_xprt that immediately follows the + * entry pointed to by the cursor. + */ +struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) +{ + struct rpc_xprt *xprt; + + rcu_read_lock(); + xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); + rcu_read_unlock(); + return xprt; +} + +/* Policy for always returning the first entry in the rpc_xprt_switch */ +static +const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { + .xpi_rewind = xprt_iter_no_rewind, + .xpi_xprt = xprt_iter_first_entry, + .xpi_next = xprt_iter_first_entry, +}; + +/* Policy for round-robin iteration of entries in the rpc_xprt_switch */ +static +const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { + .xpi_rewind = xprt_iter_default_rewind, + .xpi_xprt = xprt_iter_current_entry, + .xpi_next = xprt_iter_next_entry_roundrobin, +}; + +/* Policy for once-through iteration of entries in the rpc_xprt_switch */ +static +const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { + .xpi_rewind = xprt_iter_default_rewind, + .xpi_xprt = xprt_iter_current_entry, + .xpi_next = xprt_iter_next_entry_all, +}; diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index c14f3a4..b289e10 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -80,13 +80,13 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) if (!r) goto out; - r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES * - sizeof(u64), GFP_KERNEL); - if (!r->r.fmr.physaddrs) + r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES * + sizeof(u64), GFP_KERNEL); + if (!r->fmr.physaddrs) goto out_free; - r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); - if (IS_ERR(r->r.fmr.fmr)) + r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); + if (IS_ERR(r->fmr.fmr)) goto out_fmr_err; list_add(&r->mw_list, &buf->rb_mws); @@ -95,9 +95,9 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) return 0; out_fmr_err: - rc = PTR_ERR(r->r.fmr.fmr); + rc = PTR_ERR(r->fmr.fmr); dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc); - kfree(r->r.fmr.physaddrs); + kfree(r->fmr.physaddrs); out_free: kfree(r); out: @@ -109,7 +109,7 @@ __fmr_unmap(struct rpcrdma_mw *r) { LIST_HEAD(l); - list_add(&r->r.fmr.fmr->list, &l); + list_add(&r->fmr.fmr->list, &l); return ib_unmap_fmr(&l); } @@ -148,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, nsegs = RPCRDMA_MAX_FMR_SGES; for (i = 0; i < nsegs;) { rpcrdma_map_one(device, seg, direction); - mw->r.fmr.physaddrs[i] = seg->mr_dma; + mw->fmr.physaddrs[i] = seg->mr_dma; len += seg->mr_len; ++seg; ++i; @@ -158,13 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, break; } - rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs, + rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs, i, seg1->mr_dma); if (rc) goto out_maperr; seg1->rl_mw = mw; - seg1->mr_rkey = mw->r.fmr.fmr->rkey; + seg1->mr_rkey = mw->fmr.fmr->rkey; seg1->mr_base = seg1->mr_dma + pageoff; seg1->mr_nsegs = i; seg1->mr_len = len; @@ -219,7 +219,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) seg = &req->rl_segments[i]; mw = seg->rl_mw; - list_add(&mw->r.fmr.fmr->list, &unmap_list); + list_add(&mw->fmr.fmr->list, &unmap_list); i += seg->mr_nsegs; } @@ -281,9 +281,9 @@ fmr_op_destroy(struct rpcrdma_buffer *buf) while (!list_empty(&buf->rb_all)) { r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); list_del(&r->mw_all); - kfree(r->r.fmr.physaddrs); + kfree(r->fmr.physaddrs); - rc = ib_dealloc_fmr(r->r.fmr.fmr); + rc = ib_dealloc_fmr(r->fmr.fmr); if (rc) dprintk("RPC: %s: ib_dealloc_fmr failed %i\n", __func__, rc); diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index e165673..c250924 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -109,20 +109,20 @@ static void __frwr_recovery_worker(struct work_struct *work) { struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, - r.frmr.fr_work); - struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt; + frmr.fr_work); + struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt; unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - if (ib_dereg_mr(r->r.frmr.fr_mr)) + if (ib_dereg_mr(r->frmr.fr_mr)) goto out_fail; - r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); - if (IS_ERR(r->r.frmr.fr_mr)) + r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); + if (IS_ERR(r->frmr.fr_mr)) goto out_fail; dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); - r->r.frmr.fr_state = FRMR_IS_INVALID; + r->frmr.fr_state = FRMR_IS_INVALID; rpcrdma_put_mw(r_xprt, r); return; @@ -137,15 +137,15 @@ out_fail: static void __frwr_queue_recovery(struct rpcrdma_mw *r) { - INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker); - queue_work(frwr_recovery_wq, &r->r.frmr.fr_work); + INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker); + queue_work(frwr_recovery_wq, &r->frmr.fr_work); } static int __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, unsigned int depth) { - struct rpcrdma_frmr *f = &r->r.frmr; + struct rpcrdma_frmr *f = &r->frmr; int rc; f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); @@ -158,6 +158,8 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, sg_init_table(f->sg, depth); + init_completion(&f->fr_linv_done); + return 0; out_mr_err: @@ -179,11 +181,11 @@ __frwr_release(struct rpcrdma_mw *r) { int rc; - rc = ib_dereg_mr(r->r.frmr.fr_mr); + rc = ib_dereg_mr(r->frmr.fr_mr); if (rc) dprintk("RPC: %s: ib_dereg_mr status %i\n", __func__, rc); - kfree(r->r.frmr.sg); + kfree(r->frmr.sg); } static int @@ -244,39 +246,76 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt) rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth); } -/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs - * to be reset. +static void +__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr, + const char *wr) +{ + frmr->fr_state = FRMR_IS_STALE; + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("rpcrdma: %s: %s (%u/0x%x)\n", + wr, ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); +} + +/** + * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC + * @cq: completion queue (ignored) + * @wc: completed WR * - * WARNING: Only wr_id and status are reliable at this point */ static void -__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r) +frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) { - if (likely(wc->status == IB_WC_SUCCESS)) - return; - - /* WARNING: Only wr_id and status are reliable at this point */ - r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - if (wc->status == IB_WC_WR_FLUSH_ERR) - dprintk("RPC: %s: frmr %p flushed\n", __func__, r); - else - pr_warn("RPC: %s: frmr %p error, status %s (%d)\n", - __func__, r, ib_wc_status_msg(wc->status), wc->status); + struct rpcrdma_frmr *frmr; + struct ib_cqe *cqe; - r->r.frmr.fr_state = FRMR_IS_STALE; + /* WARNING: Only wr_cqe and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS) { + cqe = wc->wr_cqe; + frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); + __frwr_sendcompletion_flush(wc, frmr, "fastreg"); + } } +/** + * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + */ static void -frwr_sendcompletion(struct ib_wc *wc) +frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) { - struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - struct rpcrdma_frmr *f = &r->r.frmr; + struct rpcrdma_frmr *frmr; + struct ib_cqe *cqe; - if (unlikely(wc->status != IB_WC_SUCCESS)) - __frwr_sendcompletion_flush(wc, r); + /* WARNING: Only wr_cqe and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS) { + cqe = wc->wr_cqe; + frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); + __frwr_sendcompletion_flush(wc, frmr, "localinv"); + } +} - if (f->fr_waiter) - complete(&f->fr_linv_done); +/** + * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + * Awaken anyone waiting for an MR to finish being fenced. + */ +static void +frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) +{ + struct rpcrdma_frmr *frmr; + struct ib_cqe *cqe; + + /* WARNING: Only wr_cqe and status are reliable at this point */ + cqe = wc->wr_cqe; + frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); + if (wc->status != IB_WC_SUCCESS) + __frwr_sendcompletion_flush(wc, frmr, "localinv"); + complete_all(&frmr->fr_linv_done); } static int @@ -313,8 +352,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt) list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); - r->mw_sendcompletion = frwr_sendcompletion; - r->r.frmr.fr_xprt = r_xprt; + r->frmr.fr_xprt = r_xprt; } return 0; @@ -347,10 +385,9 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, mw = rpcrdma_get_mw(r_xprt); if (!mw) return -ENOMEM; - } while (mw->r.frmr.fr_state != FRMR_IS_INVALID); - frmr = &mw->r.frmr; + } while (mw->frmr.fr_state != FRMR_IS_INVALID); + frmr = &mw->frmr; frmr->fr_state = FRMR_IS_VALID; - frmr->fr_waiter = false; mr = frmr->fr_mr; reg_wr = &frmr->fr_regwr; @@ -400,7 +437,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, reg_wr->wr.next = NULL; reg_wr->wr.opcode = IB_WR_REG_MR; - reg_wr->wr.wr_id = (uintptr_t)mw; + frmr->fr_cqe.done = frwr_wc_fastreg; + reg_wr->wr.wr_cqe = &frmr->fr_cqe; reg_wr->wr.num_sge = 0; reg_wr->wr.send_flags = 0; reg_wr->mr = mr; @@ -434,15 +472,15 @@ static struct ib_send_wr * __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) { struct rpcrdma_mw *mw = seg->rl_mw; - struct rpcrdma_frmr *f = &mw->r.frmr; + struct rpcrdma_frmr *f = &mw->frmr; struct ib_send_wr *invalidate_wr; - f->fr_waiter = false; f->fr_state = FRMR_IS_INVALID; invalidate_wr = &f->fr_invwr; memset(invalidate_wr, 0, sizeof(*invalidate_wr)); - invalidate_wr->wr_id = (unsigned long)(void *)mw; + f->fr_cqe.done = frwr_wc_localinv; + invalidate_wr->wr_cqe = &f->fr_cqe; invalidate_wr->opcode = IB_WR_LOCAL_INV; invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey; @@ -455,7 +493,7 @@ __frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, { struct ib_device *device = r_xprt->rx_ia.ri_device; struct rpcrdma_mw *mw = seg->rl_mw; - struct rpcrdma_frmr *f = &mw->r.frmr; + struct rpcrdma_frmr *f = &mw->frmr; seg->rl_mw = NULL; @@ -504,15 +542,15 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) i += seg->mr_nsegs; } - f = &seg->rl_mw->r.frmr; + f = &seg->rl_mw->frmr; /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain * are complete. */ f->fr_invwr.send_flags = IB_SEND_SIGNALED; - f->fr_waiter = true; - init_completion(&f->fr_linv_done); + f->fr_cqe.done = frwr_wc_localinv_wake; + reinit_completion(&f->fr_linv_done); INIT_CQCOUNT(&r_xprt->rx_ep); /* Transport disconnect drains the receive CQ before it @@ -520,14 +558,18 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * unless ri_id->qp is a valid pointer. */ rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr); - if (rc) + if (rc) { pr_warn("%s: ib_post_send failed %i\n", __func__, rc); + rdma_disconnect(ia->ri_id); + goto unmap; + } wait_for_completion(&f->fr_linv_done); /* ORDER: Now DMA unmap all of the req's MRs, and return * them to the free MW list. */ +unmap: for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { seg = &req->rl_segments[i]; @@ -549,7 +591,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) struct rpcrdma_mr_seg *seg1 = seg; struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_mw *mw = seg1->rl_mw; - struct rpcrdma_frmr *frmr = &mw->r.frmr; + struct rpcrdma_frmr *frmr = &mw->frmr; struct ib_send_wr *invalidate_wr, *bad_wr; int rc, nsegs = seg->mr_nsegs; @@ -557,10 +599,11 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) seg1->rl_mw = NULL; frmr->fr_state = FRMR_IS_INVALID; - invalidate_wr = &mw->r.frmr.fr_invwr; + invalidate_wr = &mw->frmr.fr_invwr; memset(invalidate_wr, 0, sizeof(*invalidate_wr)); - invalidate_wr->wr_id = (uintptr_t)mw; + frmr->fr_cqe.done = frwr_wc_localinv; + invalidate_wr->wr_cqe = &frmr->fr_cqe; invalidate_wr->opcode = IB_WR_LOCAL_INV; invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey; DECR_CQCOUNT(&r_xprt->rx_ep); diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c index dbb302e..481b9b6 100644 --- a/net/sunrpc/xprtrdma/physical_ops.c +++ b/net/sunrpc/xprtrdma/physical_ops.c @@ -68,7 +68,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing)); seg->mr_rkey = ia->ri_dma_mr->rkey; seg->mr_base = seg->mr_dma; - seg->mr_nsegs = 1; return 1; } diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 0f28f2d..888823b 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -132,6 +132,33 @@ rpcrdma_tail_pullup(struct xdr_buf *buf) return tlen; } +/* Split "vec" on page boundaries into segments. FMR registers pages, + * not a byte range. Other modes coalesce these segments into a single + * MR when they can. + */ +static int +rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, + int n, int nsegs) +{ + size_t page_offset; + u32 remaining; + char *base; + + base = vec->iov_base; + page_offset = offset_in_page(base); + remaining = vec->iov_len; + while (remaining && n < nsegs) { + seg[n].mr_page = NULL; + seg[n].mr_offset = base; + seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); + remaining -= seg[n].mr_len; + base += seg[n].mr_len; + ++n; + page_offset = 0; + } + return n; +} + /* * Chunk assembly from upper layer xdr_buf. * @@ -150,11 +177,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, int page_base; struct page **ppages; - if (pos == 0 && xdrbuf->head[0].iov_len) { - seg[n].mr_page = NULL; - seg[n].mr_offset = xdrbuf->head[0].iov_base; - seg[n].mr_len = xdrbuf->head[0].iov_len; - ++n; + if (pos == 0) { + n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs); + if (n == nsegs) + return -EIO; } len = xdrbuf->page_len; @@ -192,13 +218,9 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, * xdr pad bytes, saving the server an RDMA operation. */ if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize) return n; + n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs); if (n == nsegs) - /* Tail remains, but we're out of segments */ return -EIO; - seg[n].mr_page = NULL; - seg[n].mr_offset = xdrbuf->tail[0].iov_base; - seg[n].mr_len = xdrbuf->tail[0].iov_len; - ++n; } return n; @@ -773,20 +795,17 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; struct rpc_xprt *xprt = &r_xprt->rx_xprt; __be32 *iptr; - int rdmalen, status; + int rdmalen, status, rmerr; unsigned long cwnd; - u32 credits; dprintk("RPC: %s: incoming rep %p\n", __func__, rep); if (rep->rr_len == RPCRDMA_BAD_LEN) goto out_badstatus; - if (rep->rr_len < RPCRDMA_HDRLEN_MIN) + if (rep->rr_len < RPCRDMA_HDRLEN_ERR) goto out_shortreply; headerp = rdmab_to_msg(rep->rr_rdmabuf); - if (headerp->rm_vers != rpcrdma_version) - goto out_badversion; #if defined(CONFIG_SUNRPC_BACKCHANNEL) if (rpcrdma_is_bcall(headerp)) goto out_bcall; @@ -809,15 +828,16 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) */ list_del_init(&rqst->rq_list); spin_unlock_bh(&xprt->transport_lock); - dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" - " RPC request 0x%p xid 0x%08x\n", - __func__, rep, req, rqst, - be32_to_cpu(headerp->rm_xid)); + dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", + __func__, rep, req, be32_to_cpu(headerp->rm_xid)); /* from here on, the reply is no longer an orphan */ req->rl_reply = rep; xprt->reestablish_timeout = 0; + if (headerp->rm_vers != rpcrdma_version) + goto out_badversion; + /* check for expected message types */ /* The order of some of these tests is important. */ switch (headerp->rm_type) { @@ -878,6 +898,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) status = rdmalen; break; + case rdma_error: + goto out_rdmaerr; + badheader: default: dprintk("%s: invalid rpcrdma reply header (type %d):" @@ -893,6 +916,7 @@ badheader: break; } +out: /* Invalidate and flush the data payloads before waking the * waiting application. This guarantees the memory region is * properly fenced from the server before the application @@ -903,15 +927,9 @@ badheader: if (req->rl_nchunks) r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req); - credits = be32_to_cpu(headerp->rm_credit); - if (credits == 0) - credits = 1; /* don't deadlock */ - else if (credits > r_xprt->rx_buf.rb_max_requests) - credits = r_xprt->rx_buf.rb_max_requests; - spin_lock_bh(&xprt->transport_lock); cwnd = xprt->cwnd; - xprt->cwnd = credits << RPC_CWNDSHIFT; + xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; if (xprt->cwnd > cwnd) xprt_release_rqst_cong(rqst->rq_task); @@ -935,13 +953,43 @@ out_bcall: return; #endif -out_shortreply: - dprintk("RPC: %s: short/invalid reply\n", __func__); - goto repost; - +/* If the incoming reply terminated a pending RPC, the next + * RPC call will post a replacement receive buffer as it is + * being marshaled. + */ out_badversion: dprintk("RPC: %s: invalid version %d\n", __func__, be32_to_cpu(headerp->rm_vers)); + status = -EIO; + r_xprt->rx_stats.bad_reply_count++; + goto out; + +out_rdmaerr: + rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err); + switch (rmerr) { + case ERR_VERS: + pr_err("%s: server reports header version error (%u-%u)\n", + __func__, + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low), + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high)); + break; + case ERR_CHUNK: + pr_err("%s: server reports header decoding error\n", + __func__); + break; + default: + pr_err("%s: server reports unknown error %d\n", + __func__, rmerr); + } + status = -EREMOTEIO; + r_xprt->rx_stats.bad_reply_count++; + goto out; + +/* If no pending RPC transaction was matched, post a replacement + * receive buffer before returning. + */ +out_shortreply: + dprintk("RPC: %s: short/invalid reply\n", __func__); goto repost; out_nomatch: diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 878f1bf..f5ed9f9 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -112,89 +112,65 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) } } +/** + * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + */ static void -rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context) +rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) { - struct rpcrdma_ep *ep = context; - - pr_err("RPC: %s: %s on device %s ep %p\n", - __func__, ib_event_msg(event->event), - event->device->name, context); - if (ep->rep_connected == 1) { - ep->rep_connected = -EIO; - rpcrdma_conn_func(ep); - wake_up_all(&ep->rep_connect_wait); - } + /* WARNING: Only wr_cqe and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("rpcrdma: Send: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); } static void -rpcrdma_sendcq_process_wc(struct ib_wc *wc) +rpcrdma_receive_worker(struct work_struct *work) { - /* WARNING: Only wr_id and status are reliable at this point */ - if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) { - if (wc->status != IB_WC_SUCCESS && - wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("RPC: %s: SEND: %s\n", - __func__, ib_wc_status_msg(wc->status)); - } else { - struct rpcrdma_mw *r; + struct rpcrdma_rep *rep = + container_of(work, struct rpcrdma_rep, rr_work); - r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - r->mw_sendcompletion(wc); - } + rpcrdma_reply_handler(rep); } -/* The common case is a single send completion is waiting. By - * passing two WC entries to ib_poll_cq, a return code of 1 - * means there is exactly one WC waiting and no more. We don't - * have to invoke ib_poll_cq again to know that the CQ has been - * properly drained. +/* Perform basic sanity checking to avoid using garbage + * to update the credit grant value. */ static void -rpcrdma_sendcq_poll(struct ib_cq *cq) +rpcrdma_update_granted_credits(struct rpcrdma_rep *rep) { - struct ib_wc *pos, wcs[2]; - int count, rc; + struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf); + struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf; + u32 credits; - do { - pos = wcs; + if (rep->rr_len < RPCRDMA_HDRLEN_ERR) + return; - rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); - if (rc < 0) - break; + credits = be32_to_cpu(rmsgp->rm_credit); + if (credits == 0) + credits = 1; /* don't deadlock */ + else if (credits > buffer->rb_max_requests) + credits = buffer->rb_max_requests; - count = rc; - while (count-- > 0) - rpcrdma_sendcq_process_wc(pos++); - } while (rc == ARRAY_SIZE(wcs)); - return; + atomic_set(&buffer->rb_credits, credits); } -/* Handle provider send completion upcalls. +/** + * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC + * @cq: completion queue (ignored) + * @wc: completed WR + * */ static void -rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) +rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc) { - do { - rpcrdma_sendcq_poll(cq); - } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | - IB_CQ_REPORT_MISSED_EVENTS) > 0); -} - -static void -rpcrdma_receive_worker(struct work_struct *work) -{ - struct rpcrdma_rep *rep = - container_of(work, struct rpcrdma_rep, rr_work); - - rpcrdma_reply_handler(rep); -} - -static void -rpcrdma_recvcq_process_wc(struct ib_wc *wc) -{ - struct rpcrdma_rep *rep = - (struct rpcrdma_rep *)(unsigned long)wc->wr_id; + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep, + rr_cqe); /* WARNING: Only wr_id and status are reliable at this point */ if (wc->status != IB_WC_SUCCESS) @@ -211,7 +187,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc) ib_dma_sync_single_for_cpu(rep->rr_device, rdmab_addr(rep->rr_rdmabuf), rep->rr_len, DMA_FROM_DEVICE); - prefetch(rdmab_to_msg(rep->rr_rdmabuf)); + + rpcrdma_update_granted_credits(rep); out_schedule: queue_work(rpcrdma_receive_wq, &rep->rr_work); @@ -219,57 +196,20 @@ out_schedule: out_fail: if (wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("RPC: %s: rep %p: %s\n", - __func__, rep, ib_wc_status_msg(wc->status)); + pr_err("rpcrdma: Recv: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); rep->rr_len = RPCRDMA_BAD_LEN; goto out_schedule; } -/* The wc array is on stack: automatic memory is always CPU-local. - * - * struct ib_wc is 64 bytes, making the poll array potentially - * large. But this is at the bottom of the call chain. Further - * substantial work is done in another thread. - */ -static void -rpcrdma_recvcq_poll(struct ib_cq *cq) -{ - struct ib_wc *pos, wcs[4]; - int count, rc; - - do { - pos = wcs; - - rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); - if (rc < 0) - break; - - count = rc; - while (count-- > 0) - rpcrdma_recvcq_process_wc(pos++); - } while (rc == ARRAY_SIZE(wcs)); -} - -/* Handle provider receive completion upcalls. - */ -static void -rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) -{ - do { - rpcrdma_recvcq_poll(cq); - } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | - IB_CQ_REPORT_MISSED_EVENTS) > 0); -} - static void rpcrdma_flush_cqs(struct rpcrdma_ep *ep) { struct ib_wc wc; while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) - rpcrdma_recvcq_process_wc(&wc); - while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0) - rpcrdma_sendcq_process_wc(&wc); + rpcrdma_receive_wc(NULL, &wc); } static int @@ -330,6 +270,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) connected: dprintk("RPC: %s: %sconnected\n", __func__, connstate > 0 ? "" : "dis"); + atomic_set(&xprt->rx_buf.rb_credits, 1); ep->rep_connected = connstate; rpcrdma_conn_func(ep); wake_up_all(&ep->rep_connect_wait); @@ -560,9 +501,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) { struct ib_cq *sendcq, *recvcq; - struct ib_cq_init_attr cq_attr = {}; unsigned int max_qp_wr; - int rc, err; + int rc; if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) { dprintk("RPC: %s: insufficient sge's available\n", @@ -614,9 +554,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, init_waitqueue_head(&ep->rep_connect_wait); INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); - cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; - sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, - rpcrdma_cq_async_error_upcall, NULL, &cq_attr); + sendcq = ib_alloc_cq(ia->ri_device, NULL, + ep->rep_attr.cap.max_send_wr + 1, + 0, IB_POLL_SOFTIRQ); if (IS_ERR(sendcq)) { rc = PTR_ERR(sendcq); dprintk("RPC: %s: failed to create send CQ: %i\n", @@ -624,16 +564,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, goto out1; } - rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP); - if (rc) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - goto out2; - } - - cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; - recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, - rpcrdma_cq_async_error_upcall, NULL, &cq_attr); + recvcq = ib_alloc_cq(ia->ri_device, NULL, + ep->rep_attr.cap.max_recv_wr + 1, + 0, IB_POLL_SOFTIRQ); if (IS_ERR(recvcq)) { rc = PTR_ERR(recvcq); dprintk("RPC: %s: failed to create recv CQ: %i\n", @@ -641,14 +574,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, goto out2; } - rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP); - if (rc) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - ib_destroy_cq(recvcq); - goto out2; - } - ep->rep_attr.send_cq = sendcq; ep->rep_attr.recv_cq = recvcq; @@ -673,10 +598,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, return 0; out2: - err = ib_destroy_cq(sendcq); - if (err) - dprintk("RPC: %s: ib_destroy_cq returned %i\n", - __func__, err); + ib_free_cq(sendcq); out1: if (ia->ri_dma_mr) ib_dereg_mr(ia->ri_dma_mr); @@ -711,15 +633,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ia->ri_id->qp = NULL; } - rc = ib_destroy_cq(ep->rep_attr.recv_cq); - if (rc) - dprintk("RPC: %s: ib_destroy_cq returned %i\n", - __func__, rc); - - rc = ib_destroy_cq(ep->rep_attr.send_cq); - if (rc) - dprintk("RPC: %s: ib_destroy_cq returned %i\n", - __func__, rc); + ib_free_cq(ep->rep_attr.recv_cq); + ib_free_cq(ep->rep_attr.send_cq); if (ia->ri_dma_mr) { rc = ib_dereg_mr(ia->ri_dma_mr); @@ -898,6 +813,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) spin_lock(&buffer->rb_reqslock); list_add(&req->rl_all, &buffer->rb_allreqs); spin_unlock(&buffer->rb_reqslock); + req->rl_cqe.done = rpcrdma_wc_send; req->rl_buffer = &r_xprt->rx_buf; return req; } @@ -923,6 +839,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) } rep->rr_device = ia->ri_device; + rep->rr_cqe.done = rpcrdma_receive_wc; rep->rr_rxprt = r_xprt; INIT_WORK(&rep->rr_work, rpcrdma_receive_worker); return rep; @@ -943,6 +860,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) buf->rb_max_requests = r_xprt->rx_data.max_requests; buf->rb_bc_srv_max_requests = 0; spin_lock_init(&buf->rb_lock); + atomic_set(&buf->rb_credits, 1); rc = ia->ri_ops->ro_init(r_xprt); if (rc) @@ -1259,7 +1177,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, } send_wr.next = NULL; - send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION; + send_wr.wr_cqe = &req->rl_cqe; send_wr.sg_list = iov; send_wr.num_sge = req->rl_niovs; send_wr.opcode = IB_WR_SEND; @@ -1297,7 +1215,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, int rc; recv_wr.next = NULL; - recv_wr.wr_id = (u64) (unsigned long) rep; + recv_wr.wr_cqe = &rep->rr_cqe; recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; recv_wr.num_sge = 1; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 38fe11b..2ebc743 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -95,10 +95,6 @@ struct rpcrdma_ep { #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit) #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount) -/* Force completion handler to ignore the signal - */ -#define RPCRDMA_IGNORE_COMPLETION (0ULL) - /* Pre-allocate extra Work Requests for handling backward receives * and sends. This is a fixed value because the Work Queues are * allocated when the forward channel is set up. @@ -171,6 +167,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) struct rpcrdma_buffer; struct rpcrdma_rep { + struct ib_cqe rr_cqe; unsigned int rr_len; struct ib_device *rr_device; struct rpcrdma_xprt *rr_rxprt; @@ -204,11 +201,11 @@ struct rpcrdma_frmr { struct scatterlist *sg; int sg_nents; struct ib_mr *fr_mr; + struct ib_cqe fr_cqe; enum rpcrdma_frmr_state fr_state; + struct completion fr_linv_done; struct work_struct fr_work; struct rpcrdma_xprt *fr_xprt; - bool fr_waiter; - struct completion fr_linv_done;; union { struct ib_reg_wr fr_regwr; struct ib_send_wr fr_invwr; @@ -224,8 +221,7 @@ struct rpcrdma_mw { union { struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; - } r; - void (*mw_sendcompletion)(struct ib_wc *); + }; struct list_head mw_list; struct list_head mw_all; }; @@ -281,6 +277,7 @@ struct rpcrdma_req { struct rpcrdma_regbuf *rl_sendbuf; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + struct ib_cqe rl_cqe; struct list_head rl_all; bool rl_backchannel; }; @@ -311,6 +308,7 @@ struct rpcrdma_buffer { struct list_head rb_send_bufs; struct list_head rb_recv_bufs; u32 rb_max_requests; + atomic_t rb_credits; /* most recent credit grant */ u32 rb_bc_srv_max_requests; spinlock_t rb_reqslock; /* protect rb_allreqs */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index fde2138..65e7595 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1844,9 +1844,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock) */ static void xs_local_rpcbind(struct rpc_task *task) { - rcu_read_lock(); - xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt)); - rcu_read_unlock(); + xprt_set_bound(task->tk_xprt); } static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port) |