diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/rds/af_rds.c | 8 | ||||
-rw-r--r-- | net/rds/bind.c | 76 | ||||
-rw-r--r-- | net/rds/ib.c | 47 | ||||
-rw-r--r-- | net/rds/ib.h | 78 | ||||
-rw-r--r-- | net/rds/ib_cm.c | 114 | ||||
-rw-r--r-- | net/rds/ib_rdma.c | 116 | ||||
-rw-r--r-- | net/rds/ib_recv.c | 136 | ||||
-rw-r--r-- | net/rds/ib_send.c | 110 | ||||
-rw-r--r-- | net/rds/ib_stats.c | 22 | ||||
-rw-r--r-- | net/rds/rds.h | 1 | ||||
-rw-r--r-- | net/rds/send.c | 20 | ||||
-rw-r--r-- | net/rds/threads.c | 2 |
12 files changed, 448 insertions, 282 deletions
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index a2f28a6..384ea1e 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -72,13 +72,7 @@ static int rds_release(struct socket *sock) rds_clear_recv_queue(rs); rds_cong_remove_socket(rs); - /* - * the binding lookup hash uses rcu, we need to - * make sure we synchronize_rcu before we free our - * entry - */ rds_remove_bound(rs); - synchronize_rcu(); rds_send_drop_to(rs, NULL); rds_rdma_drop_keys(rs); @@ -588,6 +582,8 @@ static int rds_init(void) { int ret; + rds_bind_lock_init(); + ret = rds_conn_init(); if (ret) goto out; diff --git a/net/rds/bind.c b/net/rds/bind.c index dd666fb..bc6b93e 100644 --- a/net/rds/bind.c +++ b/net/rds/bind.c @@ -38,48 +38,50 @@ #include <linux/ratelimit.h> #include "rds.h" +struct bind_bucket { + rwlock_t lock; + struct hlist_head head; +}; + #define BIND_HASH_SIZE 1024 -static struct hlist_head bind_hash_table[BIND_HASH_SIZE]; -static DEFINE_SPINLOCK(rds_bind_lock); +static struct bind_bucket bind_hash_table[BIND_HASH_SIZE]; -static struct hlist_head *hash_to_bucket(__be32 addr, __be16 port) +static struct bind_bucket *hash_to_bucket(__be32 addr, __be16 port) { return bind_hash_table + (jhash_2words((u32)addr, (u32)port, 0) & (BIND_HASH_SIZE - 1)); } -static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port, +/* must hold either read or write lock (write lock for insert != NULL) */ +static struct rds_sock *rds_bind_lookup(struct bind_bucket *bucket, + __be32 addr, __be16 port, struct rds_sock *insert) { struct rds_sock *rs; - struct hlist_head *head = hash_to_bucket(addr, port); + struct hlist_head *head = &bucket->head; u64 cmp; u64 needle = ((u64)be32_to_cpu(addr) << 32) | be16_to_cpu(port); - rcu_read_lock(); - hlist_for_each_entry_rcu(rs, head, rs_bound_node) { + hlist_for_each_entry(rs, head, rs_bound_node) { cmp = ((u64)be32_to_cpu(rs->rs_bound_addr) << 32) | be16_to_cpu(rs->rs_bound_port); if (cmp == needle) { - rcu_read_unlock(); + rds_sock_addref(rs); return rs; } } - rcu_read_unlock(); if (insert) { /* * make sure our addr and port are set before - * we are added to the list, other people - * in rcu will find us as soon as the - * hlist_add_head_rcu is done + * we are added to the list. */ insert->rs_bound_addr = addr; insert->rs_bound_port = port; rds_sock_addref(insert); - hlist_add_head_rcu(&insert->rs_bound_node, head); + hlist_add_head(&insert->rs_bound_node, head); } return NULL; } @@ -93,16 +95,21 @@ static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port, struct rds_sock *rds_find_bound(__be32 addr, __be16 port) { struct rds_sock *rs; + unsigned long flags; + struct bind_bucket *bucket = hash_to_bucket(addr, port); - rs = rds_bind_lookup(addr, port, NULL); + read_lock_irqsave(&bucket->lock, flags); + rs = rds_bind_lookup(bucket, addr, port, NULL); + read_unlock_irqrestore(&bucket->lock, flags); - if (rs && !sock_flag(rds_rs_to_sk(rs), SOCK_DEAD)) - rds_sock_addref(rs); - else + if (rs && sock_flag(rds_rs_to_sk(rs), SOCK_DEAD)) { + rds_sock_put(rs); rs = NULL; + } rdsdebug("returning rs %p for %pI4:%u\n", rs, &addr, ntohs(port)); + return rs; } @@ -112,6 +119,7 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port) unsigned long flags; int ret = -EADDRINUSE; u16 rover, last; + struct bind_bucket *bucket; if (*port != 0) { rover = be16_to_cpu(*port); @@ -121,42 +129,48 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port) last = rover - 1; } - spin_lock_irqsave(&rds_bind_lock, flags); - do { + struct rds_sock *rrs; if (rover == 0) rover++; - if (!rds_bind_lookup(addr, cpu_to_be16(rover), rs)) { + + bucket = hash_to_bucket(addr, cpu_to_be16(rover)); + write_lock_irqsave(&bucket->lock, flags); + rrs = rds_bind_lookup(bucket, addr, cpu_to_be16(rover), rs); + write_unlock_irqrestore(&bucket->lock, flags); + if (!rrs) { *port = rs->rs_bound_port; ret = 0; rdsdebug("rs %p binding to %pI4:%d\n", rs, &addr, (int)ntohs(*port)); break; + } else { + rds_sock_put(rrs); } } while (rover++ != last); - spin_unlock_irqrestore(&rds_bind_lock, flags); - return ret; } void rds_remove_bound(struct rds_sock *rs) { unsigned long flags; + struct bind_bucket *bucket = + hash_to_bucket(rs->rs_bound_addr, rs->rs_bound_port); - spin_lock_irqsave(&rds_bind_lock, flags); + write_lock_irqsave(&bucket->lock, flags); if (rs->rs_bound_addr) { rdsdebug("rs %p unbinding from %pI4:%d\n", rs, &rs->rs_bound_addr, ntohs(rs->rs_bound_port)); - hlist_del_init_rcu(&rs->rs_bound_node); + hlist_del_init(&rs->rs_bound_node); rds_sock_put(rs); rs->rs_bound_addr = 0; } - spin_unlock_irqrestore(&rds_bind_lock, flags); + write_unlock_irqrestore(&bucket->lock, flags); } int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len) @@ -200,9 +214,13 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len) out: release_sock(sk); - - /* we might have called rds_remove_bound on error */ - if (ret) - synchronize_rcu(); return ret; } + +void rds_bind_lock_init(void) +{ + int i; + + for (i = 0; i < BIND_HASH_SIZE; i++) + rwlock_init(&bind_hash_table[i].lock); +} diff --git a/net/rds/ib.c b/net/rds/ib.c index 2d3f2ab..a833ab7 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -43,14 +43,14 @@ #include "rds.h" #include "ib.h" -static unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE; -unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */ +unsigned int rds_ib_fmr_1m_pool_size = RDS_FMR_1M_POOL_SIZE; +unsigned int rds_ib_fmr_8k_pool_size = RDS_FMR_8K_POOL_SIZE; unsigned int rds_ib_retry_count = RDS_IB_DEFAULT_RETRY_COUNT; -module_param(fmr_pool_size, int, 0444); -MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA"); -module_param(fmr_message_size, int, 0444); -MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer"); +module_param(rds_ib_fmr_1m_pool_size, int, 0444); +MODULE_PARM_DESC(rds_ib_fmr_1m_pool_size, " Max number of 1M fmr per HCA"); +module_param(rds_ib_fmr_8k_pool_size, int, 0444); +MODULE_PARM_DESC(rds_ib_fmr_8k_pool_size, " Max number of 8K fmr per HCA"); module_param(rds_ib_retry_count, int, 0444); MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error"); @@ -97,8 +97,10 @@ static void rds_ib_dev_free(struct work_struct *work) struct rds_ib_device *rds_ibdev = container_of(work, struct rds_ib_device, free_work); - if (rds_ibdev->mr_pool) - rds_ib_destroy_mr_pool(rds_ibdev->mr_pool); + if (rds_ibdev->mr_8k_pool) + rds_ib_destroy_mr_pool(rds_ibdev->mr_8k_pool); + if (rds_ibdev->mr_1m_pool) + rds_ib_destroy_mr_pool(rds_ibdev->mr_1m_pool); if (rds_ibdev->pd) ib_dealloc_pd(rds_ibdev->pd); @@ -148,9 +150,13 @@ static void rds_ib_add_one(struct ib_device *device) rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE); rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32; - rds_ibdev->max_fmrs = dev_attr->max_fmr ? - min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) : - fmr_pool_size; + rds_ibdev->max_1m_fmrs = dev_attr->max_mr ? + min_t(unsigned int, (dev_attr->max_mr / 2), + rds_ib_fmr_1m_pool_size) : rds_ib_fmr_1m_pool_size; + + rds_ibdev->max_8k_fmrs = dev_attr->max_mr ? + min_t(unsigned int, ((dev_attr->max_mr / 2) * RDS_MR_8K_SCALE), + rds_ib_fmr_8k_pool_size) : rds_ib_fmr_8k_pool_size; rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom; rds_ibdev->max_responder_resources = dev_attr->max_qp_rd_atom; @@ -162,12 +168,25 @@ static void rds_ib_add_one(struct ib_device *device) goto put_dev; } - rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev); - if (IS_ERR(rds_ibdev->mr_pool)) { - rds_ibdev->mr_pool = NULL; + rds_ibdev->mr_1m_pool = + rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_1M_POOL); + if (IS_ERR(rds_ibdev->mr_1m_pool)) { + rds_ibdev->mr_1m_pool = NULL; goto put_dev; } + rds_ibdev->mr_8k_pool = + rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_8K_POOL); + if (IS_ERR(rds_ibdev->mr_8k_pool)) { + rds_ibdev->mr_8k_pool = NULL; + goto put_dev; + } + + rdsdebug("RDS/IB: max_mr = %d, max_wrs = %d, max_sge = %d, fmr_max_remaps = %d, max_1m_fmrs = %d, max_8k_fmrs = %d\n", + dev_attr->max_fmr, rds_ibdev->max_wrs, rds_ibdev->max_sge, + rds_ibdev->fmr_max_remaps, rds_ibdev->max_1m_fmrs, + rds_ibdev->max_8k_fmrs); + INIT_LIST_HEAD(&rds_ibdev->ipaddr_list); INIT_LIST_HEAD(&rds_ibdev->conn_list); diff --git a/net/rds/ib.h b/net/rds/ib.h index aae60fd..f17d095 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -9,8 +9,11 @@ #include "rds.h" #include "rdma_transport.h" -#define RDS_FMR_SIZE 256 -#define RDS_FMR_POOL_SIZE 8192 +#define RDS_FMR_1M_POOL_SIZE (8192 / 2) +#define RDS_FMR_1M_MSG_SIZE 256 +#define RDS_FMR_8K_MSG_SIZE 2 +#define RDS_MR_8K_SCALE (256 / (RDS_FMR_8K_MSG_SIZE + 1)) +#define RDS_FMR_8K_POOL_SIZE (RDS_MR_8K_SCALE * (8192 / 2)) #define RDS_IB_MAX_SGE 8 #define RDS_IB_RECV_SGE 2 @@ -24,6 +27,9 @@ #define RDS_IB_RECYCLE_BATCH_COUNT 32 +#define RDS_IB_WC_MAX 32 +#define RDS_IB_SEND_OP BIT_ULL(63) + extern struct rw_semaphore rds_ib_devices_lock; extern struct list_head rds_ib_devices; @@ -89,6 +95,20 @@ struct rds_ib_work_ring { atomic_t w_free_ctr; }; +/* Rings are posted with all the allocations they'll need to queue the + * incoming message to the receiving socket so this can't fail. + * All fragments start with a header, so we can make sure we're not receiving + * garbage, and we can tell a small 8 byte fragment from an ACK frame. + */ +struct rds_ib_ack_state { + u64 ack_next; + u64 ack_recv; + unsigned int ack_required:1; + unsigned int ack_next_valid:1; + unsigned int ack_recv_valid:1; +}; + + struct rds_ib_device; struct rds_ib_connection { @@ -102,6 +122,12 @@ struct rds_ib_connection { struct ib_pd *i_pd; struct ib_cq *i_send_cq; struct ib_cq *i_recv_cq; + struct ib_wc i_send_wc[RDS_IB_WC_MAX]; + struct ib_wc i_recv_wc[RDS_IB_WC_MAX]; + + /* interrupt handling */ + struct tasklet_struct i_send_tasklet; + struct tasklet_struct i_recv_tasklet; /* tx */ struct rds_ib_work_ring i_send_ring; @@ -112,7 +138,6 @@ struct rds_ib_connection { atomic_t i_signaled_sends; /* rx */ - struct tasklet_struct i_recv_tasklet; struct mutex i_recv_mutex; struct rds_ib_work_ring i_recv_ring; struct rds_ib_incoming *i_ibinc; @@ -164,6 +189,12 @@ struct rds_ib_connection { struct rds_ib_ipaddr { struct list_head list; __be32 ipaddr; + struct rcu_head rcu; +}; + +enum { + RDS_IB_MR_8K_POOL, + RDS_IB_MR_1M_POOL, }; struct rds_ib_device { @@ -172,9 +203,12 @@ struct rds_ib_device { struct list_head conn_list; struct ib_device *dev; struct ib_pd *pd; - struct rds_ib_mr_pool *mr_pool; - unsigned int fmr_max_remaps; unsigned int max_fmrs; + struct rds_ib_mr_pool *mr_1m_pool; + struct rds_ib_mr_pool *mr_8k_pool; + unsigned int fmr_max_remaps; + unsigned int max_8k_fmrs; + unsigned int max_1m_fmrs; int max_sge; unsigned int max_wrs; unsigned int max_initiator_depth; @@ -197,14 +231,14 @@ struct rds_ib_device { struct rds_ib_statistics { uint64_t s_ib_connect_raced; uint64_t s_ib_listen_closed_stale; - uint64_t s_ib_tx_cq_call; + uint64_t s_ib_evt_handler_call; + uint64_t s_ib_tasklet_call; uint64_t s_ib_tx_cq_event; uint64_t s_ib_tx_ring_full; uint64_t s_ib_tx_throttle; uint64_t s_ib_tx_sg_mapping_failure; uint64_t s_ib_tx_stalled; uint64_t s_ib_tx_credit_updates; - uint64_t s_ib_rx_cq_call; uint64_t s_ib_rx_cq_event; uint64_t s_ib_rx_ring_empty; uint64_t s_ib_rx_refill_from_cq; @@ -216,12 +250,18 @@ struct rds_ib_statistics { uint64_t s_ib_ack_send_delayed; uint64_t s_ib_ack_send_piggybacked; uint64_t s_ib_ack_received; - uint64_t s_ib_rdma_mr_alloc; - uint64_t s_ib_rdma_mr_free; - uint64_t s_ib_rdma_mr_used; - uint64_t s_ib_rdma_mr_pool_flush; - uint64_t s_ib_rdma_mr_pool_wait; - uint64_t s_ib_rdma_mr_pool_depleted; + uint64_t s_ib_rdma_mr_8k_alloc; + uint64_t s_ib_rdma_mr_8k_free; + uint64_t s_ib_rdma_mr_8k_used; + uint64_t s_ib_rdma_mr_8k_pool_flush; + uint64_t s_ib_rdma_mr_8k_pool_wait; + uint64_t s_ib_rdma_mr_8k_pool_depleted; + uint64_t s_ib_rdma_mr_1m_alloc; + uint64_t s_ib_rdma_mr_1m_free; + uint64_t s_ib_rdma_mr_1m_used; + uint64_t s_ib_rdma_mr_1m_pool_flush; + uint64_t s_ib_rdma_mr_1m_pool_wait; + uint64_t s_ib_rdma_mr_1m_pool_depleted; uint64_t s_ib_atomic_cswp; uint64_t s_ib_atomic_fadd; }; @@ -273,7 +313,8 @@ struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device); void rds_ib_dev_put(struct rds_ib_device *rds_ibdev); extern struct ib_client rds_ib_client; -extern unsigned int fmr_message_size; +extern unsigned int rds_ib_fmr_1m_pool_size; +extern unsigned int rds_ib_fmr_8k_pool_size; extern unsigned int rds_ib_retry_count; extern spinlock_t ib_nodev_conns_lock; @@ -303,7 +344,8 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr); void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn); void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn); void rds_ib_destroy_nodev_conns(void); -struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *); +struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev, + int npages); void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo); void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *); void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, @@ -323,7 +365,8 @@ void rds_ib_recv_free_caches(struct rds_ib_connection *ic); void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp); void rds_ib_inc_free(struct rds_incoming *inc); int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to); -void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context); +void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc, + struct rds_ib_ack_state *state); void rds_ib_recv_tasklet_fn(unsigned long data); void rds_ib_recv_init_ring(struct rds_ib_connection *ic); void rds_ib_recv_clear_ring(struct rds_ib_connection *ic); @@ -331,6 +374,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic); void rds_ib_attempt_ack(struct rds_ib_connection *ic); void rds_ib_ack_send_complete(struct rds_ib_connection *ic); u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic); +void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required); /* ib_ring.c */ void rds_ib_ring_init(struct rds_ib_work_ring *ring, u32 nr); @@ -348,7 +392,7 @@ extern wait_queue_head_t rds_ib_ring_empty_wait; void rds_ib_xmit_complete(struct rds_connection *conn); int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off); -void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context); +void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc); void rds_ib_send_init_ring(struct rds_ib_connection *ic); void rds_ib_send_clear_ring(struct rds_ib_connection *ic); int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op); diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 9043f5c..2b2370e 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -216,6 +216,96 @@ static void rds_ib_cq_event_handler(struct ib_event *event, void *data) event->event, ib_event_msg(event->event), data); } +/* Plucking the oldest entry from the ring can be done concurrently with + * the thread refilling the ring. Each ring operation is protected by + * spinlocks and the transient state of refilling doesn't change the + * recording of which entry is oldest. + * + * This relies on IB only calling one cq comp_handler for each cq so that + * there will only be one caller of rds_recv_incoming() per RDS connection. + */ +static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context) +{ + struct rds_connection *conn = context; + struct rds_ib_connection *ic = conn->c_transport_data; + + rdsdebug("conn %p cq %p\n", conn, cq); + + rds_ib_stats_inc(s_ib_evt_handler_call); + + tasklet_schedule(&ic->i_recv_tasklet); +} + +static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq, + struct ib_wc *wcs, + struct rds_ib_ack_state *ack_state) +{ + int nr; + int i; + struct ib_wc *wc; + + while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) { + for (i = 0; i < nr; i++) { + wc = wcs + i; + rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", + (unsigned long long)wc->wr_id, wc->status, + wc->byte_len, be32_to_cpu(wc->ex.imm_data)); + + if (wc->wr_id & RDS_IB_SEND_OP) + rds_ib_send_cqe_handler(ic, wc); + else + rds_ib_recv_cqe_handler(ic, wc, ack_state); + } + } +} + +static void rds_ib_tasklet_fn_send(unsigned long data) +{ + struct rds_ib_connection *ic = (struct rds_ib_connection *)data; + struct rds_connection *conn = ic->conn; + struct rds_ib_ack_state state; + + rds_ib_stats_inc(s_ib_tasklet_call); + + memset(&state, 0, sizeof(state)); + poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state); + ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP); + poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state); + + if (rds_conn_up(conn) && + (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) || + test_bit(0, &conn->c_map_queued))) + rds_send_xmit(ic->conn); +} + +static void rds_ib_tasklet_fn_recv(unsigned long data) +{ + struct rds_ib_connection *ic = (struct rds_ib_connection *)data; + struct rds_connection *conn = ic->conn; + struct rds_ib_device *rds_ibdev = ic->rds_ibdev; + struct rds_ib_ack_state state; + + if (!rds_ibdev) + rds_conn_drop(conn); + + rds_ib_stats_inc(s_ib_tasklet_call); + + memset(&state, 0, sizeof(state)); + poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state); + ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED); + poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state); + + if (state.ack_next_valid) + rds_ib_set_ack(ic, state.ack_next, state.ack_required); + if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) { + rds_send_drop_acked(conn, state.ack_recv, NULL); + ic->i_ack_recv = state.ack_recv; + } + + if (rds_conn_up(conn)) + rds_ib_attempt_ack(ic); +} + static void rds_ib_qp_event_handler(struct ib_event *event, void *data) { struct rds_connection *conn = data; @@ -238,6 +328,18 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data) } } +static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context) +{ + struct rds_connection *conn = context; + struct rds_ib_connection *ic = conn->c_transport_data; + + rdsdebug("conn %p cq %p\n", conn, cq); + + rds_ib_stats_inc(s_ib_evt_handler_call); + + tasklet_schedule(&ic->i_send_tasklet); +} + /* * This needs to be very careful to not leave IS_ERR pointers around for * cleanup to trip over. @@ -271,7 +373,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn) ic->i_pd = rds_ibdev->pd; cq_attr.cqe = ic->i_send_ring.w_nr + 1; - ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler, + + ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send, rds_ib_cq_event_handler, conn, &cq_attr); if (IS_ERR(ic->i_send_cq)) { @@ -282,7 +385,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn) } cq_attr.cqe = ic->i_recv_ring.w_nr; - ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler, + ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv, rds_ib_cq_event_handler, conn, &cq_attr); if (IS_ERR(ic->i_recv_cq)) { @@ -637,6 +740,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) wait_event(rds_ib_ring_empty_wait, rds_ib_ring_empty(&ic->i_recv_ring) && (atomic_read(&ic->i_signaled_sends) == 0)); + tasklet_kill(&ic->i_send_tasklet); tasklet_kill(&ic->i_recv_tasklet); /* first destroy the ib state that generates callbacks */ @@ -743,8 +847,10 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) } INIT_LIST_HEAD(&ic->ib_node); - tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn, - (unsigned long) ic); + tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send, + (unsigned long)ic); + tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv, + (unsigned long)ic); mutex_init(&ic->i_recv_mutex); #ifndef KERNEL_HAS_ATOMIC64 spin_lock_init(&ic->i_ack_lock); diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c index 251d1ce..a234074 100644 --- a/net/rds/ib_rdma.c +++ b/net/rds/ib_rdma.c @@ -65,6 +65,7 @@ struct rds_ib_mr { * Our own little FMR pool */ struct rds_ib_mr_pool { + unsigned int pool_type; struct mutex flush_lock; /* serialize fmr invalidate */ struct delayed_work flush_worker; /* flush worker */ @@ -83,7 +84,7 @@ struct rds_ib_mr_pool { struct ib_fmr_attr fmr_attr; }; -struct workqueue_struct *rds_ib_fmr_wq; +static struct workqueue_struct *rds_ib_fmr_wq; int rds_ib_fmr_init(void) { @@ -159,10 +160,8 @@ static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) } spin_unlock_irq(&rds_ibdev->spinlock); - if (to_free) { - synchronize_rcu(); - kfree(to_free); - } + if (to_free) + kfree_rcu(to_free, rcu); } int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) @@ -236,7 +235,8 @@ void rds_ib_destroy_nodev_conns(void) rds_conn_destroy(ic->conn); } -struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev) +struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev, + int pool_type) { struct rds_ib_mr_pool *pool; @@ -244,6 +244,7 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev) if (!pool) return ERR_PTR(-ENOMEM); + pool->pool_type = pool_type; init_llist_head(&pool->free_list); init_llist_head(&pool->drop_list); init_llist_head(&pool->clean_list); @@ -251,28 +252,30 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev) init_waitqueue_head(&pool->flush_wait); INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker); - pool->fmr_attr.max_pages = fmr_message_size; + if (pool_type == RDS_IB_MR_1M_POOL) { + /* +1 allows for unaligned MRs */ + pool->fmr_attr.max_pages = RDS_FMR_1M_MSG_SIZE + 1; + pool->max_items = RDS_FMR_1M_POOL_SIZE; + } else { + /* pool_type == RDS_IB_MR_8K_POOL */ + pool->fmr_attr.max_pages = RDS_FMR_8K_MSG_SIZE + 1; + pool->max_items = RDS_FMR_8K_POOL_SIZE; + } + + pool->max_free_pinned = pool->max_items * pool->fmr_attr.max_pages / 4; pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps; pool->fmr_attr.page_shift = PAGE_SHIFT; - pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4; - - /* We never allow more than max_items MRs to be allocated. - * When we exceed more than max_items_soft, we start freeing - * items more aggressively. - * Make sure that max_items > max_items_soft > max_items / 2 - */ pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4; - pool->max_items = rds_ibdev->max_fmrs; return pool; } void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo) { - struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; + struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool; - iinfo->rdma_mr_max = pool->max_items; - iinfo->rdma_mr_size = pool->fmr_attr.max_pages; + iinfo->rdma_mr_max = pool_1m->max_items; + iinfo->rdma_mr_size = pool_1m->fmr_attr.max_pages; } void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool) @@ -314,14 +317,28 @@ static inline void wait_clean_list_grace(void) } } -static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev) +static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev, + int npages) { - struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; + struct rds_ib_mr_pool *pool; struct rds_ib_mr *ibmr = NULL; int err = 0, iter = 0; + if (npages <= RDS_FMR_8K_MSG_SIZE) + pool = rds_ibdev->mr_8k_pool; + else + pool = rds_ibdev->mr_1m_pool; + if (atomic_read(&pool->dirty_count) >= pool->max_items / 10) - schedule_delayed_work(&pool->flush_worker, 10); + queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10); + + /* Switch pools if one of the pool is reaching upper limit */ + if (atomic_read(&pool->dirty_count) >= pool->max_items * 9 / 10) { + if (pool->pool_type == RDS_IB_MR_8K_POOL) + pool = rds_ibdev->mr_1m_pool; + else + pool = rds_ibdev->mr_8k_pool; + } while (1) { ibmr = rds_ib_reuse_fmr(pool); @@ -343,12 +360,18 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev) atomic_dec(&pool->item_count); if (++iter > 2) { - rds_ib_stats_inc(s_ib_rdma_mr_pool_depleted); + if (pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted); return ERR_PTR(-EAGAIN); } /* We do have some empty MRs. Flush them out. */ - rds_ib_stats_inc(s_ib_rdma_mr_pool_wait); + if (pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait); rds_ib_flush_mr_pool(pool, 0, &ibmr); if (ibmr) return ibmr; @@ -373,7 +396,12 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev) goto out_no_cigar; } - rds_ib_stats_inc(s_ib_rdma_mr_alloc); + ibmr->pool = pool; + if (pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_alloc); + return ibmr; out_no_cigar: @@ -429,7 +457,7 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm } page_cnt += len >> PAGE_SHIFT; - if (page_cnt > fmr_message_size) + if (page_cnt > ibmr->pool->fmr_attr.max_pages) return -EINVAL; dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC, @@ -461,7 +489,10 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm ibmr->sg_dma_len = sg_dma_len; ibmr->remap_count++; - rds_ib_stats_inc(s_ib_rdma_mr_used); + if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_used); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_used); ret = 0; out: @@ -524,8 +555,7 @@ static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr) __rds_ib_teardown_mr(ibmr); if (pinned) { - struct rds_ib_device *rds_ibdev = ibmr->device; - struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; + struct rds_ib_mr_pool *pool = ibmr->pool; atomic_sub(pinned, &pool->free_pinned); } @@ -594,7 +624,7 @@ static void list_to_llist_nodes(struct rds_ib_mr_pool *pool, * to free as many MRs as needed to get back to this limit. */ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, - int free_all, struct rds_ib_mr **ibmr_ret) + int free_all, struct rds_ib_mr **ibmr_ret) { struct rds_ib_mr *ibmr, *next; struct llist_node *clean_nodes; @@ -605,11 +635,14 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, unsigned int nfreed = 0, dirty_to_clean = 0, free_goal; int ret = 0; - rds_ib_stats_inc(s_ib_rdma_mr_pool_flush); + if (pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_flush); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_flush); if (ibmr_ret) { DEFINE_WAIT(wait); - while(!mutex_trylock(&pool->flush_lock)) { + while (!mutex_trylock(&pool->flush_lock)) { ibmr = rds_ib_reuse_fmr(pool); if (ibmr) { *ibmr_ret = ibmr; @@ -666,8 +699,12 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) { unpinned += ibmr->sg_len; __rds_ib_teardown_mr(ibmr); - if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) { - rds_ib_stats_inc(s_ib_rdma_mr_free); + if (nfreed < free_goal || + ibmr->remap_count >= pool->fmr_attr.max_maps) { + if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_free); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_free); list_del(&ibmr->unmap_list); ib_dealloc_fmr(ibmr->fmr); kfree(ibmr); @@ -719,8 +756,8 @@ static void rds_ib_mr_pool_flush_worker(struct work_struct *work) void rds_ib_free_mr(void *trans_private, int invalidate) { struct rds_ib_mr *ibmr = trans_private; + struct rds_ib_mr_pool *pool = ibmr->pool; struct rds_ib_device *rds_ibdev = ibmr->device; - struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len); @@ -759,10 +796,11 @@ void rds_ib_flush_mrs(void) down_read(&rds_ib_devices_lock); list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { - struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; + if (rds_ibdev->mr_8k_pool) + rds_ib_flush_mr_pool(rds_ibdev->mr_8k_pool, 0, NULL); - if (pool) - rds_ib_flush_mr_pool(pool, 0, NULL); + if (rds_ibdev->mr_1m_pool) + rds_ib_flush_mr_pool(rds_ibdev->mr_1m_pool, 0, NULL); } up_read(&rds_ib_devices_lock); } @@ -780,12 +818,12 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, goto out; } - if (!rds_ibdev->mr_pool) { + if (!rds_ibdev->mr_8k_pool || !rds_ibdev->mr_1m_pool) { ret = -ENODEV; goto out; } - ibmr = rds_ib_alloc_fmr(rds_ibdev); + ibmr = rds_ib_alloc_fmr(rds_ibdev, nents); if (IS_ERR(ibmr)) { rds_ib_dev_put(rds_ibdev); return ibmr; diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index f43831e..96744b7 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -596,8 +596,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic) * wr_id and avoids working with the ring in that case. */ #ifndef KERNEL_HAS_ATOMIC64 -static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, - int ack_required) +void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required) { unsigned long flags; @@ -622,8 +621,7 @@ static u64 rds_ib_get_ack(struct rds_ib_connection *ic) return seq; } #else -static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, - int ack_required) +void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required) { atomic64_set(&ic->i_ack_next, seq); if (ack_required) { @@ -830,20 +828,6 @@ static void rds_ib_cong_recv(struct rds_connection *conn, rds_cong_map_updated(map, uncongested); } -/* - * Rings are posted with all the allocations they'll need to queue the - * incoming message to the receiving socket so this can't fail. - * All fragments start with a header, so we can make sure we're not receiving - * garbage, and we can tell a small 8 byte fragment from an ACK frame. - */ -struct rds_ib_ack_state { - u64 ack_next; - u64 ack_recv; - unsigned int ack_required:1; - unsigned int ack_next_valid:1; - unsigned int ack_recv_valid:1; -}; - static void rds_ib_process_recv(struct rds_connection *conn, struct rds_ib_recv_work *recv, u32 data_len, struct rds_ib_ack_state *state) @@ -969,96 +953,50 @@ static void rds_ib_process_recv(struct rds_connection *conn, } } -/* - * Plucking the oldest entry from the ring can be done concurrently with - * the thread refilling the ring. Each ring operation is protected by - * spinlocks and the transient state of refilling doesn't change the - * recording of which entry is oldest. - * - * This relies on IB only calling one cq comp_handler for each cq so that - * there will only be one caller of rds_recv_incoming() per RDS connection. - */ -void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context) -{ - struct rds_connection *conn = context; - struct rds_ib_connection *ic = conn->c_transport_data; - - rdsdebug("conn %p cq %p\n", conn, cq); - - rds_ib_stats_inc(s_ib_rx_cq_call); - - tasklet_schedule(&ic->i_recv_tasklet); -} - -static inline void rds_poll_cq(struct rds_ib_connection *ic, - struct rds_ib_ack_state *state) +void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, + struct ib_wc *wc, + struct rds_ib_ack_state *state) { struct rds_connection *conn = ic->conn; - struct ib_wc wc; struct rds_ib_recv_work *recv; - while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) { - rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n", - (unsigned long long)wc.wr_id, wc.status, - ib_wc_status_msg(wc.status), wc.byte_len, - be32_to_cpu(wc.ex.imm_data)); - rds_ib_stats_inc(s_ib_rx_cq_event); + rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n", + (unsigned long long)wc->wr_id, wc->status, + ib_wc_status_msg(wc->status), wc->byte_len, + be32_to_cpu(wc->ex.imm_data)); - recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)]; - - ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE); - - /* - * Also process recvs in connecting state because it is possible - * to get a recv completion _before_ the rdmacm ESTABLISHED - * event is processed. - */ - if (wc.status == IB_WC_SUCCESS) { - rds_ib_process_recv(conn, recv, wc.byte_len, state); - } else { - /* We expect errors as the qp is drained during shutdown */ - if (rds_conn_up(conn) || rds_conn_connecting(conn)) - rds_ib_conn_error(conn, "recv completion on %pI4 had " - "status %u (%s), disconnecting and " - "reconnecting\n", &conn->c_faddr, - wc.status, - ib_wc_status_msg(wc.status)); - } + rds_ib_stats_inc(s_ib_rx_cq_event); + recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)]; + ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, + DMA_FROM_DEVICE); - /* - * rds_ib_process_recv() doesn't always consume the frag, and - * we might not have called it at all if the wc didn't indicate - * success. We already unmapped the frag's pages, though, and - * the following rds_ib_ring_free() call tells the refill path - * that it will not find an allocated frag here. Make sure we - * keep that promise by freeing a frag that's still on the ring. - */ - if (recv->r_frag) { - rds_ib_frag_free(ic, recv->r_frag); - recv->r_frag = NULL; - } - rds_ib_ring_free(&ic->i_recv_ring, 1); + /* Also process recvs in connecting state because it is possible + * to get a recv completion _before_ the rdmacm ESTABLISHED + * event is processed. + */ + if (wc->status == IB_WC_SUCCESS) { + rds_ib_process_recv(conn, recv, wc->byte_len, state); + } else { + /* We expect errors as the qp is drained during shutdown */ + if (rds_conn_up(conn) || rds_conn_connecting(conn)) + rds_ib_conn_error(conn, "recv completion on %pI4 had status %u (%s), disconnecting and reconnecting\n", + &conn->c_faddr, + wc->status, + ib_wc_status_msg(wc->status)); } -} -void rds_ib_recv_tasklet_fn(unsigned long data) -{ - struct rds_ib_connection *ic = (struct rds_ib_connection *) data; - struct rds_connection *conn = ic->conn; - struct rds_ib_ack_state state = { 0, }; - - rds_poll_cq(ic, &state); - ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED); - rds_poll_cq(ic, &state); - - if (state.ack_next_valid) - rds_ib_set_ack(ic, state.ack_next, state.ack_required); - if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) { - rds_send_drop_acked(conn, state.ack_recv, NULL); - ic->i_ack_recv = state.ack_recv; + /* rds_ib_process_recv() doesn't always consume the frag, and + * we might not have called it at all if the wc didn't indicate + * success. We already unmapped the frag's pages, though, and + * the following rds_ib_ring_free() call tells the refill path + * that it will not find an allocated frag here. Make sure we + * keep that promise by freeing a frag that's still on the ring. + */ + if (recv->r_frag) { + rds_ib_frag_free(ic, recv->r_frag); + recv->r_frag = NULL; } - if (rds_conn_up(conn)) - rds_ib_attempt_ack(ic); + rds_ib_ring_free(&ic->i_recv_ring, 1); /* If we ever end up with a really empty receive ring, we're * in deep trouble, as the sender will definitely see RNR diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index 4e88047..670882c 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -195,7 +195,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic) send->s_op = NULL; - send->s_wr.wr_id = i; + send->s_wr.wr_id = i | RDS_IB_SEND_OP; send->s_wr.sg_list = send->s_sge; send->s_wr.ex.imm_data = 0; @@ -237,81 +237,73 @@ static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr) * unallocs the next free entry in the ring it doesn't alter which is * the next to be freed, which is what this is concerned with. */ -void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) +void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc) { - struct rds_connection *conn = context; - struct rds_ib_connection *ic = conn->c_transport_data; struct rds_message *rm = NULL; - struct ib_wc wc; + struct rds_connection *conn = ic->conn; struct rds_ib_send_work *send; u32 completed; u32 oldest; u32 i = 0; - int ret; int nr_sig = 0; - rdsdebug("cq %p conn %p\n", cq, conn); - rds_ib_stats_inc(s_ib_tx_cq_call); - ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP); - if (ret) - rdsdebug("ib_req_notify_cq send failed: %d\n", ret); - - while (ib_poll_cq(cq, 1, &wc) > 0) { - rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n", - (unsigned long long)wc.wr_id, wc.status, - ib_wc_status_msg(wc.status), wc.byte_len, - be32_to_cpu(wc.ex.imm_data)); - rds_ib_stats_inc(s_ib_tx_cq_event); - - if (wc.wr_id == RDS_IB_ACK_WR_ID) { - if (time_after(jiffies, ic->i_ack_queued + HZ/2)) - rds_ib_stats_inc(s_ib_tx_stalled); - rds_ib_ack_send_complete(ic); - continue; - } - oldest = rds_ib_ring_oldest(&ic->i_send_ring); + rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n", + (unsigned long long)wc->wr_id, wc->status, + ib_wc_status_msg(wc->status), wc->byte_len, + be32_to_cpu(wc->ex.imm_data)); + rds_ib_stats_inc(s_ib_tx_cq_event); - completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest); + if (wc->wr_id == RDS_IB_ACK_WR_ID) { + if (time_after(jiffies, ic->i_ack_queued + HZ / 2)) + rds_ib_stats_inc(s_ib_tx_stalled); + rds_ib_ack_send_complete(ic); + return; + } - for (i = 0; i < completed; i++) { - send = &ic->i_sends[oldest]; - if (send->s_wr.send_flags & IB_SEND_SIGNALED) - nr_sig++; + oldest = rds_ib_ring_oldest(&ic->i_send_ring); - rm = rds_ib_send_unmap_op(ic, send, wc.status); + completed = rds_ib_ring_completed(&ic->i_send_ring, + (wc->wr_id & ~RDS_IB_SEND_OP), + oldest); - if (time_after(jiffies, send->s_queued + HZ/2)) - rds_ib_stats_inc(s_ib_tx_stalled); + for (i = 0; i < completed; i++) { + send = &ic->i_sends[oldest]; + if (send->s_wr.send_flags & IB_SEND_SIGNALED) + nr_sig++; - if (send->s_op) { - if (send->s_op == rm->m_final_op) { - /* If anyone waited for this message to get flushed out, wake - * them up now */ - rds_message_unmapped(rm); - } - rds_message_put(rm); - send->s_op = NULL; - } + rm = rds_ib_send_unmap_op(ic, send, wc->status); - oldest = (oldest + 1) % ic->i_send_ring.w_nr; - } + if (time_after(jiffies, send->s_queued + HZ / 2)) + rds_ib_stats_inc(s_ib_tx_stalled); - rds_ib_ring_free(&ic->i_send_ring, completed); - rds_ib_sub_signaled(ic, nr_sig); - nr_sig = 0; - - if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) || - test_bit(0, &conn->c_map_queued)) - queue_delayed_work(rds_wq, &conn->c_send_w, 0); - - /* We expect errors as the qp is drained during shutdown */ - if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) { - rds_ib_conn_error(conn, "send completion on %pI4 had status " - "%u (%s), disconnecting and reconnecting\n", - &conn->c_faddr, wc.status, - ib_wc_status_msg(wc.status)); + if (send->s_op) { + if (send->s_op == rm->m_final_op) { + /* If anyone waited for this message to get + * flushed out, wake them up now + */ + rds_message_unmapped(rm); + } + rds_message_put(rm); + send->s_op = NULL; } + + oldest = (oldest + 1) % ic->i_send_ring.w_nr; + } + + rds_ib_ring_free(&ic->i_send_ring, completed); + rds_ib_sub_signaled(ic, nr_sig); + nr_sig = 0; + + if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) || + test_bit(0, &conn->c_map_queued)) + queue_delayed_work(rds_wq, &conn->c_send_w, 0); + + /* We expect errors as the qp is drained during shutdown */ + if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) { + rds_ib_conn_error(conn, "send completion on %pI4 had status %u (%s), disconnecting and reconnecting\n", + &conn->c_faddr, wc->status, + ib_wc_status_msg(wc->status)); } } diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c index 2d5965d..d77e044 100644 --- a/net/rds/ib_stats.c +++ b/net/rds/ib_stats.c @@ -42,14 +42,14 @@ DEFINE_PER_CPU_SHARED_ALIGNED(struct rds_ib_statistics, rds_ib_stats); static const char *const rds_ib_stat_names[] = { "ib_connect_raced", "ib_listen_closed_stale", - "ib_tx_cq_call", + "s_ib_evt_handler_call", + "ib_tasklet_call", "ib_tx_cq_event", "ib_tx_ring_full", "ib_tx_throttle", "ib_tx_sg_mapping_failure", "ib_tx_stalled", "ib_tx_credit_updates", - "ib_rx_cq_call", "ib_rx_cq_event", "ib_rx_ring_empty", "ib_rx_refill_from_cq", @@ -61,12 +61,18 @@ static const char *const rds_ib_stat_names[] = { "ib_ack_send_delayed", "ib_ack_send_piggybacked", "ib_ack_received", - "ib_rdma_mr_alloc", - "ib_rdma_mr_free", - "ib_rdma_mr_used", - "ib_rdma_mr_pool_flush", - "ib_rdma_mr_pool_wait", - "ib_rdma_mr_pool_depleted", + "ib_rdma_mr_8k_alloc", + "ib_rdma_mr_8k_free", + "ib_rdma_mr_8k_used", + "ib_rdma_mr_8k_pool_flush", + "ib_rdma_mr_8k_pool_wait", + "ib_rdma_mr_8k_pool_depleted", + "ib_rdma_mr_1m_alloc", + "ib_rdma_mr_1m_free", + "ib_rdma_mr_1m_used", + "ib_rdma_mr_1m_pool_flush", + "ib_rdma_mr_1m_pool_wait", + "ib_rdma_mr_1m_pool_depleted", "ib_atomic_cswp", "ib_atomic_fadd", }; diff --git a/net/rds/rds.h b/net/rds/rds.h index b4c7ac0..543c308 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -605,6 +605,7 @@ extern wait_queue_head_t rds_poll_waitq; int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len); void rds_remove_bound(struct rds_sock *rs); struct rds_sock *rds_find_bound(__be32 addr, __be16 port); +void rds_bind_lock_init(void); /* cong.c */ int rds_cong_get_maps(struct rds_connection *conn); diff --git a/net/rds/send.c b/net/rds/send.c index 4df61a5..ee49c25 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -38,6 +38,7 @@ #include <linux/list.h> #include <linux/ratelimit.h> #include <linux/export.h> +#include <linux/sizes.h> #include "rds.h" @@ -51,7 +52,7 @@ * it to 0 will restore the old behavior (where we looped until we had * drained the queue). */ -static int send_batch_count = 64; +static int send_batch_count = SZ_1K; module_param(send_batch_count, int, 0444); MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue"); @@ -223,7 +224,7 @@ restart: * through a lot of messages, lets back off and see * if anyone else jumps in */ - if (batch_count >= 1024) + if (batch_count >= send_batch_count) goto over_batch; spin_lock_irqsave(&conn->c_lock, flags); @@ -423,12 +424,15 @@ over_batch: !list_empty(&conn->c_send_queue)) && send_gen == conn->c_send_gen) { rds_stats_inc(s_send_lock_queue_raced); - goto restart; + if (batch_count < send_batch_count) + goto restart; + queue_delayed_work(rds_wq, &conn->c_send_w, 1); } } out: return ret; } +EXPORT_SYMBOL_GPL(rds_send_xmit); static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm) { @@ -1120,8 +1124,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) */ rds_stats_inc(s_send_queued); - if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - rds_send_xmit(conn); + ret = rds_send_xmit(conn); + if (ret == -ENOMEM || ret == -EAGAIN) + queue_delayed_work(rds_wq, &conn->c_send_w, 1); rds_message_put(rm); return payload_len; @@ -1177,8 +1182,9 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) rds_stats_inc(s_send_queued); rds_stats_inc(s_send_pong); - if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + ret = rds_send_xmit(conn); + if (ret == -ENOMEM || ret == -EAGAIN) + queue_delayed_work(rds_wq, &conn->c_send_w, 1); rds_message_put(rm); return 0; diff --git a/net/rds/threads.c b/net/rds/threads.c index dc2402e..454aa6d 100644 --- a/net/rds/threads.c +++ b/net/rds/threads.c @@ -162,7 +162,9 @@ void rds_send_worker(struct work_struct *work) int ret; if (rds_conn_state(conn) == RDS_CONN_UP) { + clear_bit(RDS_LL_SEND_FULL, &conn->c_flags); ret = rds_send_xmit(conn); + cond_resched(); rdsdebug("conn %p ret %d\n", conn, ret); switch (ret) { case -EAGAIN: |