diff options
-rw-r--r-- | net/rds/ib.h | 1 | ||||
-rw-r--r-- | net/rds/ib_cm.c | 14 | ||||
-rw-r--r-- | net/rds/ib_send.c | 47 |
3 files changed, 54 insertions, 8 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h index acda2db..a13ced5 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -108,6 +108,7 @@ struct rds_ib_connection { struct rds_header *i_send_hdrs; u64 i_send_hdrs_dma; struct rds_ib_send_work *i_sends; + atomic_t i_signaled_sends; /* rx */ struct tasklet_struct i_recv_tasklet; diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 10f6a88..123c7d3 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -615,11 +615,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) } /* - * Don't wait for the send ring to be empty -- there may be completed - * non-signaled entries sitting on there. We unmap these below. + * We want to wait for tx and rx completion to finish + * before we tear down the connection, but we have to be + * careful not to get stuck waiting on a send ring that + * only has unsignaled sends in it. We've shutdown new + * sends before getting here so by waiting for signaled + * sends to complete we're ensured that there will be no + * more tx processing. */ wait_event(rds_ib_ring_empty_wait, - rds_ib_ring_empty(&ic->i_recv_ring)); + rds_ib_ring_empty(&ic->i_recv_ring) && + (atomic_read(&ic->i_signaled_sends) == 0)); + tasklet_kill(&ic->i_recv_tasklet); if (ic->i_send_hdrs) ib_dma_free_coherent(dev, @@ -729,6 +736,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) #ifndef KERNEL_HAS_ATOMIC64 spin_lock_init(&ic->i_ack_lock); #endif + atomic_set(&ic->i_signaled_sends, 0); /* * rds_ib_conn_shutdown() waits for these to be emptied so they diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index e88cb4a..15f7569 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -220,6 +220,18 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic) } /* + * The only fast path caller always has a non-zero nr, so we don't + * bother testing nr before performing the atomic sub. + */ +static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr) +{ + if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) && + waitqueue_active(&rds_ib_ring_empty_wait)) + wake_up(&rds_ib_ring_empty_wait); + BUG_ON(atomic_read(&ic->i_signaled_sends) < 0); +} + +/* * The _oldest/_free ring operations here race cleanly with the alloc/unalloc * operations performed in the send path. As the sender allocs and potentially * unallocs the next free entry in the ring it doesn't alter which is @@ -236,6 +248,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) u32 oldest; u32 i = 0; int ret; + int nr_sig = 0; rdsdebug("cq %p conn %p\n", cq, conn); rds_ib_stats_inc(s_ib_tx_cq_call); @@ -262,6 +275,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) for (i = 0; i < completed; i++) { send = &ic->i_sends[oldest]; + if (send->s_wr.send_flags & IB_SEND_SIGNALED) + nr_sig++; rm = rds_ib_send_unmap_op(ic, send, wc.status); @@ -282,6 +297,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) } rds_ib_ring_free(&ic->i_send_ring, completed); + rds_ib_sub_signaled(ic, nr_sig); + nr_sig = 0; if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) || test_bit(0, &conn->c_map_queued)) @@ -440,9 +457,9 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted) set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); } -static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, - struct rds_ib_send_work *send, - bool notify) +static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, + struct rds_ib_send_work *send, + bool notify) { /* * We want to delay signaling completions just enough to get @@ -452,7 +469,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, if (ic->i_unsignaled_wrs-- == 0 || notify) { ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; send->s_wr.send_flags |= IB_SEND_SIGNALED; + return 1; } + return 0; } /* @@ -488,6 +507,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, int bytes_sent = 0; int ret; int flow_controlled = 0; + int nr_sig = 0; BUG_ON(off % RDS_FRAG_SIZE); BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header)); @@ -645,6 +665,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, if (ic->i_flowctl && flow_controlled && i == (work_alloc-1)) send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; + if (send->s_wr.send_flags & IB_SEND_SIGNALED) + nr_sig++; + rdsdebug("send %p wr %p num_sge %u next %p\n", send, &send->s_wr, send->s_wr.num_sge, send->s_wr.next); @@ -689,6 +712,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, if (ic->i_flowctl && i < credit_alloc) rds_ib_send_add_credits(conn, credit_alloc - i); + if (nr_sig) + atomic_add(nr_sig, &ic->i_signaled_sends); + /* XXX need to worry about failed_wr and partial sends. */ failed_wr = &first->s_wr; ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); @@ -699,6 +725,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 " "returned %d\n", &conn->c_faddr, ret); rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); + rds_ib_sub_signaled(ic, nr_sig); if (prev->s_op) { ic->i_data_op = prev->s_op; prev->s_op = NULL; @@ -728,6 +755,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) u32 pos; u32 work_alloc; int ret; + int nr_sig = 0; rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); @@ -752,7 +780,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) send->s_wr.wr.atomic.compare_add = op->op_swap_add; send->s_wr.wr.atomic.swap = 0; } - rds_ib_set_wr_signal_state(ic, send, op->op_notify); + nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify); send->s_wr.num_sge = 1; send->s_wr.next = NULL; send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; @@ -778,6 +806,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr, send->s_sge[0].addr, send->s_sge[0].length); + if (nr_sig) + atomic_add(nr_sig, &ic->i_signaled_sends); + failed_wr = &send->s_wr; ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr); rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic, @@ -787,6 +818,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 " "returned %d\n", &conn->c_faddr, ret); rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); + rds_ib_sub_signaled(ic, nr_sig); goto out; } @@ -817,6 +849,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) int sent; int ret; int num_sge; + int nr_sig = 0; /* map the op the first time we see it */ if (!op->op_mapped) { @@ -859,7 +892,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) send->s_queued = jiffies; send->s_op = NULL; - rds_ib_set_wr_signal_state(ic, send, op->op_notify); + nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify); send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ; send->s_wr.wr.rdma.remote_addr = remote_addr; @@ -910,6 +943,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) work_alloc = i; } + if (nr_sig) + atomic_add(nr_sig, &ic->i_signaled_sends); + failed_wr = &first->s_wr; ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic, @@ -919,6 +955,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 " "returned %d\n", &conn->c_faddr, ret); rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); + rds_ib_sub_signaled(ic, nr_sig); goto out; } |