From aae2006e9b0c294114915c13022fa348e1a88023 Mon Sep 17 00:00:00 2001 From: Andy Adamson Date: Wed, 1 Apr 2009 09:22:40 -0400 Subject: nfs41: sunrpc: Export the call prepare state for session reset Signed-off-by: Andy Adamson Signed-off-by: Benny Halevy Signed-off-by: Trond Myklebust --- net/sunrpc/clnt.c | 13 +++++++++++++ net/sunrpc/sched.c | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 5abab09..d00e813 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -695,6 +695,19 @@ void rpc_force_rebind(struct rpc_clnt *clnt) EXPORT_SYMBOL_GPL(rpc_force_rebind); /* + * Restart an (async) RPC call from the call_prepare state. + * Usually called from within the exit handler. + */ +void +rpc_restart_call_prepare(struct rpc_task *task) +{ + if (RPC_ASSASSINATED(task)) + return; + task->tk_action = rpc_prepare_task; +} +EXPORT_SYMBOL_GPL(rpc_restart_call_prepare); + +/* * Restart an (async) RPC call. Usually called from within the * exit handler. */ diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index ff50a05..1102ce1 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -569,7 +569,7 @@ EXPORT_SYMBOL_GPL(rpc_delay); /* * Helper to call task->tk_ops->rpc_call_prepare */ -static void rpc_prepare_task(struct rpc_task *task) +void rpc_prepare_task(struct rpc_task *task) { task->tk_ops->rpc_call_prepare(task, task->tk_calldata); } -- cgit v1.1 From 18dca02aeb3c49dfce87c76be643b139d05cf647 Mon Sep 17 00:00:00 2001 From: Ricardo Labiaga Date: Wed, 1 Apr 2009 09:22:53 -0400 Subject: nfs41: Add ability to read RPC call direction on TCP stream. NFSv4.1 callbacks can arrive over an existing connection. This patch adds the logic to read the RPC call direction (call or reply). It does this by updating the state machine to look for the call direction invoking xs_tcp_read_calldir(...) after reading the XID. [nfs41: Keep track of RPC call/reply direction with a flag] As per 11/14/08 review of RFC 53/85. Add a new flag to track whether the incoming message is an RPC call or an RPC reply. TCP_RPC_REPLY is set in the 'struct sock_xprt' tcp_flags in xs_tcp_read_calldir() if the message is an RPC reply sent on the forechannel. It is cleared if the message is an RPC request sent on the back channel. Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/xprtsock.c | 52 ++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index e185961..8975c10 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -270,6 +270,12 @@ struct sock_xprt { #define TCP_RCV_COPY_FRAGHDR (1UL << 1) #define TCP_RCV_COPY_XID (1UL << 2) #define TCP_RCV_COPY_DATA (1UL << 3) +#define TCP_RCV_COPY_CALLDIR (1UL << 4) + +/* + * TCP RPC flags + */ +#define TCP_RPC_REPLY (1UL << 5) static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt) { @@ -956,7 +962,7 @@ static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_rea transport->tcp_offset = 0; /* Sanity check of the record length */ - if (unlikely(transport->tcp_reclen < 4)) { + if (unlikely(transport->tcp_reclen < 8)) { dprintk("RPC: invalid TCP record fragment length\n"); xprt_force_disconnect(xprt); return; @@ -991,13 +997,48 @@ static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_r if (used != len) return; transport->tcp_flags &= ~TCP_RCV_COPY_XID; - transport->tcp_flags |= TCP_RCV_COPY_DATA; + transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; transport->tcp_copied = 4; - dprintk("RPC: reading reply for XID %08x\n", + dprintk("RPC: reading %s XID %08x\n", + (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for" + : "request with", ntohl(transport->tcp_xid)); xs_tcp_check_fraghdr(transport); } +static inline void xs_tcp_read_calldir(struct sock_xprt *transport, + struct xdr_skb_reader *desc) +{ + size_t len, used; + u32 offset; + __be32 calldir; + + /* + * We want transport->tcp_offset to be 8 at the end of this routine + * (4 bytes for the xid and 4 bytes for the call/reply flag). + * When this function is called for the first time, + * transport->tcp_offset is 4 (after having already read the xid). + */ + offset = transport->tcp_offset - sizeof(transport->tcp_xid); + len = sizeof(calldir) - offset; + dprintk("RPC: reading CALL/REPLY flag (%Zu bytes)\n", len); + used = xdr_skb_read_bits(desc, &calldir, len); + transport->tcp_offset += used; + if (used != len) + return; + transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR; + transport->tcp_flags |= TCP_RCV_COPY_DATA; + transport->tcp_copied += 4; + if (ntohl(calldir) == RPC_REPLY) + transport->tcp_flags |= TCP_RPC_REPLY; + else + transport->tcp_flags &= ~TCP_RPC_REPLY; + dprintk("RPC: reading %s CALL/REPLY flag %08x\n", + (transport->tcp_flags & TCP_RPC_REPLY) ? + "reply for" : "request with", calldir); + xs_tcp_check_fraghdr(transport); +} + static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); @@ -1114,6 +1155,11 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns xs_tcp_read_xid(transport, &desc); continue; } + /* Read in the call/reply flag */ + if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { + xs_tcp_read_calldir(transport, &desc); + continue; + } /* Read in the request data */ if (transport->tcp_flags & TCP_RCV_COPY_DATA) { xs_tcp_read_request(xprt, &desc); -- cgit v1.1 From f4a2e418bfd03a1f25f515e8a92ecd584d96cfc1 Mon Sep 17 00:00:00 2001 From: Ricardo Labiaga Date: Wed, 1 Apr 2009 09:22:54 -0400 Subject: nfs41: Process the RPC call direction Reading and storing the RPC direction is a three step process. 1. xs_tcp_read_calldir() reads the RPC direction, but it will not store it in the XDR buffer since the 'struct rpc_rqst' is not yet available. 2. The 'struct rpc_rqst' is obtained during the TCP_RCV_COPY_DATA state. This state need not necessarily be preceeded by the TCP_RCV_READ_CALLDIR. For example, we may be reading a continuation packet to a large reply. Therefore, we can't simply obtain the 'struct rpc_rqst' during the TCP_RCV_READ_CALLDIR state and assume it's available during TCP_RCV_COPY_DATA. This patch adds a new TCP_RCV_READ_CALLDIR flag to indicate the need to read the RPC direction. It then uses TCP_RCV_COPY_CALLDIR to indicate the RPC direction needs to be saved after the 'struct rpc_rqst' has been allocated. 3. The 'struct rpc_rqst' is obtained by the xs_tcp_read_data() helper functions. xs_tcp_read_common() then saves the RPC direction in the XDR buffer if TCP_RCV_COPY_CALLDIR is set. This will happen when we're reading the data immediately after the direction was read. xs_tcp_read_common() then clears this flag. [was nfs41: Skip past the RPC call direction] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy [nfs41: sunrpc: Add RPC direction back into the XDR buffer] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy [nfs41: sunrpc: Don't skip past the RPC call direction] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/clnt.c | 5 +++-- net/sunrpc/xprtsock.c | 31 +++++++++++++++++++++++++------ 2 files changed, 28 insertions(+), 8 deletions(-) (limited to 'net') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index d00e813..aca3ab6 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1390,13 +1390,14 @@ rpc_verify_header(struct rpc_task *task) } if ((len -= 3) < 0) goto out_overflow; - p += 1; /* skip XID */ + p += 1; /* skip XID */ if ((n = ntohl(*p++)) != RPC_REPLY) { dprintk("RPC: %5u %s: not an RPC reply: %x\n", - task->tk_pid, __func__, n); + task->tk_pid, __func__, n); goto out_garbage; } + if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) { if (--len < 0) goto out_overflow; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 8975c10..a48df14 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -270,12 +270,13 @@ struct sock_xprt { #define TCP_RCV_COPY_FRAGHDR (1UL << 1) #define TCP_RCV_COPY_XID (1UL << 2) #define TCP_RCV_COPY_DATA (1UL << 3) -#define TCP_RCV_COPY_CALLDIR (1UL << 4) +#define TCP_RCV_READ_CALLDIR (1UL << 4) +#define TCP_RCV_COPY_CALLDIR (1UL << 5) /* * TCP RPC flags */ -#define TCP_RPC_REPLY (1UL << 5) +#define TCP_RPC_REPLY (1UL << 6) static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt) { @@ -997,7 +998,7 @@ static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_r if (used != len) return; transport->tcp_flags &= ~TCP_RCV_COPY_XID; - transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; + transport->tcp_flags |= TCP_RCV_READ_CALLDIR; transport->tcp_copied = 4; dprintk("RPC: reading %s XID %08x\n", (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for" @@ -1026,9 +1027,13 @@ static inline void xs_tcp_read_calldir(struct sock_xprt *transport, transport->tcp_offset += used; if (used != len) return; - transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR; + transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR; + transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; transport->tcp_flags |= TCP_RCV_COPY_DATA; - transport->tcp_copied += 4; + /* + * We don't yet have the XDR buffer, so we will write the calldir + * out after we get the buffer from the 'struct rpc_rqst' + */ if (ntohl(calldir) == RPC_REPLY) transport->tcp_flags |= TCP_RPC_REPLY; else @@ -1059,6 +1064,20 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea } rcvbuf = &req->rq_private_buf; + + if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { + /* + * Save the RPC direction in the XDR buffer + */ + __be32 calldir = transport->tcp_flags & TCP_RPC_REPLY ? + htonl(RPC_REPLY) : 0; + + memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied, + &calldir, sizeof(calldir)); + transport->tcp_copied += sizeof(calldir); + transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR; + } + len = desc->count; if (len > transport->tcp_reclen - transport->tcp_offset) { struct xdr_skb_reader my_desc; @@ -1156,7 +1175,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns continue; } /* Read in the call/reply flag */ - if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { + if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) { xs_tcp_read_calldir(transport, &desc); continue; } -- cgit v1.1 From f9acac1a4710ce88871f1ae323fc91c1cb6e9d52 Mon Sep 17 00:00:00 2001 From: Ricardo Labiaga Date: Wed, 1 Apr 2009 09:22:59 -0400 Subject: nfs41: Initialize new rpc_xprt callback related fields Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/xprt.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'net') diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 06ca058..52739f8 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1049,6 +1049,11 @@ found: INIT_LIST_HEAD(&xprt->free); INIT_LIST_HEAD(&xprt->recv); +#if defined(CONFIG_NFS_V4_1) + spin_lock_init(&xprt->bc_pa_lock); + INIT_LIST_HEAD(&xprt->bc_pa_list); +#endif /* CONFIG_NFS_V4_1 */ + INIT_WORK(&xprt->task_cleanup, xprt_autoclose); setup_timer(&xprt->timer, xprt_init_autodisconnect, (unsigned long)xprt); -- cgit v1.1 From fb7a0b9addbdbbb13b7bc02abf55ee524ea19ce1 Mon Sep 17 00:00:00 2001 From: Ricardo Labiaga Date: Wed, 1 Apr 2009 09:23:00 -0400 Subject: nfs41: New backchannel helper routines This patch introduces support to setup the callback xprt on the client side. It allocates/ destroys the preallocated memory structures used to process backchannel requests. At setup time, xprt_setup_backchannel() is invoked to allocate one or more rpc_rqst structures and substructures. This ensures that they are available when an RPC callback arrives. The rpc_rqst structures are maintained in a linked list attached to the rpc_xprt structure. We keep track of the number of allocations so that they can be correctly removed when the channel is destroyed. When an RPC callback arrives, xprt_alloc_bc_request() is invoked to obtain a preallocated rpc_rqst structure. An rpc_xprt structure is returned, and its RPC_BC_PREALLOC_IN_USE bit is set in rpc_xprt->bc_flags. The structure is removed from the the list since it is now in use, and it will be later added back when its user is done with it. After the RPC callback replies, the rpc_rqst structure is returned by invoking xprt_free_bc_request(). This clears the RPC_BC_PREALLOC_IN_USE bit and adds it back to the list, allowing it to be reused by a subsequent RPC callback request. To be consistent with the reception of RPC messages, the backchannel requests should be placed into the 'struct rpc_rqst' rq_rcv_buf, which is then in turn copied to the 'struct rpc_rqst' rq_private_buf. [nfs41: Preallocate rpc_rqst receive buffer for handling callbacks] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy [Update copyright notice and explain page allocation] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/Makefile | 1 + net/sunrpc/backchannel_rqst.c | 278 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 279 insertions(+) create mode 100644 net/sunrpc/backchannel_rqst.c (limited to 'net') diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile index 5369aa3..4a01f96 100644 --- a/net/sunrpc/Makefile +++ b/net/sunrpc/Makefile @@ -13,5 +13,6 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \ rpcb_clnt.o timer.o xdr.o \ sunrpc_syms.o cache.o rpc_pipe.o \ svc_xprt.o +sunrpc-$(CONFIG_NFS_V4_1) += backchannel_rqst.o sunrpc-$(CONFIG_PROC_FS) += stats.o sunrpc-$(CONFIG_SYSCTL) += sysctl.o diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c new file mode 100644 index 0000000..f56e18a --- /dev/null +++ b/net/sunrpc/backchannel_rqst.c @@ -0,0 +1,278 @@ +/****************************************************************************** + +(c) 2007 Network Appliance, Inc. All Rights Reserved. +(c) 2009 NetApp. All Rights Reserved. + +NetApp provides this source code under the GPL v2 License. +The GPL v2 license is available at +http://opensource.org/licenses/gpl-license.php. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +******************************************************************************/ + +#include +#include + +#ifdef RPC_DEBUG +#define RPCDBG_FACILITY RPCDBG_TRANS +#endif + +#if defined(CONFIG_NFS_V4_1) + +/* + * Helper routines that track the number of preallocation elements + * on the transport. + */ +static inline int xprt_need_to_requeue(struct rpc_xprt *xprt) +{ + return xprt->bc_alloc_count > 0; +} + +static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n) +{ + xprt->bc_alloc_count += n; +} + +static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n) +{ + return xprt->bc_alloc_count -= n; +} + +/* + * Free the preallocated rpc_rqst structure and the memory + * buffers hanging off of it. + */ +static void xprt_free_allocation(struct rpc_rqst *req) +{ + struct xdr_buf *xbufp; + + dprintk("RPC: free allocations for req= %p\n", req); + BUG_ON(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); + xbufp = &req->rq_private_buf; + free_page((unsigned long)xbufp->head[0].iov_base); + xbufp = &req->rq_snd_buf; + free_page((unsigned long)xbufp->head[0].iov_base); + list_del(&req->rq_bc_pa_list); + kfree(req); +} + +/* + * Preallocate up to min_reqs structures and related buffers for use + * by the backchannel. This function can be called multiple times + * when creating new sessions that use the same rpc_xprt. The + * preallocated buffers are added to the pool of resources used by + * the rpc_xprt. Anyone of these resources may be used used by an + * incoming callback request. It's up to the higher levels in the + * stack to enforce that the maximum number of session slots is not + * being exceeded. + * + * Some callback arguments can be large. For example, a pNFS server + * using multiple deviceids. The list can be unbound, but the client + * has the ability to tell the server the maximum size of the callback + * requests. Each deviceID is 16 bytes, so allocate one page + * for the arguments to have enough room to receive a number of these + * deviceIDs. The NFS client indicates to the pNFS server that its + * callback requests can be up to 4096 bytes in size. + */ +int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) +{ + struct page *page_rcv = NULL, *page_snd = NULL; + struct xdr_buf *xbufp = NULL; + struct rpc_rqst *req, *tmp; + struct list_head tmp_list; + int i; + + dprintk("RPC: setup backchannel transport\n"); + + /* + * We use a temporary list to keep track of the preallocated + * buffers. Once we're done building the list we splice it + * into the backchannel preallocation list off of the rpc_xprt + * struct. This helps minimize the amount of time the list + * lock is held on the rpc_xprt struct. It also makes cleanup + * easier in case of memory allocation errors. + */ + INIT_LIST_HEAD(&tmp_list); + for (i = 0; i < min_reqs; i++) { + /* Pre-allocate one backchannel rpc_rqst */ + req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); + if (req == NULL) { + printk(KERN_ERR "Failed to create bc rpc_rqst\n"); + goto out_free; + } + + /* Add the allocated buffer to the tmp list */ + dprintk("RPC: adding req= %p\n", req); + list_add(&req->rq_bc_pa_list, &tmp_list); + + req->rq_xprt = xprt; + INIT_LIST_HEAD(&req->rq_list); + INIT_LIST_HEAD(&req->rq_bc_list); + + /* Preallocate one XDR receive buffer */ + page_rcv = alloc_page(GFP_KERNEL); + if (page_rcv == NULL) { + printk(KERN_ERR "Failed to create bc receive xbuf\n"); + goto out_free; + } + xbufp = &req->rq_rcv_buf; + xbufp->head[0].iov_base = page_address(page_rcv); + xbufp->head[0].iov_len = PAGE_SIZE; + xbufp->tail[0].iov_base = NULL; + xbufp->tail[0].iov_len = 0; + xbufp->page_len = 0; + xbufp->len = PAGE_SIZE; + xbufp->buflen = PAGE_SIZE; + + /* Preallocate one XDR send buffer */ + page_snd = alloc_page(GFP_KERNEL); + if (page_snd == NULL) { + printk(KERN_ERR "Failed to create bc snd xbuf\n"); + goto out_free; + } + + xbufp = &req->rq_snd_buf; + xbufp->head[0].iov_base = page_address(page_snd); + xbufp->head[0].iov_len = 0; + xbufp->tail[0].iov_base = NULL; + xbufp->tail[0].iov_len = 0; + xbufp->page_len = 0; + xbufp->len = 0; + xbufp->buflen = PAGE_SIZE; + } + + /* + * Add the temporary list to the backchannel preallocation list + */ + spin_lock_bh(&xprt->bc_pa_lock); + list_splice(&tmp_list, &xprt->bc_pa_list); + xprt_inc_alloc_count(xprt, min_reqs); + spin_unlock_bh(&xprt->bc_pa_lock); + + dprintk("RPC: setup backchannel transport done\n"); + return 0; + +out_free: + /* + * Memory allocation failed, free the temporary list + */ + list_for_each_entry_safe(req, tmp, &tmp_list, rq_bc_pa_list) + xprt_free_allocation(req); + + dprintk("RPC: setup backchannel transport failed\n"); + return -1; +} +EXPORT_SYMBOL(xprt_setup_backchannel); + +/* + * Destroys the backchannel preallocated structures. + * Since these structures may have been allocated by multiple calls + * to xprt_setup_backchannel, we only destroy up to the maximum number + * of reqs specified by the caller. + * @xprt: the transport holding the preallocated strucures + * @max_reqs the maximum number of preallocated structures to destroy + */ +void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) +{ + struct rpc_rqst *req = NULL, *tmp = NULL; + + dprintk("RPC: destroy backchannel transport\n"); + + BUG_ON(max_reqs == 0); + spin_lock_bh(&xprt->bc_pa_lock); + xprt_dec_alloc_count(xprt, max_reqs); + list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { + dprintk("RPC: req=%p\n", req); + xprt_free_allocation(req); + if (--max_reqs == 0) + break; + } + spin_unlock_bh(&xprt->bc_pa_lock); + + dprintk("RPC: backchannel list empty= %s\n", + list_empty(&xprt->bc_pa_list) ? "true" : "false"); +} +EXPORT_SYMBOL(xprt_destroy_backchannel); + +/* + * One or more rpc_rqst structure have been preallocated during the + * backchannel setup. Buffer space for the send and private XDR buffers + * has been preallocated as well. Use xprt_alloc_bc_request to allocate + * to this request. Use xprt_free_bc_request to return it. + * + * Return an available rpc_rqst, otherwise NULL if non are available. + */ +struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt) +{ + struct rpc_rqst *req; + + dprintk("RPC: allocate a backchannel request\n"); + spin_lock_bh(&xprt->bc_pa_lock); + if (!list_empty(&xprt->bc_pa_list)) { + req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, + rq_bc_pa_list); + list_del(&req->rq_bc_pa_list); + } else { + req = NULL; + } + spin_unlock_bh(&xprt->bc_pa_lock); + + if (req != NULL) { + set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); + req->rq_received = 0; + req->rq_bytes_sent = 0; + memcpy(&req->rq_private_buf, &req->rq_rcv_buf, + sizeof(req->rq_private_buf)); + } + dprintk("RPC: backchannel req=%p\n", req); + return req; +} + +/* + * Return the preallocated rpc_rqst structure and XDR buffers + * associated with this rpc_task. + */ +void xprt_free_bc_request(struct rpc_rqst *req) +{ + struct rpc_xprt *xprt = req->rq_xprt; + + dprintk("RPC: free backchannel req=%p\n", req); + + smp_mb__before_clear_bit(); + BUG_ON(!test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); + clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); + smp_mb__after_clear_bit(); + + if (!xprt_need_to_requeue(xprt)) { + /* + * The last remaining session was destroyed while this + * entry was in use. Free the entry and don't attempt + * to add back to the list because there is no need to + * have anymore preallocated entries. + */ + dprintk("RPC: Last session removed req=%p\n", req); + xprt_free_allocation(req); + return; + } + + /* + * Return it to the list of preallocations so that it + * may be reused by a new callback request. + */ + spin_lock_bh(&xprt->bc_pa_lock); + list_add(&req->rq_bc_pa_list, &xprt->bc_pa_list); + spin_unlock_bh(&xprt->bc_pa_lock); +} + +#endif /* CONFIG_NFS_V4_1 */ -- cgit v1.1 From 44b98efdd0a205bdca2cb63493350d06ff6804b1 Mon Sep 17 00:00:00 2001 From: Ricardo Labiaga Date: Wed, 1 Apr 2009 09:23:02 -0400 Subject: nfs41: New xs_tcp_read_data() Handles RPC replies and backchannel callbacks. Traditionally the NFS client has expected only RPC replies on its open connections. With NFSv4.1, callbacks can arrive over an existing open connection. This patch refactors the old xs_tcp_read_request() into an RPC reply handler: xs_tcp_read_reply(), a new backchannel callback handler: xs_tcp_read_callback(), and a common routine to read the data off the transport: xs_tcp_read_common(). The new xs_tcp_read_callback() queues callback requests onto a queue where the callback service (a separate thread) is listening for the processing. This patch incorporates work and suggestions from Rahul Iyer (iyer@netapp.com) and Benny Halevy (bhalevy@panasas.com). xs_tcp_read_callback() drops the connection when the number of expected callbacks is exceeded. Use xprt_force_disconnect(), ensuring tasks on the pending queue are awaken on disconnect. [nfs41: Keep track of RPC call/reply direction with a flag] [nfs41: Preallocate rpc_rqst receive buffer for handling callbacks] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy [nfs41: sunrpc: xs_tcp_read_callback() should use xprt_force_disconnect()] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy [Moves embedded #ifdefs into #ifdef function blocks] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/xprtsock.c | 144 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 126 insertions(+), 18 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index a48df14..e3e3a57 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -34,6 +34,9 @@ #include #include #include +#ifdef CONFIG_NFS_V4_1 +#include +#endif #include #include @@ -1044,25 +1047,16 @@ static inline void xs_tcp_read_calldir(struct sock_xprt *transport, xs_tcp_check_fraghdr(transport); } -static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) +static inline void xs_tcp_read_common(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc, + struct rpc_rqst *req) { - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - struct rpc_rqst *req; + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *rcvbuf; size_t len; ssize_t r; - /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->transport_lock); - req = xprt_lookup_rqst(xprt, transport->tcp_xid); - if (!req) { - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - dprintk("RPC: XID %08x request not found!\n", - ntohl(transport->tcp_xid)); - spin_unlock(&xprt->transport_lock); - return; - } - rcvbuf = &req->rq_private_buf; if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { @@ -1114,7 +1108,7 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea "tcp_offset = %u, tcp_reclen = %u\n", xprt, transport->tcp_copied, transport->tcp_offset, transport->tcp_reclen); - goto out; + return; } dprintk("RPC: XID %08x read %Zd bytes\n", @@ -1130,11 +1124,125 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea transport->tcp_flags &= ~TCP_RCV_COPY_DATA; } -out: + return; +} + +/* + * Finds the request corresponding to the RPC xid and invokes the common + * tcp read code to read the data. + */ +static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + struct rpc_rqst *req; + + dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); + + /* Find and lock the request corresponding to this xid */ + spin_lock(&xprt->transport_lock); + req = xprt_lookup_rqst(xprt, transport->tcp_xid); + if (!req) { + dprintk("RPC: XID %08x request not found!\n", + ntohl(transport->tcp_xid)); + spin_unlock(&xprt->transport_lock); + return -1; + } + + xs_tcp_read_common(xprt, desc, req); + if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_rqst(req->rq_task, transport->tcp_copied); + spin_unlock(&xprt->transport_lock); - xs_tcp_check_fraghdr(transport); + return 0; +} + +#if defined(CONFIG_NFS_V4_1) +/* + * Obtains an rpc_rqst previously allocated and invokes the common + * tcp read code to read the data. The result is placed in the callback + * queue. + * If we're unable to obtain the rpc_rqst we schedule the closing of the + * connection and return -1. + */ +static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + struct rpc_rqst *req; + + req = xprt_alloc_bc_request(xprt); + if (req == NULL) { + printk(KERN_WARNING "Callback slot table overflowed\n"); + xprt_force_disconnect(xprt); + return -1; + } + + req->rq_xid = transport->tcp_xid; + dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); + xs_tcp_read_common(xprt, desc, req); + + if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { + struct svc_serv *bc_serv = xprt->bc_serv; + + /* + * Add callback request to callback list. The callback + * service sleeps on the sv_cb_waitq waiting for new + * requests. Wake it up after adding enqueing the + * request. + */ + dprintk("RPC: add callback request to list\n"); + spin_lock(&bc_serv->sv_cb_lock); + list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); + spin_unlock(&bc_serv->sv_cb_lock); + wake_up(&bc_serv->sv_cb_waitq); + } + + req->rq_private_buf.len = transport->tcp_copied; + + return 0; +} + +static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + + return (transport->tcp_flags & TCP_RPC_REPLY) ? + xs_tcp_read_reply(xprt, desc) : + xs_tcp_read_callback(xprt, desc); +} +#else +static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + return xs_tcp_read_reply(xprt, desc); +} +#endif /* CONFIG_NFS_V4_1 */ + +/* + * Read data off the transport. This can be either an RPC_CALL or an + * RPC_REPLY. Relay the processing to helper functions. + */ +static void xs_tcp_read_data(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + + if (_xs_tcp_read_data(xprt, desc) == 0) + xs_tcp_check_fraghdr(transport); + else { + /* + * The transport_lock protects the request handling. + * There's no need to hold it to update the tcp_flags. + */ + transport->tcp_flags &= ~TCP_RCV_COPY_DATA; + } } static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) @@ -1181,7 +1289,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns } /* Read in the request data */ if (transport->tcp_flags & TCP_RCV_COPY_DATA) { - xs_tcp_read_request(xprt, &desc); + xs_tcp_read_data(xprt, &desc); continue; } /* Skip over any trailing bytes on short reads */ -- cgit v1.1 From 55ae1aabfb108106dd095de2578ceef1c755a8b8 Mon Sep 17 00:00:00 2001 From: Ricardo Labiaga Date: Wed, 1 Apr 2009 09:23:03 -0400 Subject: nfs41: Add backchannel processing support to RPC state machine Adds rpc_run_bc_task() which is called by the NFS callback service to process backchannel requests. It performs similar work to rpc_run_task() though "schedules" the backchannel task to be executed starting at the call_trasmit state in the RPC state machine. It also introduces some miscellaneous updates to the argument validation, call_transmit, and transport cleanup functions to take into account that there are now forechannel and backchannel tasks. Backchannel requests do not carry an RPC message structure, since the payload has already been XDR encoded using the existing NFSv4 callback mechanism. Introduce a new transmit state for the client to reply on to backchannel requests. This new state simply reserves the transport and issues the reply. In case of a connection related error, disconnects the transport and drops the reply. It requires the forechannel to re-establish the connection and the server to retransmit the request, as stated in NFSv4.1 section 2.9.2 "Client and Server Transport Behavior". Note: There is no need to loop attempting to reserve the transport. If EAGAIN is returned by xprt_prepare_transmit(), return with tk_status == 0, setting tk_action to call_bc_transmit. rpc_execute() will invoke it again after the task is taken off the sleep queue. [nfs41: rpc_run_bc_task() need not be exported outside RPC module] [nfs41: New call_bc_transmit RPC state] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy [nfs41: Backchannel: No need to loop in call_bc_transmit()] Signed-off-by: Andy Adamson Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy [rpc_count_iostats incorrectly exits early] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy [Convert rpc_reply_expected() to inline function] [Remove unnecessary BUG_ON()] [Rename variable] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/clnt.c | 117 +++++++++++++++++++++++++++++++++++++++++++++++++++- net/sunrpc/stats.c | 6 ++- net/sunrpc/sunrpc.h | 37 +++++++++++++++++ net/sunrpc/xprt.c | 38 ++++++++++++++--- 4 files changed, 189 insertions(+), 9 deletions(-) create mode 100644 net/sunrpc/sunrpc.h (limited to 'net') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index aca3ab6..f3e93b8 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -36,7 +36,9 @@ #include #include #include +#include +#include "sunrpc.h" #ifdef RPC_DEBUG # define RPCDBG_FACILITY RPCDBG_CALL @@ -63,6 +65,9 @@ static void call_decode(struct rpc_task *task); static void call_bind(struct rpc_task *task); static void call_bind_status(struct rpc_task *task); static void call_transmit(struct rpc_task *task); +#if defined(CONFIG_NFS_V4_1) +static void call_bc_transmit(struct rpc_task *task); +#endif /* CONFIG_NFS_V4_1 */ static void call_status(struct rpc_task *task); static void call_transmit_status(struct rpc_task *task); static void call_refresh(struct rpc_task *task); @@ -613,6 +618,50 @@ rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags, } EXPORT_SYMBOL_GPL(rpc_call_async); +#if defined(CONFIG_NFS_V4_1) +/** + * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run + * rpc_execute against it + * @ops: RPC call ops + */ +struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req, + const struct rpc_call_ops *tk_ops) +{ + struct rpc_task *task; + struct xdr_buf *xbufp = &req->rq_snd_buf; + struct rpc_task_setup task_setup_data = { + .callback_ops = tk_ops, + }; + + dprintk("RPC: rpc_run_bc_task req= %p\n", req); + /* + * Create an rpc_task to send the data + */ + task = rpc_new_task(&task_setup_data); + if (!task) { + xprt_free_bc_request(req); + goto out; + } + task->tk_rqstp = req; + + /* + * Set up the xdr_buf length. + * This also indicates that the buffer is XDR encoded already. + */ + xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + + xbufp->tail[0].iov_len; + + task->tk_action = call_bc_transmit; + atomic_inc(&task->tk_count); + BUG_ON(atomic_read(&task->tk_count) != 2); + rpc_execute(task); + +out: + dprintk("RPC: rpc_run_bc_task: task= %p\n", task); + return task; +} +#endif /* CONFIG_NFS_V4_1 */ + void rpc_call_start(struct rpc_task *task) { @@ -1098,7 +1147,7 @@ call_transmit(struct rpc_task *task) * in order to allow access to the socket to other RPC requests. */ call_transmit_status(task); - if (task->tk_msg.rpc_proc->p_decode != NULL) + if (rpc_reply_expected(task)) return; task->tk_action = rpc_exit_task; rpc_wake_up_queued_task(&task->tk_xprt->pending, task); @@ -1133,6 +1182,72 @@ call_transmit_status(struct rpc_task *task) } } +#if defined(CONFIG_NFS_V4_1) +/* + * 5b. Send the backchannel RPC reply. On error, drop the reply. In + * addition, disconnect on connectivity errors. + */ +static void +call_bc_transmit(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + + BUG_ON(task->tk_status != 0); + task->tk_status = xprt_prepare_transmit(task); + if (task->tk_status == -EAGAIN) { + /* + * Could not reserve the transport. Try again after the + * transport is released. + */ + task->tk_status = 0; + task->tk_action = call_bc_transmit; + return; + } + + task->tk_action = rpc_exit_task; + if (task->tk_status < 0) { + printk(KERN_NOTICE "RPC: Could not send backchannel reply " + "error: %d\n", task->tk_status); + return; + } + + xprt_transmit(task); + xprt_end_transmit(task); + dprint_status(task); + switch (task->tk_status) { + case 0: + /* Success */ + break; + case -EHOSTDOWN: + case -EHOSTUNREACH: + case -ENETUNREACH: + case -ETIMEDOUT: + /* + * Problem reaching the server. Disconnect and let the + * forechannel reestablish the connection. The server will + * have to retransmit the backchannel request and we'll + * reprocess it. Since these ops are idempotent, there's no + * need to cache our reply at this time. + */ + printk(KERN_NOTICE "RPC: Could not send backchannel reply " + "error: %d\n", task->tk_status); + xprt_conditional_disconnect(task->tk_xprt, + req->rq_connect_cookie); + break; + default: + /* + * We were unable to reply and will have to drop the + * request. The server should reconnect and retransmit. + */ + BUG_ON(task->tk_status == -EAGAIN); + printk(KERN_NOTICE "RPC: Could not send backchannel reply " + "error: %d\n", task->tk_status); + break; + } + rpc_wake_up_queued_task(&req->rq_xprt->pending, task); +} +#endif /* CONFIG_NFS_V4_1 */ + /* * 6. Sort out the RPC call status */ diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c index 1ef6e46..8487aa0 100644 --- a/net/sunrpc/stats.c +++ b/net/sunrpc/stats.c @@ -141,12 +141,14 @@ EXPORT_SYMBOL_GPL(rpc_free_iostats); void rpc_count_iostats(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; - struct rpc_iostats *stats = task->tk_client->cl_metrics; + struct rpc_iostats *stats; struct rpc_iostats *op_metrics; long rtt, execute, queue; - if (!stats || !req) + if (!task->tk_client || !task->tk_client->cl_metrics || !req) return; + + stats = task->tk_client->cl_metrics; op_metrics = &stats[task->tk_msg.rpc_proc->p_statidx]; op_metrics->om_ops++; diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h new file mode 100644 index 0000000..5d9dd74 --- /dev/null +++ b/net/sunrpc/sunrpc.h @@ -0,0 +1,37 @@ +/****************************************************************************** + +(c) 2008 NetApp. All Rights Reserved. + +NetApp provides this source code under the GPL v2 License. +The GPL v2 license is available at +http://opensource.org/licenses/gpl-license.php. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +******************************************************************************/ + +/* + * Functions and macros used internally by RPC + */ + +#ifndef _NET_SUNRPC_SUNRPC_H +#define _NET_SUNRPC_SUNRPC_H + +static inline int rpc_reply_expected(struct rpc_task *task) +{ + return (task->tk_msg.rpc_proc != NULL) && + (task->tk_msg.rpc_proc->p_decode != NULL); +} + +#endif /* _NET_SUNRPC_SUNRPC_H */ + diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 52739f8..0eea2bf 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -12,8 +12,9 @@ * - Next, the caller puts together the RPC message, stuffs it into * the request struct, and calls xprt_transmit(). * - xprt_transmit sends the message and installs the caller on the - * transport's wait list. At the same time, it installs a timer that - * is run after the packet's timeout has expired. + * transport's wait list. At the same time, if a reply is expected, + * it installs a timer that is run after the packet's timeout has + * expired. * - When a packet arrives, the data_ready handler walks the list of * pending requests for that transport. If a matching XID is found, the * caller is woken up, and the timer removed. @@ -46,6 +47,8 @@ #include #include +#include "sunrpc.h" + /* * Local variables */ @@ -873,7 +876,10 @@ void xprt_transmit(struct rpc_task *task) dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); if (!req->rq_received) { - if (list_empty(&req->rq_list)) { + if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { + /* + * Add to the list only if we're expecting a reply + */ spin_lock_bh(&xprt->transport_lock); /* Update the softirq receive buffer */ memcpy(&req->rq_private_buf, &req->rq_rcv_buf, @@ -908,8 +914,13 @@ void xprt_transmit(struct rpc_task *task) /* Don't race with disconnect */ if (!xprt_connected(xprt)) task->tk_status = -ENOTCONN; - else if (!req->rq_received) + else if (!req->rq_received && rpc_reply_expected(task)) { + /* + * Sleep on the pending queue since + * we're expecting a reply. + */ rpc_sleep_on(&xprt->pending, task, xprt_timer); + } spin_unlock_bh(&xprt->transport_lock); } @@ -982,11 +993,17 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) */ void xprt_release(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_xprt *xprt; struct rpc_rqst *req; + int is_bc_request; if (!(req = task->tk_rqstp)) return; + + /* Preallocated backchannel request? */ + is_bc_request = bc_prealloc(req); + + xprt = req->rq_xprt; rpc_count_iostats(task); spin_lock_bh(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); @@ -999,10 +1016,19 @@ void xprt_release(struct rpc_task *task) mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); spin_unlock_bh(&xprt->transport_lock); - xprt->ops->buf_free(req->rq_buffer); + if (!bc_prealloc(req)) + xprt->ops->buf_free(req->rq_buffer); task->tk_rqstp = NULL; if (req->rq_release_snd_buf) req->rq_release_snd_buf(req); + + /* + * Early exit if this is a backchannel preallocated request. + * There is no need to have it added to the RPC slot list. + */ + if (is_bc_request) + return; + memset(req, 0, sizeof(*req)); /* mark unused */ dprintk("RPC: %5u release request %p\n", task->tk_pid, req); -- cgit v1.1 From 0d90ba1cd416525c4825c111db862d8b15a02e9b Mon Sep 17 00:00:00 2001 From: Ricardo Labiaga Date: Wed, 1 Apr 2009 09:23:04 -0400 Subject: nfs41: Backchannel callback service helper routines Executes the backchannel task on the RPC state machine using the existing open connection previously established by the client. Signed-off-by: Ricardo Labiaga nfs41: Add bc_svc.o to sunrpc Makefile. [nfs41: bc_send() does not need to be exported outside RPC module] [nfs41: xprt_free_bc_request() need not be exported outside RPC module] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy [Update copyright] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/Makefile | 2 +- net/sunrpc/bc_svc.c | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++ net/sunrpc/xprtsock.c | 3 ++ 3 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 net/sunrpc/bc_svc.c (limited to 'net') diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile index 4a01f96..db73fd2 100644 --- a/net/sunrpc/Makefile +++ b/net/sunrpc/Makefile @@ -13,6 +13,6 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \ rpcb_clnt.o timer.o xdr.o \ sunrpc_syms.o cache.o rpc_pipe.o \ svc_xprt.o -sunrpc-$(CONFIG_NFS_V4_1) += backchannel_rqst.o +sunrpc-$(CONFIG_NFS_V4_1) += backchannel_rqst.o bc_svc.o sunrpc-$(CONFIG_PROC_FS) += stats.o sunrpc-$(CONFIG_SYSCTL) += sysctl.o diff --git a/net/sunrpc/bc_svc.c b/net/sunrpc/bc_svc.c new file mode 100644 index 0000000..13f214f --- /dev/null +++ b/net/sunrpc/bc_svc.c @@ -0,0 +1,81 @@ +/****************************************************************************** + +(c) 2007 Network Appliance, Inc. All Rights Reserved. +(c) 2009 NetApp. All Rights Reserved. + +NetApp provides this source code under the GPL v2 License. +The GPL v2 license is available at +http://opensource.org/licenses/gpl-license.php. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +******************************************************************************/ + +/* + * The NFSv4.1 callback service helper routines. + * They implement the transport level processing required to send the + * reply over an existing open connection previously established by the client. + */ + +#if defined(CONFIG_NFS_V4_1) + +#include + +#include +#include +#include + +#define RPCDBG_FACILITY RPCDBG_SVCDSP + +void bc_release_request(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + + dprintk("RPC: bc_release_request: task= %p\n", task); + + /* + * Release this request only if it's a backchannel + * preallocated request + */ + if (!bc_prealloc(req)) + return; + xprt_free_bc_request(req); +} + +/* Empty callback ops */ +static const struct rpc_call_ops nfs41_callback_ops = { +}; + + +/* + * Send the callback reply + */ +int bc_send(struct rpc_rqst *req) +{ + struct rpc_task *task; + int ret; + + dprintk("RPC: bc_send req= %p\n", req); + task = rpc_run_bc_task(req, &nfs41_callback_ops); + if (IS_ERR(task)) + ret = PTR_ERR(task); + else { + BUG_ON(atomic_read(&task->tk_count) != 1); + ret = task->tk_status; + rpc_put_task(task); + } + return ret; + dprintk("RPC: bc_send ret= %d \n", ret); +} + +#endif /* CONFIG_NFS_V4_1 */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index e3e3a57..8a72186 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -2183,6 +2183,9 @@ static struct rpc_xprt_ops xs_tcp_ops = { .buf_free = rpc_free, .send_request = xs_tcp_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, +#if defined(CONFIG_NFS_V4_1) + .release_request = bc_release_request, +#endif /* CONFIG_NFS_V4_1 */ .close = xs_tcp_close, .destroy = xs_destroy, .print_stats = xs_tcp_print_stats, -- cgit v1.1 From 1cad7ea6fe98dc414bd3df55275c147bd15ebf97 Mon Sep 17 00:00:00 2001 From: Ricardo Labiaga Date: Wed, 1 Apr 2009 09:23:06 -0400 Subject: nfs41: Refactor svc_process() net/sunrpc/svc.c:svc_process() is used by the NFSv4 callback service to process RPC requests arriving over connections initiated by the server. NFSv4.1 supports callbacks over the backchannel on connections initiated by the client. This patch refactors svc_process() so that common code can also be used by the backchannel. Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/svc.c | 80 +++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 30 deletions(-) (limited to 'net') diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 8847add..bfda66d 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -970,20 +970,18 @@ svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) } /* - * Process the RPC request. + * Common routine for processing the RPC request. */ -int -svc_process(struct svc_rqst *rqstp) +static int +svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv) { struct svc_program *progp; struct svc_version *versp = NULL; /* compiler food */ struct svc_procedure *procp = NULL; - struct kvec * argv = &rqstp->rq_arg.head[0]; - struct kvec * resv = &rqstp->rq_res.head[0]; struct svc_serv *serv = rqstp->rq_server; kxdrproc_t xdr; __be32 *statp; - u32 dir, prog, vers, proc; + u32 prog, vers, proc; __be32 auth_stat, rpc_stat; int auth_res; __be32 *reply_statp; @@ -993,19 +991,6 @@ svc_process(struct svc_rqst *rqstp) if (argv->iov_len < 6*4) goto err_short_len; - /* setup response xdr_buf. - * Initially it has just one page - */ - rqstp->rq_resused = 1; - resv->iov_base = page_address(rqstp->rq_respages[0]); - resv->iov_len = 0; - rqstp->rq_res.pages = rqstp->rq_respages + 1; - rqstp->rq_res.len = 0; - rqstp->rq_res.page_base = 0; - rqstp->rq_res.page_len = 0; - rqstp->rq_res.buflen = PAGE_SIZE; - rqstp->rq_res.tail[0].iov_base = NULL; - rqstp->rq_res.tail[0].iov_len = 0; /* Will be turned off only in gss privacy case: */ rqstp->rq_splice_ok = 1; /* Will be turned off only when NFSv4 Sessions are used */ @@ -1014,17 +999,13 @@ svc_process(struct svc_rqst *rqstp) /* Setup reply header */ rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp); - rqstp->rq_xid = svc_getu32(argv); svc_putu32(resv, rqstp->rq_xid); - dir = svc_getnl(argv); vers = svc_getnl(argv); /* First words of reply: */ svc_putnl(resv, 1); /* REPLY */ - if (dir != 0) /* direction != CALL */ - goto err_bad_dir; if (vers != 2) /* RPC version number */ goto err_bad_rpc; @@ -1147,7 +1128,7 @@ svc_process(struct svc_rqst *rqstp) sendit: if (svc_authorise(rqstp)) goto dropit; - return svc_send(rqstp); + return 1; /* Caller can now send it */ dropit: svc_authorise(rqstp); /* doesn't hurt to call this twice */ @@ -1161,12 +1142,6 @@ err_short_len: goto dropit; /* drop request */ -err_bad_dir: - svc_printk(rqstp, "bad direction %d, dropping request\n", dir); - - serv->sv_stats->rpcbadfmt++; - goto dropit; /* drop request */ - err_bad_rpc: serv->sv_stats->rpcbadfmt++; svc_putnl(resv, 1); /* REJECT */ @@ -1220,6 +1195,51 @@ err_bad: EXPORT_SYMBOL_GPL(svc_process); /* + * Process the RPC request. + */ +int +svc_process(struct svc_rqst *rqstp) +{ + struct kvec *argv = &rqstp->rq_arg.head[0]; + struct kvec *resv = &rqstp->rq_res.head[0]; + struct svc_serv *serv = rqstp->rq_server; + u32 dir; + int error; + + /* + * Setup response xdr_buf. + * Initially it has just one page + */ + rqstp->rq_resused = 1; + resv->iov_base = page_address(rqstp->rq_respages[0]); + resv->iov_len = 0; + rqstp->rq_res.pages = rqstp->rq_respages + 1; + rqstp->rq_res.len = 0; + rqstp->rq_res.page_base = 0; + rqstp->rq_res.page_len = 0; + rqstp->rq_res.buflen = PAGE_SIZE; + rqstp->rq_res.tail[0].iov_base = NULL; + rqstp->rq_res.tail[0].iov_len = 0; + + rqstp->rq_xid = svc_getu32(argv); + + dir = svc_getnl(argv); + if (dir != 0) { + /* direction != CALL */ + svc_printk(rqstp, "bad direction %d, dropping request\n", dir); + serv->sv_stats->rpcbadfmt++; + svc_drop(rqstp); + return 0; + } + + error = svc_process_common(rqstp, argv, resv); + if (error <= 0) + return error; + + return svc_send(rqstp); +} + +/* * Return (transport-specific) limit on the rpc payload. */ u32 svc_max_payload(const struct svc_rqst *rqstp) -- cgit v1.1 From 4d6bbb6233c9cf23822a2f66f8470c9f40854b77 Mon Sep 17 00:00:00 2001 From: Ricardo Labiaga Date: Wed, 1 Apr 2009 09:23:07 -0400 Subject: nfs41: Backchannel bc_svc_process() Implement the NFSv4.1 backchannel service. Invokes the common callback processing logic svc_process_common() to authenticate the call and dispatch the appropriate NFSv4.1 XDR decoder and operation procedure. It then invokes bc_send() to send the reply over the same connection. bc_send() is implemented in a separate patch. At this time there is no slot validation or reply cache handling. [nfs41: Preallocate rpc_rqst receive buffer for handling callbacks] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy [Move bc_svc_process() declaration to correct patch] Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/svc.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) (limited to 'net') diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index bfda66d..06b52e4 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -25,6 +25,7 @@ #include #include #include +#include #define RPCDBG_FACILITY RPCDBG_SVCDSP @@ -1239,6 +1240,54 @@ svc_process(struct svc_rqst *rqstp) return svc_send(rqstp); } +#if defined(CONFIG_NFS_V4_1) +/* + * Process a backchannel RPC request that arrived over an existing + * outbound connection + */ +int +bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, + struct svc_rqst *rqstp) +{ + struct kvec *argv = &rqstp->rq_arg.head[0]; + struct kvec *resv = &rqstp->rq_res.head[0]; + int error; + + /* Build the svc_rqst used by the common processing routine */ + rqstp->rq_xid = req->rq_xid; + rqstp->rq_prot = req->rq_xprt->prot; + rqstp->rq_server = serv; + + rqstp->rq_addrlen = sizeof(req->rq_xprt->addr); + memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen); + memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg)); + memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res)); + + /* reset result send buffer "put" position */ + resv->iov_len = 0; + + if (rqstp->rq_prot != IPPROTO_TCP) { + printk(KERN_ERR "No support for Non-TCP transports!\n"); + BUG(); + } + + /* + * Skip the next two words because they've already been + * processed in the trasport + */ + svc_getu32(argv); /* XID */ + svc_getnl(argv); /* CALLDIR */ + + error = svc_process_common(rqstp, argv, resv); + if (error <= 0) + return error; + + memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf)); + return bc_send(req); +} +EXPORT_SYMBOL(bc_svc_process); +#endif /* CONFIG_NFS_V4_1 */ + /* * Return (transport-specific) limit on the rpc payload. */ -- cgit v1.1 From 7652e5a09ba319241607b22d9055ce93fd5b8039 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 1 Apr 2009 09:23:09 -0400 Subject: nfs41: sunrpc: provide functions to create and destroy a svc_xprt for backchannel use For nfs41 callbacks we need an svc_xprt to process requests coming up the backchannel socket as rpc_rqst's that are transformed into svc_rqst's that need a rq_xprt to be processed. The svc_{udp,tcp}_create methods are too heavy for this job as svc_create_socket creates an actual socket to listen on while for nfs41 we're "reusing" the fore channel's socket. Signed-off-by: Benny Halevy --- net/sunrpc/svcsock.c | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) (limited to 'net') diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 9d50423..a2a03e5 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -1327,3 +1327,42 @@ static void svc_sock_free(struct svc_xprt *xprt) sock_release(svsk->sk_sock); kfree(svsk); } + +/* + * Create a svc_xprt. + * + * For internal use only (e.g. nfsv4.1 backchannel). + * Callers should typically use the xpo_create() method. + */ +struct svc_xprt *svc_sock_create(struct svc_serv *serv, int prot) +{ + struct svc_sock *svsk; + struct svc_xprt *xprt = NULL; + + dprintk("svc: %s\n", __func__); + svsk = kzalloc(sizeof(*svsk), GFP_KERNEL); + if (!svsk) + goto out; + + xprt = &svsk->sk_xprt; + if (prot == IPPROTO_TCP) + svc_xprt_init(&svc_tcp_class, xprt, serv); + else if (prot == IPPROTO_UDP) + svc_xprt_init(&svc_udp_class, xprt, serv); + else + BUG(); +out: + dprintk("svc: %s return %p\n", __func__, xprt); + return xprt; +} +EXPORT_SYMBOL_GPL(svc_sock_create); + +/* + * Destroy a svc_sock. + */ +void svc_sock_destroy(struct svc_xprt *xprt) +{ + if (xprt) + kfree(container_of(xprt, struct svc_sock, sk_xprt)); +} +EXPORT_SYMBOL_GPL(svc_sock_destroy); -- cgit v1.1 From 9c9f3f5fa62cc4959e4d4d1cf1ec74f2d6ac1197 Mon Sep 17 00:00:00 2001 From: Andy Adamson Date: Wed, 1 Apr 2009 09:23:10 -0400 Subject: nfs41: sunrpc: add a struct svc_xprt pointer to struct svc_serv for backchannel use This svc_xprt is passed on to the callback service thread to be later used to processes incoming svc_rqst's Signed-off-by: Benny Halevy --- net/sunrpc/svc.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'net') diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 06b52e4..b35048f 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -487,6 +487,10 @@ svc_destroy(struct svc_serv *serv) if (svc_serv_is_pooled(serv)) svc_pool_map_put(); +#if defined(CONFIG_NFS_V4_1) + svc_sock_destroy(serv->bc_xprt); +#endif /* CONFIG_NFS_V4_1 */ + svc_unregister(serv); kfree(serv->sv_pools); kfree(serv); -- cgit v1.1 From 8f975242352e92898dc641ebff0d24808f39848a Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 1 Apr 2009 09:23:11 -0400 Subject: nfs41: create a svc_xprt for nfs41 callback thread and use for incoming callbacks Signed-off-by: Benny Halevy --- net/sunrpc/svc.c | 1 + 1 file changed, 1 insertion(+) (limited to 'net') diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index b35048f..6b90ce4 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -1258,6 +1258,7 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, int error; /* Build the svc_rqst used by the common processing routine */ + rqstp->rq_xprt = serv->bc_xprt; rqstp->rq_xid = req->rq_xid; rqstp->rq_prot = req->rq_xprt->prot; rqstp->rq_server = serv; -- cgit v1.1 From 343952fa5aac888934ffc203abed26a823400eb6 Mon Sep 17 00:00:00 2001 From: Rahul Iyer Date: Wed, 1 Apr 2009 09:23:17 -0400 Subject: nfs41: Get the rpc_xprt * from the rpc_rqst instead of the rpc_clnt. Obtain the rpc_xprt from the rpc_rqst so that calls and callback replies can both use the same code path. A client needs the rpc_xprt in order to reply to a callback. Signed-off-by: Rahul Iyer Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/xprt.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 0eea2bf..c144611 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -195,8 +195,8 @@ EXPORT_SYMBOL_GPL(xprt_load_transport); */ int xprt_reserve_xprt(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) @@ -858,7 +858,7 @@ out_unlock: void xprt_end_transmit(struct rpc_task *task) { - xprt_release_write(task->tk_xprt, task); + xprt_release_write(task->tk_rqstp->rq_xprt, task); } /** -- cgit v1.1 From dd2b63d049480979016b959abc2d141cdddb1389 Mon Sep 17 00:00:00 2001 From: Ricardo Labiaga Date: Wed, 1 Apr 2009 09:23:28 -0400 Subject: nfs41: Rename rq_received to rq_reply_bytes_recvd The 'rq_received' member of 'struct rpc_rqst' is used to track when we have received a reply to our request. With v4.1, the backchannel can now accept callback requests over the existing connection. Rename this field to make it clear that it is only used for tracking reply bytes and not all bytes received on the connection. Signed-off-by: Ricardo Labiaga Signed-off-by: Benny Halevy --- net/sunrpc/backchannel_rqst.c | 2 +- net/sunrpc/clnt.c | 8 ++++---- net/sunrpc/stats.c | 2 +- net/sunrpc/xprt.c | 15 ++++++++------- 4 files changed, 14 insertions(+), 13 deletions(-) (limited to 'net') diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c index f56e18a..5a7d342 100644 --- a/net/sunrpc/backchannel_rqst.c +++ b/net/sunrpc/backchannel_rqst.c @@ -230,7 +230,7 @@ struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt) if (req != NULL) { set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); - req->rq_received = 0; + req->rq_reply_bytes_recvd = 0; req->rq_bytes_sent = 0; memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(req->rq_private_buf)); diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index f3e93b8..5bc2f45 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1258,8 +1258,8 @@ call_status(struct rpc_task *task) struct rpc_rqst *req = task->tk_rqstp; int status; - if (req->rq_received > 0 && !req->rq_bytes_sent) - task->tk_status = req->rq_received; + if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent) + task->tk_status = req->rq_reply_bytes_recvd; dprint_status(task); @@ -1376,7 +1376,7 @@ call_decode(struct rpc_task *task) /* * Ensure that we see all writes made by xprt_complete_rqst() - * before it changed req->rq_received. + * before it changed req->rq_reply_bytes_recvd. */ smp_rmb(); req->rq_rcv_buf.len = req->rq_private_buf.len; @@ -1417,7 +1417,7 @@ out_retry: task->tk_status = 0; /* Note: rpc_verify_header() may have freed the RPC slot */ if (task->tk_rqstp == req) { - req->rq_received = req->rq_rcv_buf.len = 0; + req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0; if (task->tk_client->cl_discrtry) xprt_conditional_disconnect(task->tk_xprt, req->rq_connect_cookie); diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c index 8487aa0..1b4e679 100644 --- a/net/sunrpc/stats.c +++ b/net/sunrpc/stats.c @@ -156,7 +156,7 @@ void rpc_count_iostats(struct rpc_task *task) op_metrics->om_timeouts += task->tk_timeouts; op_metrics->om_bytes_sent += task->tk_bytes_sent; - op_metrics->om_bytes_recv += req->rq_received; + op_metrics->om_bytes_recv += req->rq_reply_bytes_recvd; queue = (long)req->rq_xtime - task->tk_start; if (queue < 0) diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index c144611..f412a85 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -806,9 +806,10 @@ void xprt_complete_rqst(struct rpc_task *task, int copied) list_del_init(&req->rq_list); req->rq_private_buf.len = copied; - /* Ensure all writes are done before we update req->rq_received */ + /* Ensure all writes are done before we update */ + /* req->rq_reply_bytes_recvd */ smp_wmb(); - req->rq_received = copied; + req->rq_reply_bytes_recvd = copied; rpc_wake_up_queued_task(&xprt->pending, task); } EXPORT_SYMBOL_GPL(xprt_complete_rqst); @@ -823,7 +824,7 @@ static void xprt_timer(struct rpc_task *task) dprintk("RPC: %5u xprt_timer\n", task->tk_pid); spin_lock_bh(&xprt->transport_lock); - if (!req->rq_received) { + if (!req->rq_reply_bytes_recvd) { if (xprt->ops->timer) xprt->ops->timer(task); } else @@ -845,8 +846,8 @@ int xprt_prepare_transmit(struct rpc_task *task) dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); spin_lock_bh(&xprt->transport_lock); - if (req->rq_received && !req->rq_bytes_sent) { - err = req->rq_received; + if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) { + err = req->rq_reply_bytes_recvd; goto out_unlock; } if (!xprt->ops->reserve_xprt(task)) @@ -875,7 +876,7 @@ void xprt_transmit(struct rpc_task *task) dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); - if (!req->rq_received) { + if (!req->rq_reply_bytes_recvd) { if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { /* * Add to the list only if we're expecting a reply @@ -914,7 +915,7 @@ void xprt_transmit(struct rpc_task *task) /* Don't race with disconnect */ if (!xprt_connected(xprt)) task->tk_status = -ENOTCONN; - else if (!req->rq_received && rpc_reply_expected(task)) { + else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) { /* * Sleep on the pending queue since * we're expecting a reply. -- cgit v1.1