From 66d7a56a6254389587d0999dcaab1d2634cd4e24 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 5 Oct 2015 11:19:20 -0400 Subject: SUNRPC: Refactor TCP receive Move the TCP data receive loop out of xs_tcp_data_ready(). Doing so will allow us to move the data receive out of the softirq context in a set of followup patches. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 44 +++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 15 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 1a85e0e..fa8d0c1 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1391,6 +1391,30 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns return len - desc.count; } +static void xs_tcp_data_receive(struct sock_xprt *transport) +{ + struct rpc_xprt *xprt = &transport->xprt; + struct sock *sk; + read_descriptor_t rd_desc = { + .count = 2*1024*1024, + .arg.data = xprt, + }; + unsigned long total = 0; + int read = 0; + + sk = transport->inet; + + /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ + for (;;) { + read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); + if (read <= 0) + break; + total += read; + rd_desc.count = 65536; + } + trace_xs_tcp_data_ready(xprt, read, total); +} + /** * xs_tcp_data_ready - "data ready" callback for TCP sockets * @sk: socket with data to read @@ -1398,34 +1422,24 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns */ static void xs_tcp_data_ready(struct sock *sk) { + struct sock_xprt *transport; struct rpc_xprt *xprt; - read_descriptor_t rd_desc; - int read; - unsigned long total = 0; dprintk("RPC: xs_tcp_data_ready...\n"); read_lock_bh(&sk->sk_callback_lock); - if (!(xprt = xprt_from_sock(sk))) { - read = 0; + if (!(xprt = xprt_from_sock(sk))) goto out; - } + transport = container_of(xprt, struct sock_xprt, xprt); + /* Any data means we had a useful conversation, so * the we don't need to delay the next reconnect */ if (xprt->reestablish_timeout) xprt->reestablish_timeout = 0; - /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ - rd_desc.arg.data = xprt; - do { - rd_desc.count = 65536; - read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - if (read > 0) - total += read; - } while (read > 0); + xs_tcp_data_receive(transport); out: - trace_xs_tcp_data_ready(xprt, read, total); read_unlock_bh(&sk->sk_callback_lock); } -- cgit v1.1 From edc1b01cd3b20a5fff049e98f82a2b0d24a34c89 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 5 Oct 2015 10:53:49 -0400 Subject: SUNRPC: Move TCP receive data path into a workqueue context Stream protocols such as TCP can often build up a backlog of data to be read due to ordering. Combine this with the fact that some workloads such as NFS read()-intensive workloads need to receive a lot of data per RPC call, and it turns out that receiving the data from inside a softirq context can cause starvation. The following patch moves the TCP data receive into a workqueue context. We still end up calling tcp_read_sock(), but we do so from a process context, meaning that softirqs are enabled for most of the time. With this patch, I see a doubling of read bandwidth when running a multi-threaded iozone workload between a virtual client and server setup. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 51 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 15 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index fa8d0c1..58dc90c 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -823,6 +823,7 @@ static void xs_reset_transport(struct sock_xprt *transport) kernel_sock_shutdown(sock, SHUT_RDWR); + mutex_lock(&transport->recv_mutex); write_lock_bh(&sk->sk_callback_lock); transport->inet = NULL; transport->sock = NULL; @@ -833,6 +834,7 @@ static void xs_reset_transport(struct sock_xprt *transport) xprt_clear_connected(xprt); write_unlock_bh(&sk->sk_callback_lock); xs_sock_reset_connection_flags(xprt); + mutex_unlock(&transport->recv_mutex); trace_rpc_socket_close(xprt, sock); sock_release(sock); @@ -886,6 +888,7 @@ static void xs_destroy(struct rpc_xprt *xprt) cancel_delayed_work_sync(&transport->connect_worker); xs_close(xprt); + cancel_work_sync(&transport->recv_worker); xs_xprt_free(xprt); module_put(THIS_MODULE); } @@ -1243,12 +1246,12 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); req = xprt_lookup_rqst(xprt, transport->tcp_xid); if (!req) { dprintk("RPC: XID %08x request not found!\n", ntohl(transport->tcp_xid)); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return -1; } @@ -1257,7 +1260,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_rqst(req->rq_task, transport->tcp_copied); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return 0; } @@ -1277,10 +1280,10 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, struct rpc_rqst *req; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); req = xprt_lookup_bc_request(xprt, transport->tcp_xid); if (req == NULL) { - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); printk(KERN_WARNING "Callback slot table overflowed\n"); xprt_force_disconnect(xprt); return -1; @@ -1291,7 +1294,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_bc_request(req, transport->tcp_copied); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return 0; } @@ -1402,19 +1405,33 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) unsigned long total = 0; int read = 0; + mutex_lock(&transport->recv_mutex); sk = transport->inet; + if (sk == NULL) + goto out; /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ for (;;) { + lock_sock(sk); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); + release_sock(sk); if (read <= 0) break; total += read; rd_desc.count = 65536; } +out: + mutex_unlock(&transport->recv_mutex); trace_xs_tcp_data_ready(xprt, read, total); } +static void xs_tcp_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_tcp_data_receive(transport); +} + /** * xs_tcp_data_ready - "data ready" callback for TCP sockets * @sk: socket with data to read @@ -1437,8 +1454,8 @@ static void xs_tcp_data_ready(struct sock *sk) */ if (xprt->reestablish_timeout) xprt->reestablish_timeout = 0; + queue_work(rpciod_workqueue, &transport->recv_worker); - xs_tcp_data_receive(transport); out: read_unlock_bh(&sk->sk_callback_lock); } @@ -1840,6 +1857,10 @@ static inline void xs_reclassify_socket(int family, struct socket *sock) } #endif +static void xs_dummy_data_receive_workfn(struct work_struct *work) +{ +} + static void xs_dummy_setup_socket(struct work_struct *work) { } @@ -2664,6 +2685,7 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, } new = container_of(xprt, struct sock_xprt, xprt); + mutex_init(&new->recv_mutex); memcpy(&xprt->addr, args->dstaddr, args->addrlen); xprt->addrlen = args->addrlen; if (args->srcaddr) @@ -2717,6 +2739,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; + INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); @@ -2788,21 +2811,20 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) xprt->timeout = &xs_udp_default_timeout; + INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); + switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_udp_setup_socket); xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_udp_setup_socket); xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); break; default: @@ -2867,21 +2889,20 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) xprt->ops = &xs_tcp_ops; xprt->timeout = &xs_tcp_default_timeout; + INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); + switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_tcp_setup_socket); xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_tcp_setup_socket); xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); break; default: -- cgit v1.1 From f9b2ee714c5c945cda27e9cbca5f60d5199fb93f Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 6 Oct 2015 16:26:05 -0400 Subject: SUNRPC: Move UDP receive data path into a workqueue context Now that we've done it for TCP, let's convert UDP as well. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 84 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 23 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 58dc90c..df8bdcc 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -972,42 +972,36 @@ static void xs_local_data_ready(struct sock *sk) } /** - * xs_udp_data_ready - "data ready" callback for UDP sockets - * @sk: socket with data to read + * xs_udp_data_read_skb - receive callback for UDP sockets + * @xprt: transport + * @sk: socket + * @skb: skbuff * */ -static void xs_udp_data_ready(struct sock *sk) +static void xs_udp_data_read_skb(struct rpc_xprt *xprt, + struct sock *sk, + struct sk_buff *skb) { struct rpc_task *task; - struct rpc_xprt *xprt; struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; + int repsize, copied; u32 _xid; __be32 *xp; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: xs_udp_data_ready...\n"); - if (!(xprt = xprt_from_sock(sk))) - goto out; - - if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) - goto out; - repsize = skb->len - sizeof(struct udphdr); if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d!\n", repsize); - goto dropit; + return; } /* Copy the XID from the skb... */ xp = skb_header_pointer(skb, sizeof(struct udphdr), sizeof(_xid), &_xid); if (xp == NULL) - goto dropit; + return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; @@ -1028,10 +1022,54 @@ static void xs_udp_data_ready(struct sock *sk) xprt_complete_rqst(task, copied); out_unlock: - spin_unlock(&xprt->transport_lock); - dropit: - skb_free_datagram(sk, skb); - out: + spin_unlock_bh(&xprt->transport_lock); +} + +static void xs_udp_data_receive(struct sock_xprt *transport) +{ + struct sk_buff *skb; + struct sock *sk; + int err; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + for (;;) { + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + break; + xs_udp_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + } +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_udp_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_udp_data_receive(transport); +} + +/** + * xs_data_ready - "data ready" callback for UDP sockets + * @sk: socket with data to read + * + */ +static void xs_data_ready(struct sock *sk) +{ + struct rpc_xprt *xprt; + + read_lock_bh(&sk->sk_callback_lock); + dprintk("RPC: xs_data_ready...\n"); + xprt = xprt_from_sock(sk); + if (xprt != NULL) { + struct sock_xprt *transport = container_of(xprt, + struct sock_xprt, xprt); + queue_work(rpciod_workqueue, &transport->recv_worker); + } read_unlock_bh(&sk->sk_callback_lock); } @@ -2094,7 +2132,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_udp_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; sk->sk_allocation = GFP_NOIO; @@ -2811,7 +2849,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) xprt->timeout = &xs_udp_default_timeout; - INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); + INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); switch (addr->sa_family) { -- cgit v1.1 From a26480942c99d84a7682ea4c00f47a3a42ed41d2 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 6 Oct 2015 17:03:00 -0400 Subject: SUNRPC: Move AF_LOCAL receive data path into a workqueue context Now that we've done it for TCP and UDP, let's convert AF_LOCAL as well. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 72 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 42 insertions(+), 30 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index df8bdcc..1471ecc 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -909,44 +909,36 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) } /** - * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets - * @sk: socket with data to read + * xs_local_data_read_skb + * @xprt: transport + * @sk: socket + * @skb: skbuff * * Currently this assumes we can read the whole reply in a single gulp. */ -static void xs_local_data_ready(struct sock *sk) +static void xs_local_data_read_skb(struct rpc_xprt *xprt, + struct sock *sk, + struct sk_buff *skb) { struct rpc_task *task; - struct rpc_xprt *xprt; struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; + int repsize, copied; u32 _xid; __be32 *xp; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: %s...\n", __func__); - xprt = xprt_from_sock(sk); - if (xprt == NULL) - goto out; - - skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) - goto out; - repsize = skb->len - sizeof(rpc_fraghdr); if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d\n", repsize); - goto dropit; + return; } /* Copy the XID from the skb... */ xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); if (xp == NULL) - goto dropit; + return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; @@ -964,11 +956,35 @@ static void xs_local_data_ready(struct sock *sk) xprt_complete_rqst(task, copied); out_unlock: - spin_unlock(&xprt->transport_lock); - dropit: - skb_free_datagram(sk, skb); - out: - read_unlock_bh(&sk->sk_callback_lock); + spin_unlock_bh(&xprt->transport_lock); +} + +static void xs_local_data_receive(struct sock_xprt *transport) +{ + struct sk_buff *skb; + struct sock *sk; + int err; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + for (;;) { + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + break; + xs_local_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + } +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_local_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_local_data_receive(transport); } /** @@ -1895,10 +1911,6 @@ static inline void xs_reclassify_socket(int family, struct socket *sock) } #endif -static void xs_dummy_data_receive_workfn(struct work_struct *work) -{ -} - static void xs_dummy_setup_socket(struct work_struct *work) { } @@ -1946,7 +1958,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_local_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; sk->sk_error_report = xs_error_report; sk->sk_allocation = GFP_NOIO; @@ -2777,7 +2789,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; - INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); + INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); -- cgit v1.1 From 31303d6cbb24ba94e8b82170213bd2fde6365d9a Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 6 Oct 2015 15:59:20 -0400 Subject: SUNRPC: Use MSG_SENDPAGE_NOTLAST in xs_send_pagedata() If we're sending more than one page via kernel_sendpage(), then set MSG_SENDPAGE_NOTLAST between the pages so that we don't send suboptimal frames (see commit 2f5338442425 and commit 35f9c09fe9c7). Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 1471ecc..e71aff2 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -360,8 +360,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i int flags = XS_SENDMSG_FLAGS; remainder -= len; - if (remainder != 0 || more) + if (more) flags |= MSG_MORE; + if (remainder != 0) + flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE; err = do_sendpage(sock, *ppage, base, len, flags); if (remainder == 0 || err != len) break; -- cgit v1.1 From 8610586d820aa75dc7da470d77fc23492c2492f8 Mon Sep 17 00:00:00 2001 From: Steve Wise Date: Mon, 21 Sep 2015 12:24:34 -0500 Subject: xprtrdma: don't log warnings for flushed completions Unsignaled send WRs can get flushed as part of normal unmount, so don't log them as warnings. Signed-off-by: Steve Wise Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/frwr_ops.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 5318951..0a36239 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -252,8 +252,11 @@ frwr_sendcompletion(struct ib_wc *wc) /* WARNING: Only wr_id and status are reliable at this point */ r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - pr_warn("RPC: %s: frmr %p flushed, status %s (%d)\n", - __func__, r, ib_wc_status_msg(wc->status), wc->status); + if (wc->status == IB_WC_WR_FLUSH_ERR) + dprintk("RPC: %s: frmr %p flushed\n", __func__, r); + else + pr_warn("RPC: %s: frmr %p error, status %s (%d)\n", + __func__, r, ib_wc_status_msg(wc->status), wc->status); r->r.frmr.fr_state = FRMR_IS_STALE; } -- cgit v1.1 From a045178887ebafa9514d6b4cb840ac13a26c8365 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:26:29 -0400 Subject: xprtrdma: Enable swap-on-NFS/RDMA After adding a swapfile on an NFS/RDMA mount and removing the normal swap partition, I was able to push the NFS client well into swap without any issue. I forgot to swapoff the NFS file before rebooting. This pinned the NFS mount and the IB core and provider, causing shutdown to hang. I think this is expected and safe behavior. Probably shutdown scripts should "swapoff -a" before unmounting any filesystems. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 41e452b..e9e5ed7 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) static int xprt_rdma_enable_swap(struct rpc_xprt *xprt) { - return -EINVAL; + return 0; } static void -- cgit v1.1 From 7b3d770c67bc07db5035999e4f864c5f2ff7b10e Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:26:37 -0400 Subject: xprtrdma: Re-arm after missed events ib_req_notify_cq(IB_CQ_REPORT_MISSED_EVENTS) returns a positive value if WCs were added to a CQ after the last completion upcall but before the CQ has been re-armed. Commit 7f23f6f6e388 ("xprtrmda: Reduce lock contention in completion handlers") assumed that when ib_req_notify_cq() returned a positive RC, the CQ had also been successfully re-armed, making it safe to return control to the provider without losing any completion signals. That is an invalid assumption. Change both completion handlers to continue polling while ib_req_notify_cq() returns a positive value. Fixes: 7f23f6f6e388 ("xprtrmda: Reduce lock contention in ...") Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Reviewed-by: Devesh Sharma Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 66 +++++++-------------------------------------- 1 file changed, 10 insertions(+), 56 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 5502d4d..61eea73 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -179,38 +179,17 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) return 0; } -/* - * Handle send, fast_reg_mr, and local_inv completions. - * - * Send events are typically suppressed and thus do not result - * in an upcall. Occasionally one is signaled, however. This - * prevents the provider's completion queue from wrapping and - * losing a completion. +/* Handle provider send completion upcalls. */ static void rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) { struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; - int rc; - - rc = rpcrdma_sendcq_poll(cq, ep); - if (rc) { - dprintk("RPC: %s: ib_poll_cq failed: %i\n", - __func__, rc); - return; - } - rc = ib_req_notify_cq(cq, - IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); - if (rc == 0) - return; - if (rc < 0) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - return; - } - - rpcrdma_sendcq_poll(cq, ep); + do { + rpcrdma_sendcq_poll(cq, ep); + } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | + IB_CQ_REPORT_MISSED_EVENTS) > 0); } static void @@ -274,42 +253,17 @@ out_schedule: return rc; } -/* - * Handle receive completions. - * - * It is reentrant but processes single events in order to maintain - * ordering of receives to keep server credits. - * - * It is the responsibility of the scheduled tasklet to return - * recv buffers to the pool. NOTE: this affects synchronization of - * connection shutdown. That is, the structures required for - * the completion of the reply handler must remain intact until - * all memory has been reclaimed. +/* Handle provider receive completion upcalls. */ static void rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) { struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; - int rc; - - rc = rpcrdma_recvcq_poll(cq, ep); - if (rc) { - dprintk("RPC: %s: ib_poll_cq failed: %i\n", - __func__, rc); - return; - } - rc = ib_req_notify_cq(cq, - IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); - if (rc == 0) - return; - if (rc < 0) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - return; - } - - rpcrdma_recvcq_poll(cq, ep); + do { + rpcrdma_recvcq_poll(cq, ep); + } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | + IB_CQ_REPORT_MISSED_EVENTS) > 0); } static void -- cgit v1.1 From 4220a07264c0517006a534aed201e29c8d297306 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:26:45 -0400 Subject: xprtrdma: Prevent loss of completion signals Commit 8301a2c047cc ("xprtrdma: Limit work done by completion handler") was supposed to prevent xprtrdma's upcall handlers from starving other softIRQ work by letting them return to the provider before all CQEs have been polled. The logic assumes the provider will call the upcall handler again immediately if the CQ is re-armed while there are still queued CQEs. This assumption is invalid. The IBTA spec says that after a CQ is armed, the hardware must interrupt only when a new CQE is inserted. xprtrdma can't rely on the provider calling again, even though some providers do. Therefore, leaving CQEs on queue makes sense only when there is another mechanism that ensures all remaining CQEs are consumed in a timely fashion. xprtrdma does not have such a mechanism. If a CQE remains queued, the transport can wait forever to send the next RPC. Finally, move the wcs array back onto the stack to ensure that the poll array is always local to the CPU where the completion upcall is running. Fixes: 8301a2c047cc ("xprtrdma: Limit work done by completion ...") Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Reviewed-by: Devesh Sharma Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 74 +++++++++++++++++++++-------------------- net/sunrpc/xprtrdma/xprt_rdma.h | 5 --- 2 files changed, 38 insertions(+), 41 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 61eea73..6661b1b 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -158,25 +158,30 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc) } } -static int -rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) +/* The common case is a single send completion is waiting. By + * passing two WC entries to ib_poll_cq, a return code of 1 + * means there is exactly one WC waiting and no more. We don't + * have to invoke ib_poll_cq again to know that the CQ has been + * properly drained. + */ +static void +rpcrdma_sendcq_poll(struct ib_cq *cq) { - struct ib_wc *wcs; - int budget, count, rc; + struct ib_wc *pos, wcs[2]; + int count, rc; - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; do { - wcs = ep->rep_send_wcs; + pos = wcs; - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); - if (rc <= 0) - return rc; + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); + if (rc < 0) + break; count = rc; while (count-- > 0) - rpcrdma_sendcq_process_wc(wcs++); - } while (rc == RPCRDMA_POLLSIZE && --budget); - return 0; + rpcrdma_sendcq_process_wc(pos++); + } while (rc == ARRAY_SIZE(wcs)); + return; } /* Handle provider send completion upcalls. @@ -184,10 +189,8 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) static void rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) { - struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; - do { - rpcrdma_sendcq_poll(cq, ep); + rpcrdma_sendcq_poll(cq); } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS) > 0); } @@ -226,31 +229,32 @@ out_fail: goto out_schedule; } -static int -rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) +/* The wc array is on stack: automatic memory is always CPU-local. + * + * struct ib_wc is 64 bytes, making the poll array potentially + * large. But this is at the bottom of the call chain. Further + * substantial work is done in another thread. + */ +static void +rpcrdma_recvcq_poll(struct ib_cq *cq) { - struct list_head sched_list; - struct ib_wc *wcs; - int budget, count, rc; + struct ib_wc *pos, wcs[4]; + LIST_HEAD(sched_list); + int count, rc; - INIT_LIST_HEAD(&sched_list); - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; do { - wcs = ep->rep_recv_wcs; + pos = wcs; - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); - if (rc <= 0) - goto out_schedule; + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); + if (rc < 0) + break; count = rc; while (count-- > 0) - rpcrdma_recvcq_process_wc(wcs++, &sched_list); - } while (rc == RPCRDMA_POLLSIZE && --budget); - rc = 0; + rpcrdma_recvcq_process_wc(pos++, &sched_list); + } while (rc == ARRAY_SIZE(wcs)); -out_schedule: rpcrdma_schedule_tasklet(&sched_list); - return rc; } /* Handle provider receive completion upcalls. @@ -258,10 +262,8 @@ out_schedule: static void rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) { - struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; - do { - rpcrdma_recvcq_poll(cq, ep); + rpcrdma_recvcq_poll(cq); } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS) > 0); } @@ -623,7 +625,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, - rpcrdma_cq_async_error_upcall, ep, &cq_attr); + rpcrdma_cq_async_error_upcall, NULL, &cq_attr); if (IS_ERR(sendcq)) { rc = PTR_ERR(sendcq); dprintk("RPC: %s: failed to create send CQ: %i\n", @@ -640,7 +642,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, - rpcrdma_cq_async_error_upcall, ep, &cq_attr); + rpcrdma_cq_async_error_upcall, NULL, &cq_attr); if (IS_ERR(recvcq)) { rc = PTR_ERR(recvcq); dprintk("RPC: %s: failed to create recv CQ: %i\n", diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index c09414e..42c8d44 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -77,9 +77,6 @@ struct rpcrdma_ia { * RDMA Endpoint -- one per transport instance */ -#define RPCRDMA_WC_BUDGET (128) -#define RPCRDMA_POLLSIZE (16) - struct rpcrdma_ep { atomic_t rep_cqcount; int rep_cqinit; @@ -89,8 +86,6 @@ struct rpcrdma_ep { struct rdma_conn_param rep_remote_cma; struct sockaddr_storage rep_remote_addr; struct delayed_work rep_connect_worker; - struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE]; - struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE]; }; /* -- cgit v1.1 From b0e178a2d8ad4bd6c6bbf5d3f3cf50ca8907581b Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:26:54 -0400 Subject: xprtrdma: Refactor reply handler error handling Clean up: The error cases in rpcrdma_reply_handler() almost never execute. Ensure the compiler places them out of the hot path. No behavior change expected. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Reviewed-by: Devesh Sharma Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 89 +++++++++++++++++++++++------------------ net/sunrpc/xprtrdma/verbs.c | 2 +- net/sunrpc/xprtrdma/xprt_rdma.h | 2 + 3 files changed, 53 insertions(+), 40 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index bc8bd65..60ffa63 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -741,52 +741,27 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) unsigned long cwnd; u32 credits; - /* Check status. If bad, signal disconnect and return rep to pool */ - if (rep->rr_len == ~0U) { - rpcrdma_recv_buffer_put(rep); - if (r_xprt->rx_ep.rep_connected == 1) { - r_xprt->rx_ep.rep_connected = -EIO; - rpcrdma_conn_func(&r_xprt->rx_ep); - } - return; - } - if (rep->rr_len < RPCRDMA_HDRLEN_MIN) { - dprintk("RPC: %s: short/invalid reply\n", __func__); - goto repost; - } + dprintk("RPC: %s: incoming rep %p\n", __func__, rep); + + if (rep->rr_len == RPCRDMA_BAD_LEN) + goto out_badstatus; + if (rep->rr_len < RPCRDMA_HDRLEN_MIN) + goto out_shortreply; + headerp = rdmab_to_msg(rep->rr_rdmabuf); - if (headerp->rm_vers != rpcrdma_version) { - dprintk("RPC: %s: invalid version %d\n", - __func__, be32_to_cpu(headerp->rm_vers)); - goto repost; - } + if (headerp->rm_vers != rpcrdma_version) + goto out_badversion; /* Get XID and try for a match. */ spin_lock(&xprt->transport_lock); rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); - if (rqst == NULL) { - spin_unlock(&xprt->transport_lock); - dprintk("RPC: %s: reply 0x%p failed " - "to match any request xid 0x%08x len %d\n", - __func__, rep, be32_to_cpu(headerp->rm_xid), - rep->rr_len); -repost: - r_xprt->rx_stats.bad_reply_count++; - if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) - rpcrdma_recv_buffer_put(rep); - - return; - } + if (!rqst) + goto out_nomatch; /* get request object */ req = rpcr_to_rdmar(rqst); - if (req->rl_reply) { - spin_unlock(&xprt->transport_lock); - dprintk("RPC: %s: duplicate reply 0x%p to RPC " - "request 0x%p: xid 0x%08x\n", __func__, rep, req, - be32_to_cpu(headerp->rm_xid)); - goto repost; - } + if (req->rl_reply) + goto out_duplicate; dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" " RPC request 0x%p xid 0x%08x\n", @@ -883,8 +858,44 @@ badheader: if (xprt->cwnd > cwnd) xprt_release_rqst_cong(rqst->rq_task); + xprt_complete_rqst(rqst->rq_task, status); + spin_unlock(&xprt->transport_lock); dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", __func__, xprt, rqst, status); - xprt_complete_rqst(rqst->rq_task, status); + return; + +out_badstatus: + rpcrdma_recv_buffer_put(rep); + if (r_xprt->rx_ep.rep_connected == 1) { + r_xprt->rx_ep.rep_connected = -EIO; + rpcrdma_conn_func(&r_xprt->rx_ep); + } + return; + +out_shortreply: + dprintk("RPC: %s: short/invalid reply\n", __func__); + goto repost; + +out_badversion: + dprintk("RPC: %s: invalid version %d\n", + __func__, be32_to_cpu(headerp->rm_vers)); + goto repost; + +out_nomatch: + spin_unlock(&xprt->transport_lock); + dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n", + __func__, be32_to_cpu(headerp->rm_xid), + rep->rr_len); + goto repost; + +out_duplicate: spin_unlock(&xprt->transport_lock); + dprintk("RPC: %s: " + "duplicate reply %p to RPC request %p: xid 0x%08x\n", + __func__, rep, req, be32_to_cpu(headerp->rm_xid)); + +repost: + r_xprt->rx_stats.bad_reply_count++; + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) + rpcrdma_recv_buffer_put(rep); } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 6661b1b..a60b4c4 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -225,7 +225,7 @@ out_fail: if (wc->status != IB_WC_WR_FLUSH_ERR) pr_err("RPC: %s: rep %p: %s\n", __func__, rep, ib_wc_status_msg(wc->status)); - rep->rr_len = ~0U; + rep->rr_len = RPCRDMA_BAD_LEN; goto out_schedule; } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 42c8d44..a13508b 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -168,6 +168,8 @@ struct rpcrdma_rep { struct rpcrdma_regbuf *rr_rdmabuf; }; +#define RPCRDMA_BAD_LEN (~0U) + /* * struct rpcrdma_mw - external memory region metadata * -- cgit v1.1 From 1e465fd4ff475cc29c866ee75496c941b3908e69 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:02 -0400 Subject: xprtrdma: Replace send and receive arrays The rb_send_bufs and rb_recv_bufs arrays are used to implement a pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep structs. Replace those arrays with free lists. To allow more than 512 RPCs in-flight at once, each of these arrays would be larger than a page (assuming 8-byte addresses and 4KB pages). Allowing up to 64K in-flight RPCs (as TCP now does), each buffer array would have to be 128 pages. That's an order-6 allocation. (Not that we're going there.) A list is easier to expand dynamically. Instead of allocating a larger array of pointers and copying the existing pointers to the new array, simply append more buffers to each list. This also makes it simpler to manage receive buffers that might catch backwards-direction calls, or to post receive buffers in bulk to amortize the overhead of ib_post_recv. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Reviewed-by: Devesh Sharma Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 155 ++++++++++++++++++---------------------- net/sunrpc/xprtrdma/xprt_rdma.h | 9 ++- 2 files changed, 73 insertions(+), 91 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index a60b4c4..c09f1b6 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -926,44 +926,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; - char *p; - size_t len; int i, rc; - buf->rb_max_requests = cdata->max_requests; + buf->rb_max_requests = r_xprt->rx_data.max_requests; spin_lock_init(&buf->rb_lock); - /* Need to allocate: - * 1. arrays for send and recv pointers - * 2. arrays of struct rpcrdma_req to fill in pointers - * 3. array of struct rpcrdma_rep for replies - * Send/recv buffers in req/rep need to be registered - */ - len = buf->rb_max_requests * - (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *)); - - p = kzalloc(len, GFP_KERNEL); - if (p == NULL) { - dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n", - __func__, len); - rc = -ENOMEM; - goto out; - } - buf->rb_pool = p; /* for freeing it later */ - - buf->rb_send_bufs = (struct rpcrdma_req **) p; - p = (char *) &buf->rb_send_bufs[buf->rb_max_requests]; - buf->rb_recv_bufs = (struct rpcrdma_rep **) p; - p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests]; - rc = ia->ri_ops->ro_init(r_xprt); if (rc) goto out; + INIT_LIST_HEAD(&buf->rb_send_bufs); for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_req *req; - struct rpcrdma_rep *rep; req = rpcrdma_create_req(r_xprt); if (IS_ERR(req)) { @@ -972,7 +946,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) rc = PTR_ERR(req); goto out; } - buf->rb_send_bufs[i] = req; + list_add(&req->rl_free, &buf->rb_send_bufs); + } + + INIT_LIST_HEAD(&buf->rb_recv_bufs); + for (i = 0; i < buf->rb_max_requests + 2; i++) { + struct rpcrdma_rep *rep; rep = rpcrdma_create_rep(r_xprt); if (IS_ERR(rep)) { @@ -981,7 +960,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) rc = PTR_ERR(rep); goto out; } - buf->rb_recv_bufs[i] = rep; + list_add(&rep->rr_list, &buf->rb_recv_bufs); } return 0; @@ -990,6 +969,28 @@ out: return rc; } +static struct rpcrdma_req * +rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_req *req; + + req = list_first_entry(&buf->rb_send_bufs, + struct rpcrdma_req, rl_free); + list_del(&req->rl_free); + return req; +} + +static struct rpcrdma_rep * +rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_rep *rep; + + rep = list_first_entry(&buf->rb_recv_bufs, + struct rpcrdma_rep, rr_list); + list_del(&rep->rr_list); + return rep; +} + static void rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) { @@ -1015,25 +1016,22 @@ void rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_ia *ia = rdmab_to_ia(buf); - int i; - /* clean up in reverse order from create - * 1. recv mr memory (mr free, then kfree) - * 2. send mr memory (mr free, then kfree) - * 3. MWs - */ - dprintk("RPC: %s: entering\n", __func__); + while (!list_empty(&buf->rb_recv_bufs)) { + struct rpcrdma_rep *rep; - for (i = 0; i < buf->rb_max_requests; i++) { - if (buf->rb_recv_bufs) - rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]); - if (buf->rb_send_bufs) - rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]); + rep = rpcrdma_buffer_get_rep_locked(buf); + rpcrdma_destroy_rep(ia, rep); } - ia->ri_ops->ro_destroy(buf); + while (!list_empty(&buf->rb_send_bufs)) { + struct rpcrdma_req *req; - kfree(buf->rb_pool); + req = rpcrdma_buffer_get_req_locked(buf); + rpcrdma_destroy_req(ia, req); + } + + ia->ri_ops->ro_destroy(buf); } struct rpcrdma_mw * @@ -1065,25 +1063,10 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) spin_unlock(&buf->rb_mwlock); } -static void -rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) -{ - buf->rb_send_bufs[--buf->rb_send_index] = req; - req->rl_niovs = 0; - if (req->rl_reply) { - buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply; - req->rl_reply = NULL; - } -} - /* * Get a set of request/reply buffers. * - * Reply buffer (if needed) is attached to send buffer upon return. - * Rule: - * rb_send_index and rb_recv_index MUST always be pointing to the - * *next* available buffer (non-NULL). They are incremented after - * removing buffers, and decremented *before* returning them. + * Reply buffer (if available) is attached to send buffer upon return. */ struct rpcrdma_req * rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) @@ -1092,26 +1075,23 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) unsigned long flags; spin_lock_irqsave(&buffers->rb_lock, flags); + if (list_empty(&buffers->rb_send_bufs)) + goto out_reqbuf; + req = rpcrdma_buffer_get_req_locked(buffers); + if (list_empty(&buffers->rb_recv_bufs)) + goto out_repbuf; + req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); + spin_unlock_irqrestore(&buffers->rb_lock, flags); + return req; - if (buffers->rb_send_index == buffers->rb_max_requests) { - spin_unlock_irqrestore(&buffers->rb_lock, flags); - dprintk("RPC: %s: out of request buffers\n", __func__); - return ((struct rpcrdma_req *)NULL); - } - - req = buffers->rb_send_bufs[buffers->rb_send_index]; - if (buffers->rb_send_index < buffers->rb_recv_index) { - dprintk("RPC: %s: %d extra receives outstanding (ok)\n", - __func__, - buffers->rb_recv_index - buffers->rb_send_index); - req->rl_reply = NULL; - } else { - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; - } - buffers->rb_send_bufs[buffers->rb_send_index++] = NULL; - +out_reqbuf: + spin_unlock_irqrestore(&buffers->rb_lock, flags); + pr_warn("RPC: %s: out of request buffers\n", __func__); + return NULL; +out_repbuf: spin_unlock_irqrestore(&buffers->rb_lock, flags); + pr_warn("RPC: %s: out of reply buffers\n", __func__); + req->rl_reply = NULL; return req; } @@ -1123,17 +1103,22 @@ void rpcrdma_buffer_put(struct rpcrdma_req *req) { struct rpcrdma_buffer *buffers = req->rl_buffer; + struct rpcrdma_rep *rep = req->rl_reply; unsigned long flags; + req->rl_niovs = 0; + req->rl_reply = NULL; + spin_lock_irqsave(&buffers->rb_lock, flags); - rpcrdma_buffer_put_sendbuf(req, buffers); + list_add_tail(&req->rl_free, &buffers->rb_send_bufs); + if (rep) + list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); spin_unlock_irqrestore(&buffers->rb_lock, flags); } /* * Recover reply buffers from pool. - * This happens when recovering from error conditions. - * Post-increment counter/array index. + * This happens when recovering from disconnect. */ void rpcrdma_recv_buffer_get(struct rpcrdma_req *req) @@ -1142,10 +1127,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req) unsigned long flags; spin_lock_irqsave(&buffers->rb_lock, flags); - if (buffers->rb_recv_index < buffers->rb_max_requests) { - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; - } + if (!list_empty(&buffers->rb_recv_bufs)) + req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); spin_unlock_irqrestore(&buffers->rb_lock, flags); } @@ -1160,7 +1143,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) unsigned long flags; spin_lock_irqsave(&buffers->rb_lock, flags); - buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep; + list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); spin_unlock_irqrestore(&buffers->rb_lock, flags); } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index a13508b..e6a358f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -252,6 +252,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */ #define RPCRDMA_MAX_IOVS (2) struct rpcrdma_req { + struct list_head rl_free; unsigned int rl_niovs; unsigned int rl_nchunks; unsigned int rl_connect_cookie; @@ -285,12 +286,10 @@ struct rpcrdma_buffer { struct list_head rb_all; char *rb_pool; - spinlock_t rb_lock; /* protect buf arrays */ + spinlock_t rb_lock; /* protect buf lists */ + struct list_head rb_send_bufs; + struct list_head rb_recv_bufs; u32 rb_max_requests; - int rb_send_index; - int rb_recv_index; - struct rpcrdma_req **rb_send_bufs; - struct rpcrdma_rep **rb_recv_bufs; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) -- cgit v1.1 From fe97b47cd623ebbaa55a163c336abc47153526d1 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:10 -0400 Subject: xprtrdma: Use workqueue to process RPC/RDMA replies The reply tasklet is fast, but it's single threaded. After reply traffic saturates a single CPU, there's no more reply processing capacity. Replace the tasklet with a workqueue to spread reply handling across all CPUs. This also moves RPC/RDMA reply handling out of the soft IRQ context and into a context that allows sleeps. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 17 +++++++------ net/sunrpc/xprtrdma/transport.c | 8 ++++++ net/sunrpc/xprtrdma/verbs.c | 54 +++++++++++++++++++++++++++++++++-------- net/sunrpc/xprtrdma/xprt_rdma.h | 4 +++ 4 files changed, 65 insertions(+), 18 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 60ffa63..95774fc 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -723,8 +723,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep) schedule_delayed_work(&ep->rep_connect_worker, 0); } -/* - * Called as a tasklet to do req/reply match and complete a request +/* Process received RPC/RDMA messages. + * * Errors must result in the RPC task either being awakened, or * allowed to timeout, to discover the errors at that time. */ @@ -752,13 +752,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) if (headerp->rm_vers != rpcrdma_version) goto out_badversion; - /* Get XID and try for a match. */ - spin_lock(&xprt->transport_lock); + /* Match incoming rpcrdma_rep to an rpcrdma_req to + * get context for handling any incoming chunks. + */ + spin_lock_bh(&xprt->transport_lock); rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); if (!rqst) goto out_nomatch; - /* get request object */ req = rpcr_to_rdmar(rqst); if (req->rl_reply) goto out_duplicate; @@ -859,7 +860,7 @@ badheader: xprt_release_rqst_cong(rqst->rq_task); xprt_complete_rqst(rqst->rq_task, status); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", __func__, xprt, rqst, status); return; @@ -882,14 +883,14 @@ out_badversion: goto repost; out_nomatch: - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n", __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len); goto repost; out_duplicate: - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); dprintk("RPC: %s: " "duplicate reply %p to RPC request %p: xid 0x%08x\n", __func__, rep, req, be32_to_cpu(headerp->rm_xid)); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index e9e5ed7..897a2f3 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -732,6 +732,7 @@ void xprt_rdma_cleanup(void) dprintk("RPC: %s: xprt_unregister returned %i\n", __func__, rc); + rpcrdma_destroy_wq(); frwr_destroy_recovery_wq(); } @@ -743,8 +744,15 @@ int xprt_rdma_init(void) if (rc) return rc; + rc = rpcrdma_alloc_wq(); + if (rc) { + frwr_destroy_recovery_wq(); + return rc; + } + rc = xprt_register_transport(&xprt_rdma); if (rc) { + rpcrdma_destroy_wq(); frwr_destroy_recovery_wq(); return rc; } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index c09f1b6..5c20629 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -100,6 +100,35 @@ rpcrdma_run_tasklet(unsigned long data) static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL); +static struct workqueue_struct *rpcrdma_receive_wq; + +int +rpcrdma_alloc_wq(void) +{ + struct workqueue_struct *recv_wq; + + recv_wq = alloc_workqueue("xprtrdma_receive", + WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI, + 0); + if (!recv_wq) + return -ENOMEM; + + rpcrdma_receive_wq = recv_wq; + return 0; +} + +void +rpcrdma_destroy_wq(void) +{ + struct workqueue_struct *wq; + + if (rpcrdma_receive_wq) { + wq = rpcrdma_receive_wq; + rpcrdma_receive_wq = NULL; + destroy_workqueue(wq); + } +} + static void rpcrdma_schedule_tasklet(struct list_head *sched_list) { @@ -196,7 +225,16 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) } static void -rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) +rpcrdma_receive_worker(struct work_struct *work) +{ + struct rpcrdma_rep *rep = + container_of(work, struct rpcrdma_rep, rr_work); + + rpcrdma_reply_handler(rep); +} + +static void +rpcrdma_recvcq_process_wc(struct ib_wc *wc) { struct rpcrdma_rep *rep = (struct rpcrdma_rep *)(unsigned long)wc->wr_id; @@ -219,8 +257,9 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) prefetch(rdmab_to_msg(rep->rr_rdmabuf)); out_schedule: - list_add_tail(&rep->rr_list, sched_list); + queue_work(rpcrdma_receive_wq, &rep->rr_work); return; + out_fail: if (wc->status != IB_WC_WR_FLUSH_ERR) pr_err("RPC: %s: rep %p: %s\n", @@ -239,7 +278,6 @@ static void rpcrdma_recvcq_poll(struct ib_cq *cq) { struct ib_wc *pos, wcs[4]; - LIST_HEAD(sched_list); int count, rc; do { @@ -251,10 +289,8 @@ rpcrdma_recvcq_poll(struct ib_cq *cq) count = rc; while (count-- > 0) - rpcrdma_recvcq_process_wc(pos++, &sched_list); + rpcrdma_recvcq_process_wc(pos++); } while (rc == ARRAY_SIZE(wcs)); - - rpcrdma_schedule_tasklet(&sched_list); } /* Handle provider receive completion upcalls. @@ -272,12 +308,9 @@ static void rpcrdma_flush_cqs(struct rpcrdma_ep *ep) { struct ib_wc wc; - LIST_HEAD(sched_list); while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) - rpcrdma_recvcq_process_wc(&wc, &sched_list); - if (!list_empty(&sched_list)) - rpcrdma_schedule_tasklet(&sched_list); + rpcrdma_recvcq_process_wc(&wc); while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0) rpcrdma_sendcq_process_wc(&wc); } @@ -913,6 +946,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) rep->rr_device = ia->ri_device; rep->rr_rxprt = r_xprt; + INIT_WORK(&rep->rr_work, rpcrdma_receive_worker); return rep; out_free: diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index e6a358f..6ea1dbe 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -164,6 +164,7 @@ struct rpcrdma_rep { unsigned int rr_len; struct ib_device *rr_device; struct rpcrdma_xprt *rr_rxprt; + struct work_struct rr_work; struct list_head rr_list; struct rpcrdma_regbuf *rr_rdmabuf; }; @@ -430,6 +431,9 @@ unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); int frwr_alloc_recovery_wq(void); void frwr_destroy_recovery_wq(void); +int rpcrdma_alloc_wq(void); +void rpcrdma_destroy_wq(void); + /* * Wrappers for chunk registration, shared by read/write chunk code. */ -- cgit v1.1 From 2da9ab3008f359857eb594b0b4b0fee62f2a73c2 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:18 -0400 Subject: xprtrdma: Remove reply tasklet Clean up: The reply tasklet is no longer used. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 43 ------------------------------------------- 1 file changed, 43 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 5c20629..3dd5a7c 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -68,38 +68,6 @@ * internal functions */ -/* - * handle replies in tasklet context, using a single, global list - * rdma tasklet function -- just turn around and call the func - * for all replies on the list - */ - -static DEFINE_SPINLOCK(rpcrdma_tk_lock_g); -static LIST_HEAD(rpcrdma_tasklets_g); - -static void -rpcrdma_run_tasklet(unsigned long data) -{ - struct rpcrdma_rep *rep; - unsigned long flags; - - data = data; - spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); - while (!list_empty(&rpcrdma_tasklets_g)) { - rep = list_entry(rpcrdma_tasklets_g.next, - struct rpcrdma_rep, rr_list); - list_del(&rep->rr_list); - spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); - - rpcrdma_reply_handler(rep); - - spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); - } - spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); -} - -static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL); - static struct workqueue_struct *rpcrdma_receive_wq; int @@ -130,17 +98,6 @@ rpcrdma_destroy_wq(void) } static void -rpcrdma_schedule_tasklet(struct list_head *sched_list) -{ - unsigned long flags; - - spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); - list_splice_tail(sched_list, &rpcrdma_tasklets_g); - spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); - tasklet_schedule(&rpcrdma_tasklet_g); -} - -static void rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) { struct rpcrdma_ep *ep = context; -- cgit v1.1 From a5b027e1897c811401862877d0ba4ca26fabc4da Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:27 -0400 Subject: xprtrdma: Saving IRQs no longer needed for rb_lock Now that RPC replies are processed in a workqueue, there's no need to disable IRQs when managing send and receive buffers. This saves noticeable overhead per RPC. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 3dd5a7c..baa0523 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1063,24 +1063,23 @@ struct rpcrdma_req * rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) { struct rpcrdma_req *req; - unsigned long flags; - spin_lock_irqsave(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); if (list_empty(&buffers->rb_send_bufs)) goto out_reqbuf; req = rpcrdma_buffer_get_req_locked(buffers); if (list_empty(&buffers->rb_recv_bufs)) goto out_repbuf; req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); return req; out_reqbuf: - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); pr_warn("RPC: %s: out of request buffers\n", __func__); return NULL; out_repbuf: - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); pr_warn("RPC: %s: out of reply buffers\n", __func__); req->rl_reply = NULL; return req; @@ -1095,16 +1094,15 @@ rpcrdma_buffer_put(struct rpcrdma_req *req) { struct rpcrdma_buffer *buffers = req->rl_buffer; struct rpcrdma_rep *rep = req->rl_reply; - unsigned long flags; req->rl_niovs = 0; req->rl_reply = NULL; - spin_lock_irqsave(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); list_add_tail(&req->rl_free, &buffers->rb_send_bufs); if (rep) list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); } /* @@ -1115,12 +1113,11 @@ void rpcrdma_recv_buffer_get(struct rpcrdma_req *req) { struct rpcrdma_buffer *buffers = req->rl_buffer; - unsigned long flags; - spin_lock_irqsave(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); if (!list_empty(&buffers->rb_recv_bufs)) req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); } /* @@ -1131,11 +1128,10 @@ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) { struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; - unsigned long flags; - spin_lock_irqsave(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); } /* -- cgit v1.1 From 42e5c3e2725ba0c0affc1fc8a6aa1d5cf31ecb75 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:35 -0400 Subject: SUNRPC: Abstract backchannel operations xprt_{setup,destroy}_backchannel() won't be adequate for RPC/RMDA bi-direction. In particular, receive buffers have to be pre- registered and posted in order to receive incoming backchannel requests. Add a virtual function call to allow the insertion of appropriate backchannel setup and destruction methods for each transport. In addition, freeing a backchannel request is a little different for RPC/RDMA. Introduce an rpc_xprt_op to handle the difference. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/backchannel_rqst.c | 24 ++++++++++++++++++++++-- net/sunrpc/xprtsock.c | 5 +++++ 2 files changed, 27 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c index 6255d14..229956b 100644 --- a/net/sunrpc/backchannel_rqst.c +++ b/net/sunrpc/backchannel_rqst.c @@ -138,6 +138,14 @@ out_free: */ int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) { + if (!xprt->ops->bc_setup) + return 0; + return xprt->ops->bc_setup(xprt, min_reqs); +} +EXPORT_SYMBOL_GPL(xprt_setup_backchannel); + +int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) +{ struct rpc_rqst *req; struct list_head tmp_list; int i; @@ -192,7 +200,6 @@ out_free: dprintk("RPC: setup backchannel transport failed\n"); return -ENOMEM; } -EXPORT_SYMBOL_GPL(xprt_setup_backchannel); /** * xprt_destroy_backchannel - Destroys the backchannel preallocated structures. @@ -205,6 +212,13 @@ EXPORT_SYMBOL_GPL(xprt_setup_backchannel); */ void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) { + if (xprt->ops->bc_destroy) + xprt->ops->bc_destroy(xprt, max_reqs); +} +EXPORT_SYMBOL_GPL(xprt_destroy_backchannel); + +void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs) +{ struct rpc_rqst *req = NULL, *tmp = NULL; dprintk("RPC: destroy backchannel transport\n"); @@ -227,7 +241,6 @@ out: dprintk("RPC: backchannel list empty= %s\n", list_empty(&xprt->bc_pa_list) ? "true" : "false"); } -EXPORT_SYMBOL_GPL(xprt_destroy_backchannel); static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid) { @@ -264,6 +277,13 @@ void xprt_free_bc_request(struct rpc_rqst *req) { struct rpc_xprt *xprt = req->rq_xprt; + xprt->ops->bc_free_rqst(req); +} + +void xprt_free_bc_rqst(struct rpc_rqst *req) +{ + struct rpc_xprt *xprt = req->rq_xprt; + dprintk("RPC: free backchannel req=%p\n", req); req->rq_connect_cookie = xprt->connect_cookie - 1; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 1a85e0e..44a81e4 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -2580,6 +2580,11 @@ static struct rpc_xprt_ops xs_tcp_ops = { .enable_swap = xs_enable_swap, .disable_swap = xs_disable_swap, .inject_disconnect = xs_inject_disconnect, +#ifdef CONFIG_SUNRPC_BACKCHANNEL + .bc_setup = xprt_setup_bc, + .bc_free_rqst = xprt_free_bc_rqst, + .bc_destroy = xprt_destroy_bc, +#endif }; /* -- cgit v1.1 From f531a5dbc451afb66e9d6c71a69e8358d1847969 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:43 -0400 Subject: xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers xprtrdma's backward direction send and receive buffers are the same size as the forechannel's inline threshold, and must be pre- registered. The consumer has no control over which receive buffer the adapter chooses to catch an incoming backwards-direction call. Any receive buffer can be used for either a forward reply or a backward call. Thus both types of RPC message must all be the same size. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/Makefile | 1 + net/sunrpc/xprtrdma/backchannel.c | 206 ++++++++++++++++++++++++++++++++++++++ net/sunrpc/xprtrdma/transport.c | 7 +- net/sunrpc/xprtrdma/verbs.c | 87 ++++++++++++++-- net/sunrpc/xprtrdma/xprt_rdma.h | 20 ++++ 5 files changed, 309 insertions(+), 12 deletions(-) create mode 100644 net/sunrpc/xprtrdma/backchannel.c (limited to 'net') diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index 48913de..33f99d3 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \ svc_rdma.o svc_rdma_transport.o \ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ module.o +rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c new file mode 100644 index 0000000..3d01b32 --- /dev/null +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2015 Oracle. All rights reserved. + * + * Support for backward direction RPCs on RPC/RDMA. + */ + +#include + +#include "xprt_rdma.h" + +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) +# define RPCDBG_FACILITY RPCDBG_TRANS +#endif + +static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, + struct rpc_rqst *rqst) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + + spin_lock(&buf->rb_reqslock); + list_del(&req->rl_all); + spin_unlock(&buf->rb_reqslock); + + rpcrdma_destroy_req(&r_xprt->rx_ia, req); + + kfree(rqst); +} + +static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt, + struct rpc_rqst *rqst) +{ + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_regbuf *rb; + struct rpcrdma_req *req; + struct xdr_buf *buf; + size_t size; + + req = rpcrdma_create_req(r_xprt); + if (!req) + return -ENOMEM; + req->rl_backchannel = true; + + size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); + rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL); + if (IS_ERR(rb)) + goto out_fail; + req->rl_rdmabuf = rb; + + size += RPCRDMA_INLINE_READ_THRESHOLD(rqst); + rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL); + if (IS_ERR(rb)) + goto out_fail; + rb->rg_owner = req; + req->rl_sendbuf = rb; + /* so that rpcr_to_rdmar works when receiving a request */ + rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base; + + buf = &rqst->rq_snd_buf; + buf->head[0].iov_base = rqst->rq_buffer; + buf->head[0].iov_len = 0; + buf->tail[0].iov_base = NULL; + buf->tail[0].iov_len = 0; + buf->page_len = 0; + buf->len = 0; + buf->buflen = size; + + return 0; + +out_fail: + rpcrdma_bc_free_rqst(r_xprt, rqst); + return -ENOMEM; +} + +/* Allocate and add receive buffers to the rpcrdma_buffer's + * existing list of rep's. These are released when the + * transport is destroyed. + */ +static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt, + unsigned int count) +{ + struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; + struct rpcrdma_rep *rep; + unsigned long flags; + int rc = 0; + + while (count--) { + rep = rpcrdma_create_rep(r_xprt); + if (IS_ERR(rep)) { + pr_err("RPC: %s: reply buffer alloc failed\n", + __func__); + rc = PTR_ERR(rep); + break; + } + + spin_lock_irqsave(&buffers->rb_lock, flags); + list_add(&rep->rr_list, &buffers->rb_recv_bufs); + spin_unlock_irqrestore(&buffers->rb_lock, flags); + } + + return rc; +} + +/** + * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests + * @xprt: transport associated with these backchannel resources + * @reqs: number of concurrent incoming requests to expect + * + * Returns 0 on success; otherwise a negative errno + */ +int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; + struct rpc_rqst *rqst; + unsigned int i; + int rc; + + /* The backchannel reply path returns each rpc_rqst to the + * bc_pa_list _after_ the reply is sent. If the server is + * faster than the client, it can send another backward + * direction request before the rpc_rqst is returned to the + * list. The client rejects the request in this case. + * + * Twice as many rpc_rqsts are prepared to ensure there is + * always an rpc_rqst available as soon as a reply is sent. + */ + for (i = 0; i < (reqs << 1); i++) { + rqst = kzalloc(sizeof(*rqst), GFP_KERNEL); + if (!rqst) { + pr_err("RPC: %s: Failed to create bc rpc_rqst\n", + __func__); + goto out_free; + } + + rqst->rq_xprt = &r_xprt->rx_xprt; + INIT_LIST_HEAD(&rqst->rq_list); + INIT_LIST_HEAD(&rqst->rq_bc_list); + + if (rpcrdma_bc_setup_rqst(r_xprt, rqst)) + goto out_free; + + spin_lock_bh(&xprt->bc_pa_lock); + list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); + spin_unlock_bh(&xprt->bc_pa_lock); + } + + rc = rpcrdma_bc_setup_reps(r_xprt, reqs); + if (rc) + goto out_free; + + rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs); + if (rc) + goto out_free; + + buffer->rb_bc_srv_max_requests = reqs; + request_module("svcrdma"); + + return 0; + +out_free: + xprt_rdma_bc_destroy(xprt, reqs); + + pr_err("RPC: %s: setup backchannel transport failed\n", __func__); + return -ENOMEM; +} + +/** + * xprt_rdma_bc_destroy - Release resources for handling backchannel requests + * @xprt: transport associated with these backchannel resources + * @reqs: number of incoming requests to destroy; ignored + */ +void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpc_rqst *rqst, *tmp; + + spin_lock_bh(&xprt->bc_pa_lock); + list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { + list_del(&rqst->rq_bc_pa_list); + spin_unlock_bh(&xprt->bc_pa_lock); + + rpcrdma_bc_free_rqst(r_xprt, rqst); + + spin_lock_bh(&xprt->bc_pa_lock); + } + spin_unlock_bh(&xprt->bc_pa_lock); +} + +/** + * xprt_rdma_bc_free_rqst - Release a backchannel rqst + * @rqst: request to release + */ +void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) +{ + struct rpc_xprt *xprt = rqst->rq_xprt; + + smp_mb__before_atomic(); + WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)); + clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); + smp_mb__after_atomic(); + + spin_lock_bh(&xprt->bc_pa_lock); + list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); + spin_unlock_bh(&xprt->bc_pa_lock); +} diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 897a2f3..845278e 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -705,7 +705,12 @@ static struct rpc_xprt_ops xprt_rdma_procs = { .print_stats = xprt_rdma_print_stats, .enable_swap = xprt_rdma_enable_swap, .disable_swap = xprt_rdma_disable_swap, - .inject_disconnect = xprt_rdma_inject_disconnect + .inject_disconnect = xprt_rdma_inject_disconnect, +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + .bc_setup = xprt_rdma_bc_setup, + .bc_free_rqst = xprt_rdma_bc_free_rqst, + .bc_destroy = xprt_rdma_bc_destroy, +#endif }; static struct xprt_class xprt_rdma = { diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index baa0523..7f0ed30 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -831,7 +831,21 @@ retry: } rc = ep->rep_connected; } else { + struct rpcrdma_xprt *r_xprt; + unsigned int extras; + dprintk("RPC: %s: connected\n", __func__); + + r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); + extras = r_xprt->rx_buf.rb_bc_srv_max_requests; + + if (extras) { + rc = rpcrdma_ep_post_extra_recv(r_xprt, extras); + if (rc) + pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n", + __func__, rc); + rc = 0; + } } out: @@ -868,20 +882,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) } } -static struct rpcrdma_req * +struct rpcrdma_req * rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) { + struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; struct rpcrdma_req *req; req = kzalloc(sizeof(*req), GFP_KERNEL); if (req == NULL) return ERR_PTR(-ENOMEM); + INIT_LIST_HEAD(&req->rl_free); + spin_lock(&buffer->rb_reqslock); + list_add(&req->rl_all, &buffer->rb_allreqs); + spin_unlock(&buffer->rb_reqslock); req->rl_buffer = &r_xprt->rx_buf; return req; } -static struct rpcrdma_rep * +struct rpcrdma_rep * rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; @@ -920,6 +939,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) int i, rc; buf->rb_max_requests = r_xprt->rx_data.max_requests; + buf->rb_bc_srv_max_requests = 0; spin_lock_init(&buf->rb_lock); rc = ia->ri_ops->ro_init(r_xprt); @@ -927,6 +947,8 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) goto out; INIT_LIST_HEAD(&buf->rb_send_bufs); + INIT_LIST_HEAD(&buf->rb_allreqs); + spin_lock_init(&buf->rb_reqslock); for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_req *req; @@ -937,6 +959,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) rc = PTR_ERR(req); goto out; } + req->rl_backchannel = false; list_add(&req->rl_free, &buf->rb_send_bufs); } @@ -985,19 +1008,13 @@ rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) static void rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) { - if (!rep) - return; - rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); kfree(rep); } -static void +void rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) { - if (!req) - return; - rpcrdma_free_regbuf(ia, req->rl_sendbuf); rpcrdma_free_regbuf(ia, req->rl_rdmabuf); kfree(req); @@ -1015,12 +1032,19 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) rpcrdma_destroy_rep(ia, rep); } - while (!list_empty(&buf->rb_send_bufs)) { + spin_lock(&buf->rb_reqslock); + while (!list_empty(&buf->rb_allreqs)) { struct rpcrdma_req *req; - req = rpcrdma_buffer_get_req_locked(buf); + req = list_first_entry(&buf->rb_allreqs, + struct rpcrdma_req, rl_all); + list_del(&req->rl_all); + + spin_unlock(&buf->rb_reqslock); rpcrdma_destroy_req(ia, req); + spin_lock(&buf->rb_reqslock); } + spin_unlock(&buf->rb_reqslock); ia->ri_ops->ro_destroy(buf); } @@ -1288,6 +1312,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, return rc; } +/** + * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests + * @r_xprt: transport associated with these backchannel resources + * @min_reqs: minimum number of incoming requests expected + * + * Returns zero if all requested buffers were posted, or a negative errno. + */ +int +rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) +{ + struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; + struct rpcrdma_rep *rep; + unsigned long flags; + int rc; + + while (count--) { + spin_lock_irqsave(&buffers->rb_lock, flags); + if (list_empty(&buffers->rb_recv_bufs)) + goto out_reqbuf; + rep = rpcrdma_buffer_get_rep_locked(buffers); + spin_unlock_irqrestore(&buffers->rb_lock, flags); + + rc = rpcrdma_ep_post_recv(ia, ep, rep); + if (rc) + goto out_rc; + } + + return 0; + +out_reqbuf: + spin_unlock_irqrestore(&buffers->rb_lock, flags); + pr_warn("%s: no extra receive buffers\n", __func__); + return -ENOMEM; + +out_rc: + rpcrdma_recv_buffer_put(rep); + return rc; +} + /* How many chunk list items fit within our inline buffers? */ unsigned int diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 6ea1dbe..1eb86c79f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -263,6 +263,9 @@ struct rpcrdma_req { struct rpcrdma_regbuf *rl_rdmabuf; struct rpcrdma_regbuf *rl_sendbuf; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + + struct list_head rl_all; + bool rl_backchannel; }; static inline struct rpcrdma_req * @@ -291,6 +294,10 @@ struct rpcrdma_buffer { struct list_head rb_send_bufs; struct list_head rb_recv_bufs; u32 rb_max_requests; + + u32 rb_bc_srv_max_requests; + spinlock_t rb_reqslock; /* protect rb_allreqs */ + struct list_head rb_allreqs; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) @@ -411,6 +418,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *, /* * Buffer calls - xprtrdma/verbs.c */ +struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *); +struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *); +void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *); int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); @@ -427,6 +437,7 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *); unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); +int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); int frwr_alloc_recovery_wq(void); void frwr_destroy_recovery_wq(void); @@ -494,6 +505,15 @@ int rpcrdma_marshal_req(struct rpc_rqst *); int xprt_rdma_init(void); void xprt_rdma_cleanup(void); +/* Backchannel calls - xprtrdma/backchannel.c + */ +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); +int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); +void xprt_rdma_bc_free_rqst(struct rpc_rqst *); +void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + /* Temporary NFS request map cache. Created in svc_rdma.c */ extern struct kmem_cache *svc_rdma_map_cachep; /* WR context cache. Created in svc_rdma.c */ -- cgit v1.1 From 124fa17d3e33060fbb28e995a42c7f5c8b31b345 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:51 -0400 Subject: xprtrdma: Pre-allocate Work Requests for backchannel Pre-allocate extra send and receive Work Requests needed to handle backchannel receives and sends. The transport doesn't know how many extra WRs to pre-allocate until the xprt_setup_backchannel() call, but that's long after the WRs are allocated during forechannel setup. So, use a fixed value for now. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/backchannel.c | 4 ++++ net/sunrpc/xprtrdma/verbs.c | 14 ++++++++++++-- net/sunrpc/xprtrdma/xprt_rdma.h | 10 ++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 3d01b32..3165ed6 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -125,6 +125,9 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) * Twice as many rpc_rqsts are prepared to ensure there is * always an rpc_rqst available as soon as a reply is sent. */ + if (reqs > RPCRDMA_BACKWARD_WRS >> 1) + goto out_err; + for (i = 0; i < (reqs << 1); i++) { rqst = kzalloc(sizeof(*rqst), GFP_KERNEL); if (!rqst) { @@ -161,6 +164,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) out_free: xprt_rdma_bc_destroy(xprt, reqs); +out_err: pr_err("RPC: %s: setup backchannel transport failed\n", __func__); return -ENOMEM; } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 7f0ed30..93883ff 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -568,6 +568,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct ib_device_attr *devattr = &ia->ri_devattr; struct ib_cq *sendcq, *recvcq; struct ib_cq_init_attr cq_attr = {}; + unsigned int max_qp_wr; int rc, err; if (devattr->max_sge < RPCRDMA_MAX_IOVS) { @@ -576,18 +577,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, return -ENOMEM; } + if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) { + dprintk("RPC: %s: insufficient wqe's available\n", + __func__); + return -ENOMEM; + } + max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS; + /* check provider's send/recv wr limits */ - if (cdata->max_requests > devattr->max_qp_wr) - cdata->max_requests = devattr->max_qp_wr; + if (cdata->max_requests > max_qp_wr) + cdata->max_requests = max_qp_wr; ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; ep->rep_attr.qp_context = ep; ep->rep_attr.srq = NULL; ep->rep_attr.cap.max_send_wr = cdata->max_requests; + ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; rc = ia->ri_ops->ro_open(ia, ep, cdata); if (rc) return rc; ep->rep_attr.cap.max_recv_wr = cdata->max_requests; + ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 1eb86c79f..55d2660 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -101,6 +101,16 @@ struct rpcrdma_ep { */ #define RPCRDMA_IGNORE_COMPLETION (0ULL) +/* Pre-allocate extra Work Requests for handling backward receives + * and sends. This is a fixed value because the Work Queues are + * allocated when the forward channel is set up. + */ +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +#define RPCRDMA_BACKWARD_WRS (8) +#else +#define RPCRDMA_BACKWARD_WRS (0) +#endif + /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV * * The below structure appears at the front of a large region of kmalloc'd -- cgit v1.1 From 83128a60ca74e996c5e0336c4fff0579f4a8c909 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:27:59 -0400 Subject: xprtrdma: Add support for sending backward direction RPC replies Backward direction RPC replies are sent via the client transport's send_request method, the same way forward direction RPC calls are sent. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/backchannel.c | 45 +++++++++++++++++++++++++++++++++++++++ net/sunrpc/xprtrdma/rpc_rdma.c | 5 +++++ net/sunrpc/xprtrdma/xprt_rdma.h | 1 + 3 files changed, 51 insertions(+) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 3165ed6..ffc4853 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -170,6 +170,51 @@ out_err: } /** + * rpcrdma_bc_marshal_reply - Send backwards direction reply + * @rqst: buffer containing RPC reply data + * + * Returns zero on success. + */ +int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) +{ + struct rpc_xprt *xprt = rqst->rq_xprt; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); + struct rpcrdma_msg *headerp; + size_t rpclen; + + headerp = rdmab_to_msg(req->rl_rdmabuf); + headerp->rm_xid = rqst->rq_xid; + headerp->rm_vers = rpcrdma_version; + headerp->rm_credit = + cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests); + headerp->rm_type = rdma_msg; + headerp->rm_body.rm_chunks[0] = xdr_zero; + headerp->rm_body.rm_chunks[1] = xdr_zero; + headerp->rm_body.rm_chunks[2] = xdr_zero; + + rpclen = rqst->rq_svec[0].iov_len; + + pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n", + __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf)); + pr_info("RPC: %s: RPC/RDMA: %*ph\n", + __func__, (int)RPCRDMA_HDRLEN_MIN, headerp); + pr_info("RPC: %s: RPC: %*ph\n", + __func__, (int)rpclen, rqst->rq_svec[0].iov_base); + + req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); + req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN; + req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); + + req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf); + req->rl_send_iov[1].length = rpclen; + req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf); + + req->rl_niovs = 2; + return 0; +} + +/** * xprt_rdma_bc_destroy - Release resources for handling backchannel requests * @xprt: transport associated with these backchannel resources * @reqs: number of incoming requests to destroy; ignored diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 95774fc..b7a21e5 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) + return rpcrdma_bc_marshal_reply(rqst); +#endif + /* * rpclen gets amount of data in first buffer, which is the * pre-registered buffer. diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 55d2660..e2d23ea 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -520,6 +520,7 @@ void xprt_rdma_cleanup(void); #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); +int rpcrdma_bc_marshal_reply(struct rpc_rqst *); void xprt_rdma_bc_free_rqst(struct rpc_rqst *); void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); #endif /* CONFIG_SUNRPC_BACKCHANNEL */ -- cgit v1.1 From 63cae47005af51c937f4cdcc4835f29075add2ba Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:28:08 -0400 Subject: xprtrdma: Handle incoming backward direction RPC calls Introduce a code path in the rpcrdma_reply_handler() to catch incoming backward direction RPC calls and route them to the ULP's backchannel server. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/backchannel.c | 118 ++++++++++++++++++++++++++++++++++++++ net/sunrpc/xprtrdma/rpc_rdma.c | 41 +++++++++++++ net/sunrpc/xprtrdma/xprt_rdma.h | 2 + 3 files changed, 161 insertions(+) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index ffc4853..0b3387f 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -5,6 +5,8 @@ */ #include +#include +#include #include "xprt_rdma.h" @@ -12,6 +14,8 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif +#define RPCRDMA_BACKCHANNEL_DEBUG + static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) { @@ -253,3 +257,117 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); spin_unlock_bh(&xprt->bc_pa_lock); } + +/** + * rpcrdma_bc_receive_call - Handle a backward direction call + * @xprt: transport receiving the call + * @rep: receive buffer containing the call + * + * Called in the RPC reply handler, which runs in a tasklet. + * Be quick about it. + * + * Operational assumptions: + * o Backchannel credits are ignored, just as the NFS server + * forechannel currently does + * o The ULP manages a replay cache (eg, NFSv4.1 sessions). + * No replay detection is done at the transport level + */ +void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_rep *rep) +{ + struct rpc_xprt *xprt = &r_xprt->rx_xprt; + struct rpcrdma_msg *headerp; + struct svc_serv *bc_serv; + struct rpcrdma_req *req; + struct rpc_rqst *rqst; + struct xdr_buf *buf; + size_t size; + __be32 *p; + + headerp = rdmab_to_msg(rep->rr_rdmabuf); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: callback XID %08x, length=%u\n", + __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len); + pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp); +#endif + + /* Sanity check: + * Need at least enough bytes for RPC/RDMA header, as code + * here references the header fields by array offset. Also, + * backward calls are always inline, so ensure there + * are some bytes beyond the RPC/RDMA header. + */ + if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24) + goto out_short; + p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN); + size = rep->rr_len - RPCRDMA_HDRLEN_MIN; + + /* Grab a free bc rqst */ + spin_lock(&xprt->bc_pa_lock); + if (list_empty(&xprt->bc_pa_list)) { + spin_unlock(&xprt->bc_pa_lock); + goto out_overflow; + } + rqst = list_first_entry(&xprt->bc_pa_list, + struct rpc_rqst, rq_bc_pa_list); + list_del(&rqst->rq_bc_pa_list); + spin_unlock(&xprt->bc_pa_lock); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: using rqst %p\n", __func__, rqst); +#endif + + /* Prepare rqst */ + rqst->rq_reply_bytes_recvd = 0; + rqst->rq_bytes_sent = 0; + rqst->rq_xid = headerp->rm_xid; + set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); + + buf = &rqst->rq_rcv_buf; + memset(buf, 0, sizeof(*buf)); + buf->head[0].iov_base = p; + buf->head[0].iov_len = size; + buf->len = size; + + /* The receive buffer has to be hooked to the rpcrdma_req + * so that it can be reposted after the server is done + * parsing it but just before sending the backward + * direction reply. + */ + req = rpcr_to_rdmar(rqst); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: attaching rep %p to req %p\n", + __func__, rep, req); +#endif + req->rl_reply = rep; + + /* Defeat the retransmit detection logic in send_request */ + req->rl_connect_cookie = 0; + + /* Queue rqst for ULP's callback service */ + bc_serv = xprt->bc_serv; + spin_lock(&bc_serv->sv_cb_lock); + list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list); + spin_unlock(&bc_serv->sv_cb_lock); + + wake_up(&bc_serv->sv_cb_waitq); + + r_xprt->rx_stats.bcall_count++; + return; + +out_overflow: + pr_warn("RPC/RDMA backchannel overflow\n"); + xprt_disconnect_done(xprt); + /* This receive buffer gets reposted automatically + * when the connection is re-established. + */ + return; + +out_short: + pr_warn("RPC/RDMA short backward direction call\n"); + + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) + xprt_disconnect_done(xprt); + else + pr_warn("RPC: %s: reposting rep %p\n", + __func__, rep); +} diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index b7a21e5..c10d969 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -716,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work) spin_unlock_bh(&xprt->transport_lock); } +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +/* By convention, backchannel calls arrive via rdma_msg type + * messages, and never populate the chunk lists. This makes + * the RPC/RDMA header small and fixed in size, so it is + * straightforward to check the RPC header's direction field. + */ +static bool +rpcrdma_is_bcall(struct rpcrdma_msg *headerp) +{ + __be32 *p = (__be32 *)headerp; + + if (headerp->rm_type != rdma_msg) + return false; + if (headerp->rm_body.rm_chunks[0] != xdr_zero) + return false; + if (headerp->rm_body.rm_chunks[1] != xdr_zero) + return false; + if (headerp->rm_body.rm_chunks[2] != xdr_zero) + return false; + + /* sanity */ + if (p[7] != headerp->rm_xid) + return false; + /* call direction */ + if (p[8] != cpu_to_be32(RPC_CALL)) + return false; + + return true; +} +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + /* * This function is called when an async event is posted to * the connection which changes the connection state. All it @@ -756,6 +787,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) headerp = rdmab_to_msg(rep->rr_rdmabuf); if (headerp->rm_vers != rpcrdma_version) goto out_badversion; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (rpcrdma_is_bcall(headerp)) + goto out_bcall; +#endif /* Match incoming rpcrdma_rep to an rpcrdma_req to * get context for handling any incoming chunks. @@ -878,6 +913,12 @@ out_badstatus: } return; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +out_bcall: + rpcrdma_bc_receive_call(r_xprt, rep); + return; +#endif + out_shortreply: dprintk("RPC: %s: short/invalid reply\n", __func__); goto repost; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index e2d23ea..eb87d96e8 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -353,6 +353,7 @@ struct rpcrdma_stats { unsigned long failed_marshal_count; unsigned long bad_reply_count; unsigned long nomsg_call_count; + unsigned long bcall_count; }; /* @@ -520,6 +521,7 @@ void xprt_rdma_cleanup(void); #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); +void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); int rpcrdma_bc_marshal_reply(struct rpc_rqst *); void xprt_rdma_bc_free_rqst(struct rpc_rqst *); void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); -- cgit v1.1 From 9468431962616c2449d47c482208a5967e011bf9 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:28:16 -0400 Subject: svcrdma: Add backward direction service for RPC/RDMA transport On NFSv4.1 mount points, the Linux NFS client uses this transport endpoint to receive backward direction calls and route replies back to the NFSv4.1 server. Signed-off-by: Chuck Lever Acked-by: "J. Bruce Fields" Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/svc_rdma.c | 6 ++++ net/sunrpc/xprtrdma/svc_rdma_transport.c | 58 ++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index 2cd252f..1b7051b 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -239,6 +239,9 @@ void svc_rdma_cleanup(void) unregister_sysctl_table(svcrdma_table_header); svcrdma_table_header = NULL; } +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + svc_unreg_xprt_class(&svc_rdma_bc_class); +#endif svc_unreg_xprt_class(&svc_rdma_class); kmem_cache_destroy(svc_rdma_map_cachep); kmem_cache_destroy(svc_rdma_ctxt_cachep); @@ -286,6 +289,9 @@ int svc_rdma_init(void) /* Register RDMA with the SVC transport switch */ svc_reg_xprt_class(&svc_rdma_class); +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + svc_reg_xprt_class(&svc_rdma_bc_class); +#endif return 0; err1: kmem_cache_destroy(svc_rdma_map_cachep); diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index fcc3eb8..a133b1e 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -56,6 +56,7 @@ #define RPCDBG_FACILITY RPCDBG_SVCXPRT +static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int); static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, struct net *net, struct sockaddr *sa, int salen, @@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = { .xcl_ident = XPRT_TRANSPORT_RDMA, }; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *, + struct sockaddr *, int, int); +static void svc_rdma_bc_detach(struct svc_xprt *); +static void svc_rdma_bc_free(struct svc_xprt *); + +static struct svc_xprt_ops svc_rdma_bc_ops = { + .xpo_create = svc_rdma_bc_create, + .xpo_detach = svc_rdma_bc_detach, + .xpo_free = svc_rdma_bc_free, + .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr, + .xpo_secure_port = svc_rdma_secure_port, +}; + +struct svc_xprt_class svc_rdma_bc_class = { + .xcl_name = "rdma-bc", + .xcl_owner = THIS_MODULE, + .xcl_ops = &svc_rdma_bc_ops, + .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN) +}; + +static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv, + struct net *net, + struct sockaddr *sa, int salen, + int flags) +{ + struct svcxprt_rdma *cma_xprt; + struct svc_xprt *xprt; + + cma_xprt = rdma_create_xprt(serv, 0); + if (!cma_xprt) + return ERR_PTR(-ENOMEM); + xprt = &cma_xprt->sc_xprt; + + svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv); + serv->sv_bc_xprt = xprt; + + dprintk("svcrdma: %s(%p)\n", __func__, xprt); + return xprt; +} + +static void svc_rdma_bc_detach(struct svc_xprt *xprt) +{ + dprintk("svcrdma: %s(%p)\n", __func__, xprt); +} + +static void svc_rdma_bc_free(struct svc_xprt *xprt) +{ + struct svcxprt_rdma *rdma = + container_of(xprt, struct svcxprt_rdma, sc_xprt); + + dprintk("svcrdma: %s(%p)\n", __func__, xprt); + if (xprt) + kfree(rdma); +} +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) { struct svc_rdma_op_ctxt *ctxt; -- cgit v1.1 From 0f2e3bdab6590a5d8900e7d701e21ac9af19924c Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:28:24 -0400 Subject: SUNRPC: Remove the TCP-only restriction in bc_svc_process() Allow the use of other transport classes when handling a backward direction RPC call. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma Signed-off-by: Anna Schumaker --- net/sunrpc/svc.c | 5 ----- 1 file changed, 5 deletions(-) (limited to 'net') diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index a8f579d..bc5b7b5 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -1367,11 +1367,6 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, /* reset result send buffer "put" position */ resv->iov_len = 0; - if (rqstp->rq_prot != IPPROTO_TCP) { - printk(KERN_ERR "No support for Non-TCP transports!\n"); - BUG(); - } - /* * Skip the next two words because they've already been * processed in the transport -- cgit v1.1 From 76566773a1f1c2295ed901b6f1241cfe10d99029 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sat, 24 Oct 2015 17:28:32 -0400 Subject: NFS: Enable client side NFSv4.1 backchannel to use other transports Forechannel transports get their own "bc_up" method to create an endpoint for the backchannel service. Signed-off-by: Chuck Lever [Anna Schumaker: Add forward declaration of struct net to xprt.h] Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/backchannel.c | 21 +++++++++++++++++++++ net/sunrpc/xprtrdma/transport.c | 1 + net/sunrpc/xprtrdma/xprt_rdma.h | 1 + net/sunrpc/xprtsock.c | 12 ++++++++++++ 4 files changed, 35 insertions(+) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 0b3387f..2dcb44f 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -7,6 +7,7 @@ #include #include #include +#include #include "xprt_rdma.h" @@ -174,6 +175,26 @@ out_err: } /** + * xprt_rdma_bc_up - Create transport endpoint for backchannel service + * @serv: server endpoint + * @net: network namespace + * + * The "xprt" is an implied argument: it supplies the name of the + * backchannel transport class. + * + * Returns zero on success, negative errno on failure + */ +int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net) +{ + int ret; + + ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0); + if (ret < 0) + return ret; + return 0; +} + +/** * rpcrdma_bc_marshal_reply - Send backwards direction reply * @rqst: buffer containing RPC reply data * diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 845278e..8c545f7 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -708,6 +708,7 @@ static struct rpc_xprt_ops xprt_rdma_procs = { .inject_disconnect = xprt_rdma_inject_disconnect, #if defined(CONFIG_SUNRPC_BACKCHANNEL) .bc_setup = xprt_rdma_bc_setup, + .bc_up = xprt_rdma_bc_up, .bc_free_rqst = xprt_rdma_bc_free_rqst, .bc_destroy = xprt_rdma_bc_destroy, #endif diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index eb87d96e8..f8dd17b 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -520,6 +520,7 @@ void xprt_rdma_cleanup(void); */ #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); +int xprt_rdma_bc_up(struct svc_serv *, struct net *); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); int rpcrdma_bc_marshal_reply(struct rpc_rqst *); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 44a81e4..dc47067 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1306,6 +1306,17 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, xs_tcp_read_reply(xprt, desc) : xs_tcp_read_callback(xprt, desc); } + +static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) +{ + int ret; + + ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0, + SVC_SOCK_ANONYMOUS); + if (ret < 0) + return ret; + return 0; +} #else static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) @@ -2582,6 +2593,7 @@ static struct rpc_xprt_ops xs_tcp_ops = { .inject_disconnect = xs_inject_disconnect, #ifdef CONFIG_SUNRPC_BACKCHANNEL .bc_setup = xprt_setup_bc, + .bc_up = xs_tcp_bc_up, .bc_free_rqst = xprt_free_bc_rqst, .bc_destroy = xprt_destroy_bc, #endif -- cgit v1.1 From 7fc561362da38253522899ccab30c7955d4132df Mon Sep 17 00:00:00 2001 From: Andrzej Hajda Date: Thu, 24 Sep 2015 16:00:09 +0200 Subject: SUNRPC: fix variable type Due to incorrect len type bc_send_request returned always zero. The problem has been detected using proposed semantic patch scripts/coccinelle/tests/assign_signed_to_unsigned.cocci [1]. [1]: http://permalink.gmane.org/gmane.linux.kernel/2046107 Signed-off-by: Andrzej Hajda Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 94824ff..1d1a704 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -2570,7 +2570,7 @@ static int bc_send_request(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; struct svc_xprt *xprt; - u32 len; + int len; dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); /* -- cgit v1.1 From 941c3ff3102ccce440034d59cf9e4e9cc10b720d Mon Sep 17 00:00:00 2001 From: Kinglong Mee Date: Sat, 12 Sep 2015 09:37:18 +0800 Subject: Sunrpc: Supports hexadecimal number for sysctl files of sunrpc debug The sunrpc debug sysctl files only accept decimal number right now. But all the XXXDBUG_XXX macros are defined as hexadecimal. It is not easy to set or check an separate flag. This patch let those files support accepting hexadecimal number, (decimal number is also supported). Also, display it as hexadecimal. v2, Remove duplicate parsing of '0x...', just using simple_strtol(tmpbuf, &s, 0) Fix a bug of isspace() checking after parsing Signed-off-by: Kinglong Mee Signed-off-by: Trond Myklebust --- net/sunrpc/sysctl.c | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) (limited to 'net') diff --git a/net/sunrpc/sysctl.c b/net/sunrpc/sysctl.c index 887f018..c88d9bc 100644 --- a/net/sunrpc/sysctl.c +++ b/net/sunrpc/sysctl.c @@ -76,7 +76,7 @@ static int proc_dodebug(struct ctl_table *table, int write, void __user *buffer, size_t *lenp, loff_t *ppos) { - char tmpbuf[20], c, *s; + char tmpbuf[20], c, *s = NULL; char __user *p; unsigned int value; size_t left, len; @@ -103,23 +103,24 @@ proc_dodebug(struct ctl_table *table, int write, return -EFAULT; tmpbuf[left] = '\0'; - for (s = tmpbuf, value = 0; '0' <= *s && *s <= '9'; s++, left--) - value = 10 * value + (*s - '0'); - if (*s && !isspace(*s)) - return -EINVAL; - while (left && isspace(*s)) - left--, s++; + value = simple_strtol(tmpbuf, &s, 0); + if (s) { + left -= (s - tmpbuf); + if (left && !isspace(*s)) + return -EINVAL; + while (left && isspace(*s)) + left--, s++; + } else + left = 0; *(unsigned int *) table->data = value; /* Display the RPC tasks on writing to rpc_debug */ if (strcmp(table->procname, "rpc_debug") == 0) rpc_show_tasks(&init_net); } else { - if (!access_ok(VERIFY_WRITE, buffer, left)) - return -EFAULT; - len = sprintf(tmpbuf, "%d", *(unsigned int *) table->data); + len = sprintf(tmpbuf, "0x%04x", *(unsigned int *) table->data); if (len > left) len = left; - if (__copy_to_user(buffer, tmpbuf, len)) + if (copy_to_user(buffer, tmpbuf, len)) return -EFAULT; if ((left -= len) > 0) { if (put_user('\n', (char __user *)buffer + len)) -- cgit v1.1