diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/sunrpc/clnt.c | 15 | ||||
-rw-r--r-- | net/sunrpc/rpcb_clnt.c | 8 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 7 | ||||
-rw-r--r-- | net/sunrpc/stats.c | 26 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 38 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 108 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 182 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 411 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 112 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 238 |
10 files changed, 628 insertions, 517 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 05da12a..612aa73 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -286,10 +286,8 @@ static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt, static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename) { - clnt->cl_nodelen = strlen(nodename); - if (clnt->cl_nodelen > UNX_MAXNODENAME) - clnt->cl_nodelen = UNX_MAXNODENAME; - memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen); + clnt->cl_nodelen = strlcpy(clnt->cl_nodename, + nodename, sizeof(clnt->cl_nodename)); } static int rpc_client_register(struct rpc_clnt *clnt, @@ -365,6 +363,7 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, const struct rpc_version *version; struct rpc_clnt *clnt = NULL; const struct rpc_timeout *timeout; + const char *nodename = args->nodename; int err; /* sanity check the name before trying to print it */ @@ -420,8 +419,10 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, atomic_set(&clnt->cl_count, 1); + if (nodename == NULL) + nodename = utsname()->nodename; /* save the nodename */ - rpc_clnt_set_nodename(clnt, utsname()->nodename); + rpc_clnt_set_nodename(clnt, nodename); err = rpc_client_register(clnt, args->authflavor, args->client_name); if (err) @@ -576,6 +577,7 @@ static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args, if (xprt == NULL) goto out_err; args->servername = xprt->servername; + args->nodename = clnt->cl_nodename; new = rpc_new_client(args, xprt, clnt); if (IS_ERR(new)) { @@ -1824,6 +1826,7 @@ call_connect_status(struct rpc_task *task) case -ECONNABORTED: case -ENETUNREACH: case -EHOSTUNREACH: + case -EADDRINUSE: case -ENOBUFS: case -EPIPE: if (RPC_IS_SOFTCONN(task)) @@ -1932,6 +1935,7 @@ call_transmit_status(struct rpc_task *task) } case -ECONNRESET: case -ECONNABORTED: + case -EADDRINUSE: case -ENOTCONN: case -ENOBUFS: case -EPIPE: @@ -2051,6 +2055,7 @@ call_status(struct rpc_task *task) case -ECONNRESET: case -ECONNABORTED: rpc_force_rebind(clnt); + case -EADDRINUSE: case -ENOBUFS: rpc_delay(task, 3*HZ); case -EPIPE: diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c index 0520201..cf5770d 100644 --- a/net/sunrpc/rpcb_clnt.c +++ b/net/sunrpc/rpcb_clnt.c @@ -355,7 +355,8 @@ out: return result; } -static struct rpc_clnt *rpcb_create(struct net *net, const char *hostname, +static struct rpc_clnt *rpcb_create(struct net *net, const char *nodename, + const char *hostname, struct sockaddr *srvaddr, size_t salen, int proto, u32 version) { @@ -365,6 +366,7 @@ static struct rpc_clnt *rpcb_create(struct net *net, const char *hostname, .address = srvaddr, .addrsize = salen, .servername = hostname, + .nodename = nodename, .program = &rpcb_program, .version = version, .authflavor = RPC_AUTH_UNIX, @@ -740,7 +742,9 @@ void rpcb_getport_async(struct rpc_task *task) dprintk("RPC: %5u %s: trying rpcbind version %u\n", task->tk_pid, __func__, bind_version); - rpcb_clnt = rpcb_create(xprt->xprt_net, xprt->servername, sap, salen, + rpcb_clnt = rpcb_create(xprt->xprt_net, + clnt->cl_nodename, + xprt->servername, sap, salen, xprt->prot, bind_version); if (IS_ERR(rpcb_clnt)) { status = PTR_ERR(rpcb_clnt); diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index d20f232..b91fd9c 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -844,10 +844,10 @@ static void rpc_async_schedule(struct work_struct *work) void *rpc_malloc(struct rpc_task *task, size_t size) { struct rpc_buffer *buf; - gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN; + gfp_t gfp = GFP_NOIO | __GFP_NOWARN; if (RPC_IS_SWAPPER(task)) - gfp |= __GFP_MEMALLOC; + gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN; size += sizeof(struct rpc_buffer); if (size <= RPC_BUFFER_MAXSIZE) @@ -1069,7 +1069,8 @@ static int rpciod_start(void) * Create the rpciod thread and wait for it to start. */ dprintk("RPC: creating workqueue rpciod\n"); - wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 1); + /* Note: highpri because network receive is latency sensitive */ + wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0); rpciod_workqueue = wq; return rpciod_workqueue != NULL; } diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c index 9711a15..2ecb994 100644 --- a/net/sunrpc/stats.c +++ b/net/sunrpc/stats.c @@ -140,22 +140,20 @@ void rpc_free_iostats(struct rpc_iostats *stats) EXPORT_SYMBOL_GPL(rpc_free_iostats); /** - * rpc_count_iostats - tally up per-task stats + * rpc_count_iostats_metrics - tally up per-task stats * @task: completed rpc_task - * @stats: array of stat structures + * @op_metrics: stat structure for OP that will accumulate stats from @task */ -void rpc_count_iostats(const struct rpc_task *task, struct rpc_iostats *stats) +void rpc_count_iostats_metrics(const struct rpc_task *task, + struct rpc_iostats *op_metrics) { struct rpc_rqst *req = task->tk_rqstp; - struct rpc_iostats *op_metrics; ktime_t delta, now; - if (!stats || !req) + if (!op_metrics || !req) return; now = ktime_get(); - op_metrics = &stats[task->tk_msg.rpc_proc->p_statidx]; - spin_lock(&op_metrics->om_lock); op_metrics->om_ops++; @@ -175,6 +173,20 @@ void rpc_count_iostats(const struct rpc_task *task, struct rpc_iostats *stats) spin_unlock(&op_metrics->om_lock); } +EXPORT_SYMBOL_GPL(rpc_count_iostats_metrics); + +/** + * rpc_count_iostats - tally up per-task stats + * @task: completed rpc_task + * @stats: array of stat structures + * + * Uses the statidx from @task + */ +void rpc_count_iostats(const struct rpc_task *task, struct rpc_iostats *stats) +{ + rpc_count_iostats_metrics(task, + &stats[task->tk_msg.rpc_proc->p_statidx]); +} EXPORT_SYMBOL_GPL(rpc_count_iostats); static void _print_name(struct seq_file *seq, unsigned int op, diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index ebbefad..e3015ae 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -683,13 +683,43 @@ xprt_init_autodisconnect(unsigned long data) if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) goto out_abort; spin_unlock(&xprt->transport_lock); - set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); queue_work(rpciod_workqueue, &xprt->task_cleanup); return; out_abort: spin_unlock(&xprt->transport_lock); } +bool xprt_lock_connect(struct rpc_xprt *xprt, + struct rpc_task *task, + void *cookie) +{ + bool ret = false; + + spin_lock_bh(&xprt->transport_lock); + if (!test_bit(XPRT_LOCKED, &xprt->state)) + goto out; + if (xprt->snd_task != task) + goto out; + xprt->snd_task = cookie; + ret = true; +out: + spin_unlock_bh(&xprt->transport_lock); + return ret; +} + +void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) +{ + spin_lock_bh(&xprt->transport_lock); + if (xprt->snd_task != cookie) + goto out; + if (!test_bit(XPRT_LOCKED, &xprt->state)) + goto out; + xprt->snd_task =NULL; + xprt->ops->release_xprt(xprt, NULL); +out: + spin_unlock_bh(&xprt->transport_lock); +} + /** * xprt_connect - schedule a transport connect operation * @task: RPC task that is requesting the connect @@ -712,9 +742,7 @@ void xprt_connect(struct rpc_task *task) if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) xprt->ops->close(xprt); - if (xprt_connected(xprt)) - xprt_release_write(xprt, task); - else { + if (!xprt_connected(xprt)) { task->tk_rqstp->rq_bytes_sent = 0; task->tk_timeout = task->tk_rqstp->rq_timeout; rpc_sleep_on(&xprt->pending, task, xprt_connect_status); @@ -726,6 +754,7 @@ void xprt_connect(struct rpc_task *task) xprt->stat.connect_start = jiffies; xprt->ops->connect(xprt, task); } + xprt_release_write(xprt, task); } static void xprt_connect_status(struct rpc_task *task) @@ -758,7 +787,6 @@ static void xprt_connect_status(struct rpc_task *task) dprintk("RPC: %5u xprt_connect_status: error %d connecting to " "server %s\n", task->tk_pid, -task->tk_status, xprt->servername); - xprt_release_write(xprt, task); task->tk_status = -EIO; } } diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index df01d12..7e9acd9 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -209,9 +209,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, if (cur_rchunk) { /* read */ cur_rchunk->rc_discrim = xdr_one; /* all read chunks have the same "position" */ - cur_rchunk->rc_position = htonl(pos); - cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey); - cur_rchunk->rc_target.rs_length = htonl(seg->mr_len); + cur_rchunk->rc_position = cpu_to_be32(pos); + cur_rchunk->rc_target.rs_handle = + cpu_to_be32(seg->mr_rkey); + cur_rchunk->rc_target.rs_length = + cpu_to_be32(seg->mr_len); xdr_encode_hyper( (__be32 *)&cur_rchunk->rc_target.rs_offset, seg->mr_base); @@ -222,8 +224,10 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, cur_rchunk++; r_xprt->rx_stats.read_chunk_count++; } else { /* write/reply */ - cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey); - cur_wchunk->wc_target.rs_length = htonl(seg->mr_len); + cur_wchunk->wc_target.rs_handle = + cpu_to_be32(seg->mr_rkey); + cur_wchunk->wc_target.rs_length = + cpu_to_be32(seg->mr_len); xdr_encode_hyper( (__be32 *)&cur_wchunk->wc_target.rs_offset, seg->mr_base); @@ -257,7 +261,7 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, *iptr++ = xdr_zero; /* encode a NULL reply chunk */ } else { warray->wc_discrim = xdr_one; - warray->wc_nchunks = htonl(nchunks); + warray->wc_nchunks = cpu_to_be32(nchunks); iptr = (__be32 *) cur_wchunk; if (type == rpcrdma_writech) { *iptr++ = xdr_zero; /* finish the write chunk list */ @@ -290,7 +294,7 @@ ssize_t rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result) { struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)req->rl_base; + struct rpcrdma_msg *headerp = rdmab_to_msg(req->rl_rdmabuf); if (req->rl_rtype != rpcrdma_noch) result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, @@ -402,13 +406,12 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) base = rqst->rq_svec[0].iov_base; rpclen = rqst->rq_svec[0].iov_len; - /* build RDMA header in private area at front */ - headerp = (struct rpcrdma_msg *) req->rl_base; - /* don't htonl XID, it's already done in request */ + headerp = rdmab_to_msg(req->rl_rdmabuf); + /* don't byte-swap XID, it's already done in request */ headerp->rm_xid = rqst->rq_xid; - headerp->rm_vers = xdr_one; - headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests); - headerp->rm_type = htonl(RDMA_MSG); + headerp->rm_vers = rpcrdma_version; + headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests); + headerp->rm_type = rdma_msg; /* * Chunks needed for results? @@ -468,7 +471,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) return -EIO; } - hdrlen = 28; /*sizeof *headerp;*/ + hdrlen = RPCRDMA_HDRLEN_MIN; padlen = 0; /* @@ -482,11 +485,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) RPCRDMA_INLINE_PAD_VALUE(rqst)); if (padlen) { - headerp->rm_type = htonl(RDMA_MSGP); + headerp->rm_type = rdma_msgp; headerp->rm_body.rm_padded.rm_align = - htonl(RPCRDMA_INLINE_PAD_VALUE(rqst)); + cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst)); headerp->rm_body.rm_padded.rm_thresh = - htonl(RPCRDMA_INLINE_PAD_THRESH); + cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH); headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero; headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero; headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero; @@ -524,7 +527,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd" " headerp 0x%p base 0x%p lkey 0x%x\n", __func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen, - headerp, base, req->rl_iov.lkey); + headerp, base, rdmab_lkey(req->rl_rdmabuf)); /* * initialize send_iov's - normally only two: rdma chunk header and @@ -533,26 +536,26 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) * header and any write data. In all non-rdma cases, any following * data has been copied into the RPC header buffer. */ - req->rl_send_iov[0].addr = req->rl_iov.addr; + req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); req->rl_send_iov[0].length = hdrlen; - req->rl_send_iov[0].lkey = req->rl_iov.lkey; + req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); - req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base); + req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf); req->rl_send_iov[1].length = rpclen; - req->rl_send_iov[1].lkey = req->rl_iov.lkey; + req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf); req->rl_niovs = 2; if (padlen) { struct rpcrdma_ep *ep = &r_xprt->rx_ep; - req->rl_send_iov[2].addr = ep->rep_pad.addr; + req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf); req->rl_send_iov[2].length = padlen; - req->rl_send_iov[2].lkey = ep->rep_pad.lkey; + req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf); req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen; req->rl_send_iov[3].length = rqst->rq_slen - rpclen; - req->rl_send_iov[3].lkey = req->rl_iov.lkey; + req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf); req->rl_niovs = 4; } @@ -569,8 +572,9 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b { unsigned int i, total_len; struct rpcrdma_write_chunk *cur_wchunk; + char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf); - i = ntohl(**iptrp); /* get array count */ + i = be32_to_cpu(**iptrp); if (i > max) return -1; cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1); @@ -582,11 +586,11 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b xdr_decode_hyper((__be32 *)&seg->rs_offset, &off); dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n", __func__, - ntohl(seg->rs_length), + be32_to_cpu(seg->rs_length), (unsigned long long)off, - ntohl(seg->rs_handle)); + be32_to_cpu(seg->rs_handle)); } - total_len += ntohl(seg->rs_length); + total_len += be32_to_cpu(seg->rs_length); ++cur_wchunk; } /* check and adjust for properly terminated write chunk */ @@ -596,7 +600,7 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b return -1; cur_wchunk = (struct rpcrdma_write_chunk *) w; } - if ((char *) cur_wchunk > rep->rr_base + rep->rr_len) + if ((char *)cur_wchunk > base + rep->rr_len) return -1; *iptrp = (__be32 *) cur_wchunk; @@ -691,7 +695,9 @@ rpcrdma_connect_worker(struct work_struct *work) { struct rpcrdma_ep *ep = container_of(work, struct rpcrdma_ep, rep_connect_worker.work); - struct rpc_xprt *xprt = ep->rep_xprt; + struct rpcrdma_xprt *r_xprt = + container_of(ep, struct rpcrdma_xprt, rx_ep); + struct rpc_xprt *xprt = &r_xprt->rx_xprt; spin_lock_bh(&xprt->transport_lock); if (++xprt->connect_cookie == 0) /* maintain a reserved value */ @@ -732,7 +738,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) struct rpc_xprt *xprt = rep->rr_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); __be32 *iptr; - int rdmalen, status; + int credits, rdmalen, status; unsigned long cwnd; /* Check status. If bad, signal disconnect and return rep to pool */ @@ -744,14 +750,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) } return; } - if (rep->rr_len < 28) { + if (rep->rr_len < RPCRDMA_HDRLEN_MIN) { dprintk("RPC: %s: short/invalid reply\n", __func__); goto repost; } - headerp = (struct rpcrdma_msg *) rep->rr_base; - if (headerp->rm_vers != xdr_one) { + headerp = rdmab_to_msg(rep->rr_rdmabuf); + if (headerp->rm_vers != rpcrdma_version) { dprintk("RPC: %s: invalid version %d\n", - __func__, ntohl(headerp->rm_vers)); + __func__, be32_to_cpu(headerp->rm_vers)); goto repost; } @@ -762,7 +768,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) spin_unlock(&xprt->transport_lock); dprintk("RPC: %s: reply 0x%p failed " "to match any request xid 0x%08x len %d\n", - __func__, rep, headerp->rm_xid, rep->rr_len); + __func__, rep, be32_to_cpu(headerp->rm_xid), + rep->rr_len); repost: r_xprt->rx_stats.bad_reply_count++; rep->rr_func = rpcrdma_reply_handler; @@ -778,13 +785,14 @@ repost: spin_unlock(&xprt->transport_lock); dprintk("RPC: %s: duplicate reply 0x%p to RPC " "request 0x%p: xid 0x%08x\n", __func__, rep, req, - headerp->rm_xid); + be32_to_cpu(headerp->rm_xid)); goto repost; } dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" " RPC request 0x%p xid 0x%08x\n", - __func__, rep, req, rqst, headerp->rm_xid); + __func__, rep, req, rqst, + be32_to_cpu(headerp->rm_xid)); /* from here on, the reply is no longer an orphan */ req->rl_reply = rep; @@ -793,7 +801,7 @@ repost: /* check for expected message types */ /* The order of some of these tests is important. */ switch (headerp->rm_type) { - case htonl(RDMA_MSG): + case rdma_msg: /* never expect read chunks */ /* never expect reply chunks (two ways to check) */ /* never expect write chunks without having offered RDMA */ @@ -824,22 +832,24 @@ repost: } else { /* else ordinary inline */ rdmalen = 0; - iptr = (__be32 *)((unsigned char *)headerp + 28); - rep->rr_len -= 28; /*sizeof *headerp;*/ + iptr = (__be32 *)((unsigned char *)headerp + + RPCRDMA_HDRLEN_MIN); + rep->rr_len -= RPCRDMA_HDRLEN_MIN; status = rep->rr_len; } /* Fix up the rpc results for upper layer */ rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen); break; - case htonl(RDMA_NOMSG): + case rdma_nomsg: /* never expect read or write chunks, always reply chunks */ if (headerp->rm_body.rm_chunks[0] != xdr_zero || headerp->rm_body.rm_chunks[1] != xdr_zero || headerp->rm_body.rm_chunks[2] != xdr_one || req->rl_nchunks == 0) goto badheader; - iptr = (__be32 *)((unsigned char *)headerp + 28); + iptr = (__be32 *)((unsigned char *)headerp + + RPCRDMA_HDRLEN_MIN); rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr); if (rdmalen < 0) goto badheader; @@ -853,7 +863,7 @@ badheader: dprintk("%s: invalid rpcrdma reply header (type %d):" " chunks[012] == %d %d %d" " expected chunks <= %d\n", - __func__, ntohl(headerp->rm_type), + __func__, be32_to_cpu(headerp->rm_type), headerp->rm_body.rm_chunks[0], headerp->rm_body.rm_chunks[1], headerp->rm_body.rm_chunks[2], @@ -863,8 +873,14 @@ badheader: break; } + credits = be32_to_cpu(headerp->rm_credit); + if (credits == 0) + credits = 1; /* don't deadlock */ + else if (credits > r_xprt->rx_buf.rb_max_requests) + credits = r_xprt->rx_buf.rb_max_requests; + cwnd = xprt->cwnd; - xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; + xprt->cwnd = credits << RPC_CWNDSHIFT; if (xprt->cwnd > cwnd) xprt_release_rqst_cong(rqst->rq_task); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index bbd6155..2e192ba 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -200,9 +200,9 @@ xprt_rdma_free_addresses(struct rpc_xprt *xprt) static void xprt_rdma_connect_worker(struct work_struct *work) { - struct rpcrdma_xprt *r_xprt = - container_of(work, struct rpcrdma_xprt, rdma_connect.work); - struct rpc_xprt *xprt = &r_xprt->xprt; + struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt, + rx_connect_worker.work); + struct rpc_xprt *xprt = &r_xprt->rx_xprt; int rc = 0; xprt_clear_connected(xprt); @@ -235,7 +235,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt) dprintk("RPC: %s: called\n", __func__); - cancel_delayed_work_sync(&r_xprt->rdma_connect); + cancel_delayed_work_sync(&r_xprt->rx_connect_worker); xprt_clear_connected(xprt); @@ -364,8 +364,7 @@ xprt_setup_rdma(struct xprt_create *args) * any inline data. Also specify any padding which will be provided * from a preregistered zero buffer. */ - rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia, - &new_xprt->rx_data); + rc = rpcrdma_buffer_create(new_xprt); if (rc) goto out3; @@ -374,9 +373,8 @@ xprt_setup_rdma(struct xprt_create *args) * connection loss notification is async. We also catch connection loss * when reaping receives. */ - INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker); - new_ep->rep_func = rpcrdma_conn_func; - new_ep->rep_xprt = xprt; + INIT_DELAYED_WORK(&new_xprt->rx_connect_worker, + xprt_rdma_connect_worker); xprt_rdma_format_addresses(xprt); xprt->max_payload = rpcrdma_max_payload(new_xprt); @@ -434,94 +432,101 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) if (r_xprt->rx_ep.rep_connected != 0) { /* Reconnect */ - schedule_delayed_work(&r_xprt->rdma_connect, - xprt->reestablish_timeout); + schedule_delayed_work(&r_xprt->rx_connect_worker, + xprt->reestablish_timeout); xprt->reestablish_timeout <<= 1; if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO) xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO; else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; } else { - schedule_delayed_work(&r_xprt->rdma_connect, 0); + schedule_delayed_work(&r_xprt->rx_connect_worker, 0); if (!RPC_IS_ASYNC(task)) - flush_delayed_work(&r_xprt->rdma_connect); + flush_delayed_work(&r_xprt->rx_connect_worker); } } /* * The RDMA allocate/free functions need the task structure as a place * to hide the struct rpcrdma_req, which is necessary for the actual send/recv - * sequence. For this reason, the recv buffers are attached to send - * buffers for portions of the RPC. Note that the RPC layer allocates - * both send and receive buffers in the same call. We may register - * the receive buffer portion when using reply chunks. + * sequence. + * + * The RPC layer allocates both send and receive buffers in the same call + * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer). + * We may register rq_rcv_buf when using reply chunks. */ static void * xprt_rdma_allocate(struct rpc_task *task, size_t size) { struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - struct rpcrdma_req *req, *nreq; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_regbuf *rb; + struct rpcrdma_req *req; + size_t min_size; + gfp_t flags; - req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf); + req = rpcrdma_buffer_get(&r_xprt->rx_buf); if (req == NULL) return NULL; - if (size > req->rl_size) { - dprintk("RPC: %s: size %zd too large for buffer[%zd]: " - "prog %d vers %d proc %d\n", - __func__, size, req->rl_size, - task->tk_client->cl_prog, task->tk_client->cl_vers, - task->tk_msg.rpc_proc->p_proc); - /* - * Outgoing length shortage. Our inline write max must have - * been configured to perform direct i/o. - * - * This is therefore a large metadata operation, and the - * allocate call was made on the maximum possible message, - * e.g. containing long filename(s) or symlink data. In - * fact, while these metadata operations *might* carry - * large outgoing payloads, they rarely *do*. However, we - * have to commit to the request here, so reallocate and - * register it now. The data path will never require this - * reallocation. - * - * If the allocation or registration fails, the RPC framework - * will (doggedly) retry. - */ - if (task->tk_flags & RPC_TASK_SWAPPER) - nreq = kmalloc(sizeof *req + size, GFP_ATOMIC); - else - nreq = kmalloc(sizeof *req + size, GFP_NOFS); - if (nreq == NULL) - goto outfail; - - if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia, - nreq->rl_base, size + sizeof(struct rpcrdma_req) - - offsetof(struct rpcrdma_req, rl_base), - &nreq->rl_handle, &nreq->rl_iov)) { - kfree(nreq); - goto outfail; - } - rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size; - nreq->rl_size = size; - nreq->rl_niovs = 0; - nreq->rl_nchunks = 0; - nreq->rl_buffer = (struct rpcrdma_buffer *)req; - nreq->rl_reply = req->rl_reply; - memcpy(nreq->rl_segments, - req->rl_segments, sizeof nreq->rl_segments); - /* flag the swap with an unused field */ - nreq->rl_iov.length = 0; - req->rl_reply = NULL; - req = nreq; - } + flags = GFP_NOIO | __GFP_NOWARN; + if (RPC_IS_SWAPPER(task)) + flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN; + + if (req->rl_rdmabuf == NULL) + goto out_rdmabuf; + if (req->rl_sendbuf == NULL) + goto out_sendbuf; + if (size > req->rl_sendbuf->rg_size) + goto out_sendbuf; + +out: dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); req->rl_connect_cookie = 0; /* our reserved value */ - return req->rl_xdr_buf; - -outfail: + return req->rl_sendbuf->rg_base; + +out_rdmabuf: + min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp); + rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags); + if (IS_ERR(rb)) + goto out_fail; + req->rl_rdmabuf = rb; + +out_sendbuf: + /* XDR encoding and RPC/RDMA marshaling of this request has not + * yet occurred. Thus a lower bound is needed to prevent buffer + * overrun during marshaling. + * + * RPC/RDMA marshaling may choose to send payload bearing ops + * inline, if the result is smaller than the inline threshold. + * The value of the "size" argument accounts for header + * requirements but not for the payload in these cases. + * + * Likewise, allocate enough space to receive a reply up to the + * size of the inline threshold. + * + * It's unlikely that both the send header and the received + * reply will be large, but slush is provided here to allow + * flexibility when marshaling. + */ + min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp); + min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp); + if (size < min_size) + size = min_size; + + rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags); + if (IS_ERR(rb)) + goto out_fail; + rb->rg_owner = req; + + r_xprt->rx_stats.hardway_register_count += size; + rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf); + req->rl_sendbuf = rb; + goto out; + +out_fail: rpcrdma_buffer_put(req); - rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; + r_xprt->rx_stats.failed_marshal_count++; return NULL; } @@ -533,47 +538,24 @@ xprt_rdma_free(void *buffer) { struct rpcrdma_req *req; struct rpcrdma_xprt *r_xprt; - struct rpcrdma_rep *rep; + struct rpcrdma_regbuf *rb; int i; if (buffer == NULL) return; - req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]); - if (req->rl_iov.length == 0) { /* see allocate above */ - r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer, - struct rpcrdma_xprt, rx_buf); - } else - r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf); - rep = req->rl_reply; + rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]); + req = rb->rg_owner; + r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf); - dprintk("RPC: %s: called on 0x%p%s\n", - __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : ""); + dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); - /* - * Finish the deregistration. The process is considered - * complete when the rr_func vector becomes NULL - this - * was put in place during rpcrdma_reply_handler() - the wait - * call below will not block if the dereg is "done". If - * interrupted, our framework will clean up. - */ for (i = 0; req->rl_nchunks;) { --req->rl_nchunks; i += rpcrdma_deregister_external( &req->rl_segments[i], r_xprt); } - if (req->rl_iov.length == 0) { /* see allocate above */ - struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer; - oreq->rl_reply = req->rl_reply; - (void) rpcrdma_deregister_internal(&r_xprt->rx_ia, - req->rl_handle, - &req->rl_iov); - kfree(req); - req = oreq; - } - - /* Put back request+reply buffers */ rpcrdma_buffer_put(req); } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index c98e406..124676c 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -49,6 +49,7 @@ #include <linux/interrupt.h> #include <linux/slab.h> +#include <linux/prefetch.h> #include <asm/bitops.h> #include "xprt_rdma.h" @@ -153,7 +154,7 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) event->device->name, context); if (ep->rep_connected == 1) { ep->rep_connected = -EIO; - ep->rep_func(ep); + rpcrdma_conn_func(ep); wake_up_all(&ep->rep_connect_wait); } } @@ -168,23 +169,59 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context) event->device->name, context); if (ep->rep_connected == 1) { ep->rep_connected = -EIO; - ep->rep_func(ep); + rpcrdma_conn_func(ep); wake_up_all(&ep->rep_connect_wait); } } +static const char * const wc_status[] = { + "success", + "local length error", + "local QP operation error", + "local EE context operation error", + "local protection error", + "WR flushed", + "memory management operation error", + "bad response error", + "local access error", + "remote invalid request error", + "remote access error", + "remote operation error", + "transport retry counter exceeded", + "RNR retrycounter exceeded", + "local RDD violation error", + "remove invalid RD request", + "operation aborted", + "invalid EE context number", + "invalid EE context state", + "fatal error", + "response timeout error", + "general error", +}; + +#define COMPLETION_MSG(status) \ + ((status) < ARRAY_SIZE(wc_status) ? \ + wc_status[(status)] : "unexpected completion error") + static void rpcrdma_sendcq_process_wc(struct ib_wc *wc) { - struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; + if (likely(wc->status == IB_WC_SUCCESS)) + return; - dprintk("RPC: %s: frmr %p status %X opcode %d\n", - __func__, frmr, wc->status, wc->opcode); + /* WARNING: Only wr_id and status are reliable at this point */ + if (wc->wr_id == 0ULL) { + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("RPC: %s: SEND: %s\n", + __func__, COMPLETION_MSG(wc->status)); + } else { + struct rpcrdma_mw *r; - if (wc->wr_id == 0ULL) - return; - if (wc->status != IB_WC_SUCCESS) - frmr->r.frmr.fr_state = FRMR_IS_STALE; + r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; + r->r.frmr.fr_state = FRMR_IS_STALE; + pr_err("RPC: %s: frmr %p (stale): %s\n", + __func__, r, COMPLETION_MSG(wc->status)); + } } static int @@ -248,33 +285,32 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) struct rpcrdma_rep *rep = (struct rpcrdma_rep *)(unsigned long)wc->wr_id; - dprintk("RPC: %s: rep %p status %X opcode %X length %u\n", - __func__, rep, wc->status, wc->opcode, wc->byte_len); + /* WARNING: Only wr_id and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS) + goto out_fail; - if (wc->status != IB_WC_SUCCESS) { - rep->rr_len = ~0U; - goto out_schedule; - } + /* status == SUCCESS means all fields in wc are trustworthy */ if (wc->opcode != IB_WC_RECV) return; + dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n", + __func__, rep, wc->byte_len); + rep->rr_len = wc->byte_len; ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device, - rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE); - - if (rep->rr_len >= 16) { - struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base; - unsigned int credits = ntohl(p->rm_credit); - - if (credits == 0) - credits = 1; /* don't deadlock */ - else if (credits > rep->rr_buffer->rb_max_requests) - credits = rep->rr_buffer->rb_max_requests; - atomic_set(&rep->rr_buffer->rb_credits, credits); - } + rdmab_addr(rep->rr_rdmabuf), + rep->rr_len, DMA_FROM_DEVICE); + prefetch(rdmab_to_msg(rep->rr_rdmabuf)); out_schedule: list_add_tail(&rep->rr_list, sched_list); + return; +out_fail: + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("RPC: %s: rep %p: %s\n", + __func__, rep, COMPLETION_MSG(wc->status)); + rep->rr_len = ~0U; + goto out_schedule; } static int @@ -390,8 +426,8 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr; #endif - struct ib_qp_attr attr; - struct ib_qp_init_attr iattr; + struct ib_qp_attr *attr = &ia->ri_qp_attr; + struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr; int connstate = 0; switch (event->event) { @@ -414,12 +450,13 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) break; case RDMA_CM_EVENT_ESTABLISHED: connstate = 1; - ib_query_qp(ia->ri_id->qp, &attr, - IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC, - &iattr); + ib_query_qp(ia->ri_id->qp, attr, + IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC, + iattr); dprintk("RPC: %s: %d responder resources" " (%d initiator)\n", - __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic); + __func__, attr->max_dest_rd_atomic, + attr->max_rd_atomic); goto connected; case RDMA_CM_EVENT_CONNECT_ERROR: connstate = -ENOTCONN; @@ -436,11 +473,10 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) case RDMA_CM_EVENT_DEVICE_REMOVAL: connstate = -ENODEV; connected: - atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1); dprintk("RPC: %s: %sconnected\n", __func__, connstate > 0 ? "" : "dis"); ep->rep_connected = connstate; - ep->rep_func(ep); + rpcrdma_conn_func(ep); wake_up_all(&ep->rep_connect_wait); /*FALLTHROUGH*/ default: @@ -453,7 +489,7 @@ connected: #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) if (connstate == 1) { - int ird = attr.max_dest_rd_atomic; + int ird = attr->max_dest_rd_atomic; int tird = ep->rep_remote_cma.responder_resources; printk(KERN_INFO "rpcrdma: connection to %pI4:%u " "on %s, memreg %d slots %d ird %d%s\n", @@ -554,8 +590,8 @@ int rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) { int rc, mem_priv; - struct ib_device_attr devattr; struct rpcrdma_ia *ia = &xprt->rx_ia; + struct ib_device_attr *devattr = &ia->ri_devattr; ia->ri_id = rpcrdma_create_id(xprt, ia, addr); if (IS_ERR(ia->ri_id)) { @@ -571,26 +607,21 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) goto out2; } - /* - * Query the device to determine if the requested memory - * registration strategy is supported. If it isn't, set the - * strategy to a globally supported model. - */ - rc = ib_query_device(ia->ri_id->device, &devattr); + rc = ib_query_device(ia->ri_id->device, devattr); if (rc) { dprintk("RPC: %s: ib_query_device failed %d\n", __func__, rc); - goto out2; + goto out3; } - if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) { + if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) { ia->ri_have_dma_lkey = 1; ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey; } if (memreg == RPCRDMA_FRMR) { /* Requires both frmr reg and local dma lkey */ - if ((devattr.device_cap_flags & + if ((devattr->device_cap_flags & (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) != (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) { dprintk("RPC: %s: FRMR registration " @@ -600,7 +631,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) /* Mind the ia limit on FRMR page list depth */ ia->ri_max_frmr_depth = min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - devattr.max_fast_reg_page_list_len); + devattr->max_fast_reg_page_list_len); } } if (memreg == RPCRDMA_MTHCAFMR) { @@ -638,14 +669,14 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) "phys register failed with %lX\n", __func__, PTR_ERR(ia->ri_bind_mem)); rc = -ENOMEM; - goto out2; + goto out3; } break; default: printk(KERN_ERR "RPC: Unsupported memory " "registration mode: %d\n", memreg); rc = -ENOMEM; - goto out2; + goto out3; } dprintk("RPC: %s: memory registration strategy is %d\n", __func__, memreg); @@ -655,6 +686,10 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) rwlock_init(&ia->ri_qplock); return 0; + +out3: + ib_dealloc_pd(ia->ri_pd); + ia->ri_pd = NULL; out2: rdma_destroy_id(ia->ri_id); ia->ri_id = NULL; @@ -698,20 +733,13 @@ int rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) { - struct ib_device_attr devattr; + struct ib_device_attr *devattr = &ia->ri_devattr; struct ib_cq *sendcq, *recvcq; int rc, err; - rc = ib_query_device(ia->ri_id->device, &devattr); - if (rc) { - dprintk("RPC: %s: ib_query_device failed %d\n", - __func__, rc); - return rc; - } - /* check provider's send/recv wr limits */ - if (cdata->max_requests > devattr.max_qp_wr) - cdata->max_requests = devattr.max_qp_wr; + if (cdata->max_requests > devattr->max_qp_wr) + cdata->max_requests = devattr->max_qp_wr; ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; ep->rep_attr.qp_context = ep; @@ -746,8 +774,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, } ep->rep_attr.cap.max_send_wr *= depth; - if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) { - cdata->max_requests = devattr.max_qp_wr / depth; + if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) { + cdata->max_requests = devattr->max_qp_wr / depth; if (!cdata->max_requests) return -EINVAL; ep->rep_attr.cap.max_send_wr = cdata->max_requests * @@ -766,6 +794,14 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.qp_type = IB_QPT_RC; ep->rep_attr.port_num = ~0; + if (cdata->padding) { + ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding, + GFP_KERNEL); + if (IS_ERR(ep->rep_padbuf)) + return PTR_ERR(ep->rep_padbuf); + } else + ep->rep_padbuf = NULL; + dprintk("RPC: %s: requested max: dtos: send %d recv %d; " "iovs: send %d recv %d\n", __func__, @@ -781,7 +817,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, else if (ep->rep_cqinit <= 2) ep->rep_cqinit = 0; INIT_CQCOUNT(ep); - ep->rep_ia = ia; init_waitqueue_head(&ep->rep_connect_wait); INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); @@ -831,10 +866,11 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, /* Client offers RDMA Read but does not initiate */ ep->rep_remote_cma.initiator_depth = 0; - if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */ + if (devattr->max_qp_rd_atom > 32) /* arbitrary but <= 255 */ ep->rep_remote_cma.responder_resources = 32; else - ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom; + ep->rep_remote_cma.responder_resources = + devattr->max_qp_rd_atom; ep->rep_remote_cma.retry_count = 7; ep->rep_remote_cma.flow_control = 0; @@ -848,6 +884,7 @@ out2: dprintk("RPC: %s: ib_destroy_cq returned %i\n", __func__, err); out1: + rpcrdma_free_regbuf(ia, ep->rep_padbuf); return rc; } @@ -874,11 +911,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ia->ri_id->qp = NULL; } - /* padding - could be done in rpcrdma_buffer_destroy... */ - if (ep->rep_pad_mr) { - rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad); - ep->rep_pad_mr = NULL; - } + rpcrdma_free_regbuf(ia, ep->rep_padbuf); rpcrdma_clean_cq(ep->rep_attr.recv_cq); rc = ib_destroy_cq(ep->rep_attr.recv_cq); @@ -1048,6 +1081,48 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) } } +static struct rpcrdma_req * +rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_req *req; + + req = kzalloc(sizeof(*req), GFP_KERNEL); + if (req == NULL) + return ERR_PTR(-ENOMEM); + + req->rl_buffer = &r_xprt->rx_buf; + return req; +} + +static struct rpcrdma_rep * +rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_rep *rep; + int rc; + + rc = -ENOMEM; + rep = kzalloc(sizeof(*rep), GFP_KERNEL); + if (rep == NULL) + goto out; + + rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize, + GFP_KERNEL); + if (IS_ERR(rep->rr_rdmabuf)) { + rc = PTR_ERR(rep->rr_rdmabuf); + goto out_free; + } + + rep->rr_buffer = &r_xprt->rx_buf; + return rep; + +out_free: + kfree(rep); +out: + return ERR_PTR(rc); +} + static int rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf) { @@ -1134,27 +1209,26 @@ out_free: } int -rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, - struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) +rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) { + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; char *p; - size_t len, rlen, wlen; + size_t len; int i, rc; buf->rb_max_requests = cdata->max_requests; spin_lock_init(&buf->rb_lock); - atomic_set(&buf->rb_credits, 1); /* Need to allocate: * 1. arrays for send and recv pointers * 2. arrays of struct rpcrdma_req to fill in pointers * 3. array of struct rpcrdma_rep for replies - * 4. padding, if any * Send/recv buffers in req/rep need to be registered */ len = buf->rb_max_requests * (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *)); - len += cdata->padding; p = kzalloc(len, GFP_KERNEL); if (p == NULL) { @@ -1170,17 +1244,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, buf->rb_recv_bufs = (struct rpcrdma_rep **) p; p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests]; - /* - * Register the zeroed pad buffer, if any. - */ - if (cdata->padding) { - rc = rpcrdma_register_internal(ia, p, cdata->padding, - &ep->rep_pad_mr, &ep->rep_pad); - if (rc) - goto out; - } - p += cdata->padding; - INIT_LIST_HEAD(&buf->rb_mws); INIT_LIST_HEAD(&buf->rb_all); switch (ia->ri_memreg_strategy) { @@ -1198,62 +1261,29 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, break; } - /* - * Allocate/init the request/reply buffers. Doing this - * using kmalloc for now -- one for each buf. - */ - wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req)); - rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep)); - dprintk("RPC: %s: wlen = %zu, rlen = %zu\n", - __func__, wlen, rlen); - for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_req *req; struct rpcrdma_rep *rep; - req = kmalloc(wlen, GFP_KERNEL); - if (req == NULL) { + req = rpcrdma_create_req(r_xprt); + if (IS_ERR(req)) { dprintk("RPC: %s: request buffer %d alloc" " failed\n", __func__, i); - rc = -ENOMEM; + rc = PTR_ERR(req); goto out; } - memset(req, 0, sizeof(struct rpcrdma_req)); buf->rb_send_bufs[i] = req; - buf->rb_send_bufs[i]->rl_buffer = buf; - rc = rpcrdma_register_internal(ia, req->rl_base, - wlen - offsetof(struct rpcrdma_req, rl_base), - &buf->rb_send_bufs[i]->rl_handle, - &buf->rb_send_bufs[i]->rl_iov); - if (rc) - goto out; - - buf->rb_send_bufs[i]->rl_size = wlen - - sizeof(struct rpcrdma_req); - - rep = kmalloc(rlen, GFP_KERNEL); - if (rep == NULL) { + rep = rpcrdma_create_rep(r_xprt); + if (IS_ERR(rep)) { dprintk("RPC: %s: reply buffer %d alloc failed\n", __func__, i); - rc = -ENOMEM; + rc = PTR_ERR(rep); goto out; } - memset(rep, 0, sizeof(struct rpcrdma_rep)); buf->rb_recv_bufs[i] = rep; - buf->rb_recv_bufs[i]->rr_buffer = buf; - - rc = rpcrdma_register_internal(ia, rep->rr_base, - rlen - offsetof(struct rpcrdma_rep, rr_base), - &buf->rb_recv_bufs[i]->rr_handle, - &buf->rb_recv_bufs[i]->rr_iov); - if (rc) - goto out; - } - dprintk("RPC: %s: max_requests %d\n", - __func__, buf->rb_max_requests); - /* done */ + return 0; out: rpcrdma_buffer_destroy(buf); @@ -1261,6 +1291,27 @@ out: } static void +rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) +{ + if (!rep) + return; + + rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); + kfree(rep); +} + +static void +rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) +{ + if (!req) + return; + + rpcrdma_free_regbuf(ia, req->rl_sendbuf); + rpcrdma_free_regbuf(ia, req->rl_rdmabuf); + kfree(req); +} + +static void rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf) { struct rpcrdma_mw *r; @@ -1315,18 +1366,10 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) dprintk("RPC: %s: entering\n", __func__); for (i = 0; i < buf->rb_max_requests; i++) { - if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) { - rpcrdma_deregister_internal(ia, - buf->rb_recv_bufs[i]->rr_handle, - &buf->rb_recv_bufs[i]->rr_iov); - kfree(buf->rb_recv_bufs[i]); - } - if (buf->rb_send_bufs && buf->rb_send_bufs[i]) { - rpcrdma_deregister_internal(ia, - buf->rb_send_bufs[i]->rl_handle, - &buf->rb_send_bufs[i]->rl_iov); - kfree(buf->rb_send_bufs[i]); - } + if (buf->rb_recv_bufs) + rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]); + if (buf->rb_send_bufs) + rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]); } switch (ia->ri_memreg_strategy) { @@ -1450,8 +1493,8 @@ rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) int i; for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++) - rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf); - rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf); + rpcrdma_buffer_put_mr(&seg->rl_mw, buf); + rpcrdma_buffer_put_mr(&seg1->rl_mw, buf); } static void @@ -1537,7 +1580,7 @@ rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf, list_add(&r->mw_list, stale); continue; } - req->rl_segments[i].mr_chunk.rl_mw = r; + req->rl_segments[i].rl_mw = r; if (unlikely(i-- == 0)) return req; /* Success */ } @@ -1559,7 +1602,7 @@ rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) r = list_entry(buf->rb_mws.next, struct rpcrdma_mw, mw_list); list_del(&r->mw_list); - req->rl_segments[i].mr_chunk.rl_mw = r; + req->rl_segments[i].rl_mw = r; if (unlikely(i-- == 0)) return req; /* Success */ } @@ -1658,8 +1701,6 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req) struct rpcrdma_buffer *buffers = req->rl_buffer; unsigned long flags; - if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */ - buffers = ((struct rpcrdma_req *) buffers)->rl_buffer; spin_lock_irqsave(&buffers->rb_lock, flags); if (buffers->rb_recv_index < buffers->rb_max_requests) { req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; @@ -1688,7 +1729,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) * Wrappers for internal-use kmalloc memory registration, used by buffer code. */ -int +static int rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len, struct ib_mr **mrp, struct ib_sge *iov) { @@ -1739,7 +1780,7 @@ rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len, return rc; } -int +static int rpcrdma_deregister_internal(struct rpcrdma_ia *ia, struct ib_mr *mr, struct ib_sge *iov) { @@ -1757,6 +1798,61 @@ rpcrdma_deregister_internal(struct rpcrdma_ia *ia, return rc; } +/** + * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers + * @ia: controlling rpcrdma_ia + * @size: size of buffer to be allocated, in bytes + * @flags: GFP flags + * + * Returns pointer to private header of an area of internally + * registered memory, or an ERR_PTR. The registered buffer follows + * the end of the private header. + * + * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for + * receiving the payload of RDMA RECV operations. regbufs are not + * used for RDMA READ/WRITE operations, thus are registered only for + * LOCAL access. + */ +struct rpcrdma_regbuf * +rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags) +{ + struct rpcrdma_regbuf *rb; + int rc; + + rc = -ENOMEM; + rb = kmalloc(sizeof(*rb) + size, flags); + if (rb == NULL) + goto out; + + rb->rg_size = size; + rb->rg_owner = NULL; + rc = rpcrdma_register_internal(ia, rb->rg_base, size, + &rb->rg_mr, &rb->rg_iov); + if (rc) + goto out_free; + + return rb; + +out_free: + kfree(rb); +out: + return ERR_PTR(rc); +} + +/** + * rpcrdma_free_regbuf - deregister and free registered buffer + * @ia: controlling rpcrdma_ia + * @rb: regbuf to be deregistered and freed + */ +void +rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb) +{ + if (rb) { + rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov); + kfree(rb); + } +} + /* * Wrappers for chunk registration, shared by read/write chunk code. */ @@ -1799,7 +1895,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg, struct rpcrdma_xprt *r_xprt) { struct rpcrdma_mr_seg *seg1 = seg; - struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw; + struct rpcrdma_mw *mw = seg1->rl_mw; struct rpcrdma_frmr *frmr = &mw->r.frmr; struct ib_mr *mr = frmr->fr_mr; struct ib_send_wr fastreg_wr, *bad_wr; @@ -1888,12 +1984,12 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg, struct ib_send_wr invalidate_wr, *bad_wr; int rc; - seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_INVALID; + seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID; memset(&invalidate_wr, 0, sizeof invalidate_wr); - invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw; + invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw; invalidate_wr.opcode = IB_WR_LOCAL_INV; - invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey; + invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey; DECR_CQCOUNT(&r_xprt->rx_ep); read_lock(&ia->ri_qplock); @@ -1903,7 +1999,7 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg, read_unlock(&ia->ri_qplock); if (rc) { /* Force rpcrdma_buffer_get() to retry */ - seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_STALE; + seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE; dprintk("RPC: %s: failed ib_post_send for invalidate," " status %i\n", __func__, rc); } @@ -1935,8 +2031,7 @@ rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg, offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr, - physaddrs, i, seg1->mr_dma); + rc = ib_map_phys_fmr(seg1->rl_mw->r.fmr, physaddrs, i, seg1->mr_dma); if (rc) { dprintk("RPC: %s: failed ib_map_phys_fmr " "%u@0x%llx+%i (%d)... status %i\n", __func__, @@ -1945,7 +2040,7 @@ rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg, while (i--) rpcrdma_unmap_one(ia, --seg); } else { - seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey; + seg1->mr_rkey = seg1->rl_mw->r.fmr->rkey; seg1->mr_base = seg1->mr_dma + pageoff; seg1->mr_nsegs = i; seg1->mr_len = len; @@ -1962,7 +2057,7 @@ rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg, LIST_HEAD(l); int rc; - list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l); + list_add(&seg1->rl_mw->r.fmr->list, &l); rc = ib_unmap_fmr(&l); read_lock(&ia->ri_qplock); while (seg1->mr_nsegs--) @@ -2104,11 +2199,13 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, recv_wr.next = NULL; recv_wr.wr_id = (u64) (unsigned long) rep; - recv_wr.sg_list = &rep->rr_iov; + recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; recv_wr.num_sge = 1; ib_dma_sync_single_for_cpu(ia->ri_id->device, - rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL); + rdmab_addr(rep->rr_rdmabuf), + rdmab_length(rep->rr_rdmabuf), + DMA_BIDIRECTIONAL); rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail); diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index b799041..d1b7039 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -70,6 +70,9 @@ struct rpcrdma_ia { int ri_async_rc; enum rpcrdma_memreg ri_memreg_strategy; unsigned int ri_max_frmr_depth; + struct ib_device_attr ri_devattr; + struct ib_qp_attr ri_qp_attr; + struct ib_qp_init_attr ri_qp_init_attr; }; /* @@ -83,13 +86,9 @@ struct rpcrdma_ep { atomic_t rep_cqcount; int rep_cqinit; int rep_connected; - struct rpcrdma_ia *rep_ia; struct ib_qp_init_attr rep_attr; wait_queue_head_t rep_connect_wait; - struct ib_sge rep_pad; /* holds zeroed pad */ - struct ib_mr *rep_pad_mr; /* holds zeroed pad */ - void (*rep_func)(struct rpcrdma_ep *); - struct rpc_xprt *rep_xprt; /* for rep_func */ + struct rpcrdma_regbuf *rep_padbuf; struct rdma_conn_param rep_remote_cma; struct sockaddr_storage rep_remote_addr; struct delayed_work rep_connect_worker; @@ -106,6 +105,44 @@ struct rpcrdma_ep { #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit) #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount) +/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV + * + * The below structure appears at the front of a large region of kmalloc'd + * memory, which always starts on a good alignment boundary. + */ + +struct rpcrdma_regbuf { + size_t rg_size; + struct rpcrdma_req *rg_owner; + struct ib_mr *rg_mr; + struct ib_sge rg_iov; + __be32 rg_base[0] __attribute__ ((aligned(256))); +}; + +static inline u64 +rdmab_addr(struct rpcrdma_regbuf *rb) +{ + return rb->rg_iov.addr; +} + +static inline u32 +rdmab_length(struct rpcrdma_regbuf *rb) +{ + return rb->rg_iov.length; +} + +static inline u32 +rdmab_lkey(struct rpcrdma_regbuf *rb) +{ + return rb->rg_iov.lkey; +} + +static inline struct rpcrdma_msg * +rdmab_to_msg(struct rpcrdma_regbuf *rb) +{ + return (struct rpcrdma_msg *)rb->rg_base; +} + enum rpcrdma_chunktype { rpcrdma_noch = 0, rpcrdma_readch, @@ -134,22 +171,16 @@ enum rpcrdma_chunktype { /* temporary static scatter/gather max */ #define RPCRDMA_MAX_DATA_SEGS (64) /* max scatter/gather */ #define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */ -#define MAX_RPCRDMAHDR (\ - /* max supported RPC/RDMA header */ \ - sizeof(struct rpcrdma_msg) + (2 * sizeof(u32)) + \ - (sizeof(struct rpcrdma_read_chunk) * RPCRDMA_MAX_SEGS) + sizeof(u32)) struct rpcrdma_buffer; struct rpcrdma_rep { - unsigned int rr_len; /* actual received reply length */ - struct rpcrdma_buffer *rr_buffer; /* home base for this structure */ - struct rpc_xprt *rr_xprt; /* needed for request/reply matching */ - void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */ - struct list_head rr_list; /* tasklet list */ - struct ib_sge rr_iov; /* for posting */ - struct ib_mr *rr_handle; /* handle for mem in rr_iov */ - char rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */ + unsigned int rr_len; + struct rpcrdma_buffer *rr_buffer; + struct rpc_xprt *rr_xprt; + void (*rr_func)(struct rpcrdma_rep *); + struct list_head rr_list; + struct rpcrdma_regbuf *rr_rdmabuf; }; /* @@ -211,10 +242,7 @@ struct rpcrdma_mw { */ struct rpcrdma_mr_seg { /* chunk descriptors */ - union { /* chunk memory handles */ - struct ib_mr *rl_mr; /* if registered directly */ - struct rpcrdma_mw *rl_mw; /* if registered from region */ - } mr_chunk; + struct rpcrdma_mw *rl_mw; /* registered MR */ u64 mr_base; /* registration result */ u32 mr_rkey; /* registration result */ u32 mr_len; /* length of chunk or segment */ @@ -227,22 +255,27 @@ struct rpcrdma_mr_seg { /* chunk descriptors */ }; struct rpcrdma_req { - size_t rl_size; /* actual length of buffer */ unsigned int rl_niovs; /* 0, 2 or 4 */ unsigned int rl_nchunks; /* non-zero if chunks */ unsigned int rl_connect_cookie; /* retry detection */ enum rpcrdma_chunktype rl_rtype, rl_wtype; struct rpcrdma_buffer *rl_buffer; /* home base for this structure */ struct rpcrdma_rep *rl_reply;/* holder for reply buffer */ - struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */ struct ib_sge rl_send_iov[4]; /* for active requests */ - struct ib_sge rl_iov; /* for posting */ - struct ib_mr *rl_handle; /* handle for mem in rl_iov */ - char rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */ - __u32 rl_xdr_buf[0]; /* start of returned rpc rq_buffer */ + struct rpcrdma_regbuf *rl_rdmabuf; + struct rpcrdma_regbuf *rl_sendbuf; + struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; -#define rpcr_to_rdmar(r) \ - container_of((r)->rq_buffer, struct rpcrdma_req, rl_xdr_buf[0]) + +static inline struct rpcrdma_req * +rpcr_to_rdmar(struct rpc_rqst *rqst) +{ + void *buffer = rqst->rq_buffer; + struct rpcrdma_regbuf *rb; + + rb = container_of(buffer, struct rpcrdma_regbuf, rg_base); + return rb->rg_owner; +} /* * struct rpcrdma_buffer -- holds list/queue of pre-registered memory for @@ -252,7 +285,6 @@ struct rpcrdma_req { */ struct rpcrdma_buffer { spinlock_t rb_lock; /* protects indexes */ - atomic_t rb_credits; /* most recent server credits */ int rb_max_requests;/* client max requests */ struct list_head rb_mws; /* optional memory windows/fmrs/frmrs */ struct list_head rb_all; @@ -318,16 +350,16 @@ struct rpcrdma_stats { * during unmount. */ struct rpcrdma_xprt { - struct rpc_xprt xprt; + struct rpc_xprt rx_xprt; struct rpcrdma_ia rx_ia; struct rpcrdma_ep rx_ep; struct rpcrdma_buffer rx_buf; struct rpcrdma_create_data_internal rx_data; - struct delayed_work rdma_connect; + struct delayed_work rx_connect_worker; struct rpcrdma_stats rx_stats; }; -#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, xprt) +#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt) #define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data) /* Setting this to 0 ensures interoperability with early servers. @@ -358,9 +390,7 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *, /* * Buffer calls - xprtrdma/verbs.c */ -int rpcrdma_buffer_create(struct rpcrdma_buffer *, struct rpcrdma_ep *, - struct rpcrdma_ia *, - struct rpcrdma_create_data_internal *); +int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); @@ -368,16 +398,16 @@ void rpcrdma_buffer_put(struct rpcrdma_req *); void rpcrdma_recv_buffer_get(struct rpcrdma_req *); void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); -int rpcrdma_register_internal(struct rpcrdma_ia *, void *, int, - struct ib_mr **, struct ib_sge *); -int rpcrdma_deregister_internal(struct rpcrdma_ia *, - struct ib_mr *, struct ib_sge *); - int rpcrdma_register_external(struct rpcrdma_mr_seg *, int, int, struct rpcrdma_xprt *); int rpcrdma_deregister_external(struct rpcrdma_mr_seg *, struct rpcrdma_xprt *); +struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *, + size_t, gfp_t); +void rpcrdma_free_regbuf(struct rpcrdma_ia *, + struct rpcrdma_regbuf *); + /* * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 87ce7e8..66891e3 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -63,6 +63,8 @@ static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE; static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT; static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT; +#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) + #define XS_TCP_LINGER_TO (15U * HZ) static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; @@ -75,8 +77,6 @@ static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; * someone else's file names! */ -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) - static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE; static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE; static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT; @@ -627,7 +627,7 @@ process_status: * @xprt: transport * * Initiates a graceful shutdown of the TCP socket by calling the - * equivalent of shutdown(SHUT_WR); + * equivalent of shutdown(SHUT_RDWR); */ static void xs_tcp_shutdown(struct rpc_xprt *xprt) { @@ -635,7 +635,7 @@ static void xs_tcp_shutdown(struct rpc_xprt *xprt) struct socket *sock = transport->sock; if (sock != NULL) { - kernel_sock_shutdown(sock, SHUT_WR); + kernel_sock_shutdown(sock, SHUT_RDWR); trace_rpc_socket_shutdown(xprt, sock); } } @@ -718,9 +718,9 @@ static int xs_tcp_send_request(struct rpc_task *task) dprintk("RPC: sendmsg returned unrecognized error %d\n", -status); case -ECONNRESET: - xs_tcp_shutdown(xprt); case -ECONNREFUSED: case -ENOTCONN: + case -EADDRINUSE: case -EPIPE: clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); } @@ -773,6 +773,21 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s sk->sk_error_report = transport->old_error_report; } +static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) +{ + smp_mb__before_atomic(); + clear_bit(XPRT_CLOSE_WAIT, &xprt->state); + clear_bit(XPRT_CLOSING, &xprt->state); + smp_mb__after_atomic(); +} + +static void xs_sock_mark_closed(struct rpc_xprt *xprt) +{ + xs_sock_reset_connection_flags(xprt); + /* Mark transport as closed and wake up all pending tasks */ + xprt_disconnect_done(xprt); +} + /** * xs_error_report - callback to handle TCP socket state errors * @sk: socket @@ -792,11 +807,12 @@ static void xs_error_report(struct sock *sk) err = -sk->sk_err; if (err == 0) goto out; + /* Is this a reset event? */ + if (sk->sk_state == TCP_CLOSE) + xs_sock_mark_closed(xprt); dprintk("RPC: xs_error_report client %p, error=%d...\n", xprt, -err); trace_rpc_socket_error(xprt, sk->sk_socket, err); - if (test_bit(XPRT_CONNECTION_REUSE, &xprt->state)) - goto out; xprt_wake_pending_tasks(xprt, err); out: read_unlock_bh(&sk->sk_callback_lock); @@ -806,12 +822,11 @@ static void xs_reset_transport(struct sock_xprt *transport) { struct socket *sock = transport->sock; struct sock *sk = transport->inet; + struct rpc_xprt *xprt = &transport->xprt; if (sk == NULL) return; - transport->srcport = 0; - write_lock_bh(&sk->sk_callback_lock); transport->inet = NULL; transport->sock = NULL; @@ -820,8 +835,9 @@ static void xs_reset_transport(struct sock_xprt *transport) xs_restore_old_callbacks(transport, sk); write_unlock_bh(&sk->sk_callback_lock); + xs_sock_reset_connection_flags(xprt); - trace_rpc_socket_close(&transport->xprt, sock); + trace_rpc_socket_close(xprt, sock); sock_release(sock); } @@ -841,27 +857,12 @@ static void xs_close(struct rpc_xprt *xprt) dprintk("RPC: xs_close xprt %p\n", xprt); - cancel_delayed_work_sync(&transport->connect_worker); - xs_reset_transport(transport); xprt->reestablish_timeout = 0; - smp_mb__before_atomic(); - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); - clear_bit(XPRT_CLOSE_WAIT, &xprt->state); - clear_bit(XPRT_CLOSING, &xprt->state); - smp_mb__after_atomic(); xprt_disconnect_done(xprt); } -static void xs_tcp_close(struct rpc_xprt *xprt) -{ - if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state)) - xs_close(xprt); - else - xs_tcp_shutdown(xprt); -} - static void xs_xprt_free(struct rpc_xprt *xprt) { xs_free_peer_addresses(xprt); @@ -1032,7 +1033,6 @@ static void xs_udp_data_ready(struct sock *sk) */ static void xs_tcp_force_close(struct rpc_xprt *xprt) { - set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); xprt_force_disconnect(xprt); } @@ -1425,54 +1425,6 @@ out: read_unlock_bh(&sk->sk_callback_lock); } -/* - * Do the equivalent of linger/linger2 handling for dealing with - * broken servers that don't close the socket in a timely - * fashion - */ -static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt, - unsigned long timeout) -{ - struct sock_xprt *transport; - - if (xprt_test_and_set_connecting(xprt)) - return; - set_bit(XPRT_CONNECTION_ABORT, &xprt->state); - transport = container_of(xprt, struct sock_xprt, xprt); - queue_delayed_work(rpciod_workqueue, &transport->connect_worker, - timeout); -} - -static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt) -{ - struct sock_xprt *transport; - - transport = container_of(xprt, struct sock_xprt, xprt); - - if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) || - !cancel_delayed_work(&transport->connect_worker)) - return; - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); - xprt_clear_connecting(xprt); -} - -static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) -{ - smp_mb__before_atomic(); - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); - clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state); - clear_bit(XPRT_CLOSE_WAIT, &xprt->state); - clear_bit(XPRT_CLOSING, &xprt->state); - smp_mb__after_atomic(); -} - -static void xs_sock_mark_closed(struct rpc_xprt *xprt) -{ - xs_sock_reset_connection_flags(xprt); - /* Mark transport as closed and wake up all pending tasks */ - xprt_disconnect_done(xprt); -} - /** * xs_tcp_state_change - callback to handle TCP socket state changes * @sk: socket whose state has changed @@ -1521,7 +1473,6 @@ static void xs_tcp_state_change(struct sock *sk) clear_bit(XPRT_CONNECTED, &xprt->state); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); smp_mb__after_atomic(); - xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); break; case TCP_CLOSE_WAIT: /* The server initiated a shutdown of the socket */ @@ -1538,13 +1489,11 @@ static void xs_tcp_state_change(struct sock *sk) break; case TCP_LAST_ACK: set_bit(XPRT_CLOSING, &xprt->state); - xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); smp_mb__before_atomic(); clear_bit(XPRT_CONNECTED, &xprt->state); smp_mb__after_atomic(); break; case TCP_CLOSE: - xs_tcp_cancel_linger_timeout(xprt); xs_sock_mark_closed(xprt); } out: @@ -1667,6 +1616,40 @@ static unsigned short xs_get_random_port(void) } /** + * xs_set_reuseaddr_port - set the socket's port and address reuse options + * @sock: socket + * + * Note that this function has to be called on all sockets that share the + * same port, and it must be called before binding. + */ +static void xs_sock_set_reuseport(struct socket *sock) +{ + int opt = 1; + + kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, + (char *)&opt, sizeof(opt)); +} + +static unsigned short xs_sock_getport(struct socket *sock) +{ + struct sockaddr_storage buf; + int buflen; + unsigned short port = 0; + + if (kernel_getsockname(sock, (struct sockaddr *)&buf, &buflen) < 0) + goto out; + switch (buf.ss_family) { + case AF_INET6: + port = ntohs(((struct sockaddr_in6 *)&buf)->sin6_port); + break; + case AF_INET: + port = ntohs(((struct sockaddr_in *)&buf)->sin_port); + } +out: + return port; +} + +/** * xs_set_port - reset the port number in the remote endpoint address * @xprt: generic transport * @port: new port number @@ -1680,6 +1663,12 @@ static void xs_set_port(struct rpc_xprt *xprt, unsigned short port) xs_update_peer_port(xprt); } +static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock) +{ + if (transport->srcport == 0) + transport->srcport = xs_sock_getport(sock); +} + static unsigned short xs_get_srcport(struct sock_xprt *transport) { unsigned short port = transport->srcport; @@ -1833,7 +1822,8 @@ static void xs_dummy_setup_socket(struct work_struct *work) } static struct socket *xs_create_sock(struct rpc_xprt *xprt, - struct sock_xprt *transport, int family, int type, int protocol) + struct sock_xprt *transport, int family, int type, + int protocol, bool reuseport) { struct socket *sock; int err; @@ -1846,6 +1836,9 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt, } xs_reclassify_socket(family, sock); + if (reuseport) + xs_sock_set_reuseport(sock); + err = xs_bind(transport, sock); if (err) { sock_release(sock); @@ -1903,7 +1896,6 @@ static int xs_local_setup_socket(struct sock_xprt *transport) struct socket *sock; int status = -EIO; - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); status = __sock_create(xprt->xprt_net, AF_LOCAL, SOCK_STREAM, 0, &sock, 1); if (status < 0) { @@ -2044,10 +2036,9 @@ static void xs_udp_setup_socket(struct work_struct *work) struct socket *sock = transport->sock; int status = -EIO; - /* Start by resetting any existing state */ - xs_reset_transport(transport); sock = xs_create_sock(xprt, transport, - xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP); + xs_addr(xprt)->sa_family, SOCK_DGRAM, + IPPROTO_UDP, false); if (IS_ERR(sock)) goto out; @@ -2061,61 +2052,11 @@ static void xs_udp_setup_socket(struct work_struct *work) trace_rpc_socket_connect(xprt, sock, 0); status = 0; out: + xprt_unlock_connect(xprt, transport); xprt_clear_connecting(xprt); xprt_wake_pending_tasks(xprt, status); } -/* - * We need to preserve the port number so the reply cache on the server can - * find our cached RPC replies when we get around to reconnecting. - */ -static void xs_abort_connection(struct sock_xprt *transport) -{ - int result; - struct sockaddr any; - - dprintk("RPC: disconnecting xprt %p to reuse port\n", transport); - - /* - * Disconnect the transport socket by doing a connect operation - * with AF_UNSPEC. This should return immediately... - */ - memset(&any, 0, sizeof(any)); - any.sa_family = AF_UNSPEC; - result = kernel_connect(transport->sock, &any, sizeof(any), 0); - trace_rpc_socket_reset_connection(&transport->xprt, - transport->sock, result); - if (!result) - xs_sock_reset_connection_flags(&transport->xprt); - dprintk("RPC: AF_UNSPEC connect return code %d\n", result); -} - -static void xs_tcp_reuse_connection(struct sock_xprt *transport) -{ - unsigned int state = transport->inet->sk_state; - - if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) { - /* we don't need to abort the connection if the socket - * hasn't undergone a shutdown - */ - if (transport->inet->sk_shutdown == 0) - return; - dprintk("RPC: %s: TCP_CLOSEd and sk_shutdown set to %d\n", - __func__, transport->inet->sk_shutdown); - } - if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) { - /* we don't need to abort the connection if the socket - * hasn't undergone a shutdown - */ - if (transport->inet->sk_shutdown == 0) - return; - dprintk("RPC: %s: ESTABLISHED/SYN_SENT " - "sk_shutdown set to %d\n", - __func__, transport->inet->sk_shutdown); - } - xs_abort_connection(transport); -} - static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); @@ -2149,9 +2090,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) sk->sk_allocation = GFP_ATOMIC; /* socket options */ - sk->sk_userlocks |= SOCK_BINDPORT_LOCK; sock_reset_flag(sk, SOCK_LINGER); - tcp_sk(sk)->linger2 = 0; tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; xprt_clear_connected(xprt); @@ -2174,6 +2113,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); switch (ret) { case 0: + xs_set_srcport(transport, sock); case -EINPROGRESS: /* SYN_SENT! */ if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) @@ -2200,25 +2140,13 @@ static void xs_tcp_setup_socket(struct work_struct *work) int status = -EIO; if (!sock) { - clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); sock = xs_create_sock(xprt, transport, - xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP); + xs_addr(xprt)->sa_family, SOCK_STREAM, + IPPROTO_TCP, true); if (IS_ERR(sock)) { status = PTR_ERR(sock); goto out; } - } else { - int abort_and_exit; - - abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT, - &xprt->state); - /* "close" the socket, preserving the local port */ - set_bit(XPRT_CONNECTION_REUSE, &xprt->state); - xs_tcp_reuse_connection(transport); - clear_bit(XPRT_CONNECTION_REUSE, &xprt->state); - - if (abort_and_exit) - goto out_eagain; } dprintk("RPC: worker connecting xprt %p via %s to " @@ -2245,6 +2173,7 @@ static void xs_tcp_setup_socket(struct work_struct *work) case 0: case -EINPROGRESS: case -EALREADY: + xprt_unlock_connect(xprt, transport); xprt_clear_connecting(xprt); return; case -EINVAL: @@ -2254,13 +2183,15 @@ static void xs_tcp_setup_socket(struct work_struct *work) case -ECONNREFUSED: case -ECONNRESET: case -ENETUNREACH: + case -EADDRINUSE: case -ENOBUFS: /* retry with existing socket, after a delay */ + xs_tcp_force_close(xprt); goto out; } -out_eagain: status = -EAGAIN; out: + xprt_unlock_connect(xprt, transport); xprt_clear_connecting(xprt); xprt_wake_pending_tasks(xprt, status); } @@ -2283,6 +2214,11 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport)); + + /* Start by resetting any existing state */ + xs_reset_transport(transport); + if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) { dprintk("RPC: xs_connect delayed xprt %p for %lu " "seconds\n", @@ -2559,7 +2495,7 @@ static struct rpc_xprt_ops xs_tcp_ops = { .buf_free = rpc_free, .send_request = xs_tcp_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, - .close = xs_tcp_close, + .close = xs_tcp_shutdown, .destroy = xs_destroy, .print_stats = xs_tcp_print_stats, }; |