diff options
author | Björn Töpel <bjorn.topel@intel.com> | 2018-05-02 13:01:27 +0200 |
---|---|---|
committer | Alexei Starovoitov <ast@kernel.org> | 2018-05-03 15:55:24 -0700 |
commit | c497176cb2e478f0a5713b0e05f242276e3194b5 (patch) | |
tree | c23d28e7cfc7dbdd26498885cfe86da01436c797 | |
parent | 965a990984432cd01a9eb3514c64d86f56704295 (diff) | |
download | op-kernel-dev-c497176cb2e478f0a5713b0e05f242276e3194b5.zip op-kernel-dev-c497176cb2e478f0a5713b0e05f242276e3194b5.tar.gz |
xsk: add Rx receive functions and poll support
Here the actual receive functions of AF_XDP are implemented, that in a
later commit, will be called from the XDP layers.
There's one set of functions for the XDP_DRV side and another for
XDP_SKB (generic).
A new XDP API, xdp_return_buff, is also introduced.
Adding xdp_return_buff, which is analogous to xdp_return_frame, but
acts upon an struct xdp_buff. The API will be used by AF_XDP in future
commits.
Support for the poll syscall is also implemented.
v2: xskq_validate_id did not update cons_tail.
The entries variable was calculated twice in xskq_nb_avail.
Squashed xdp_return_buff commit.
Signed-off-by: Björn Töpel <bjorn.topel@intel.com>
Signed-off-by: Alexei Starovoitov <ast@kernel.org>
-rw-r--r-- | include/net/xdp.h | 1 | ||||
-rw-r--r-- | include/net/xdp_sock.h | 22 | ||||
-rw-r--r-- | net/core/xdp.c | 15 | ||||
-rw-r--r-- | net/xdp/xdp_umem.h | 18 | ||||
-rw-r--r-- | net/xdp/xsk.c | 73 | ||||
-rw-r--r-- | net/xdp/xsk_queue.h | 114 |
6 files changed, 238 insertions, 5 deletions
diff --git a/include/net/xdp.h b/include/net/xdp.h index 137ad5f..0b689cf5 100644 --- a/include/net/xdp.h +++ b/include/net/xdp.h @@ -104,6 +104,7 @@ struct xdp_frame *convert_to_xdp_frame(struct xdp_buff *xdp) } void xdp_return_frame(struct xdp_frame *xdpf); +void xdp_return_buff(struct xdp_buff *xdp); int xdp_rxq_info_reg(struct xdp_rxq_info *xdp_rxq, struct net_device *dev, u32 queue_index); diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index 85d0251..a0342df 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -31,6 +31,28 @@ struct xdp_sock { u16 queue_id; /* Protects multiple processes in the control path */ struct mutex mutex; + u64 rx_dropped; }; +struct xdp_buff; +#ifdef CONFIG_XDP_SOCKETS +int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp); +int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp); +void xsk_flush(struct xdp_sock *xs); +#else +static inline int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) +{ + return -ENOTSUPP; +} + +static inline int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) +{ + return -ENOTSUPP; +} + +static inline void xsk_flush(struct xdp_sock *xs) +{ +} +#endif /* CONFIG_XDP_SOCKETS */ + #endif /* _LINUX_XDP_SOCK_H */ diff --git a/net/core/xdp.c b/net/core/xdp.c index 0c86b53..bf6758f 100644 --- a/net/core/xdp.c +++ b/net/core/xdp.c @@ -308,11 +308,9 @@ err: } EXPORT_SYMBOL_GPL(xdp_rxq_info_reg_mem_model); -void xdp_return_frame(struct xdp_frame *xdpf) +static void xdp_return(void *data, struct xdp_mem_info *mem) { - struct xdp_mem_info *mem = &xdpf->mem; struct xdp_mem_allocator *xa; - void *data = xdpf->data; struct page *page; switch (mem->type) { @@ -339,4 +337,15 @@ void xdp_return_frame(struct xdp_frame *xdpf) break; } } + +void xdp_return_frame(struct xdp_frame *xdpf) +{ + xdp_return(xdpf->data, &xdpf->mem); +} EXPORT_SYMBOL_GPL(xdp_return_frame); + +void xdp_return_buff(struct xdp_buff *xdp) +{ + xdp_return(xdp->data, &xdp->rxq->mem); +} +EXPORT_SYMBOL_GPL(xdp_return_buff); diff --git a/net/xdp/xdp_umem.h b/net/xdp/xdp_umem.h index b13133e..c7378a1 100644 --- a/net/xdp/xdp_umem.h +++ b/net/xdp/xdp_umem.h @@ -39,6 +39,24 @@ struct xdp_umem { struct work_struct work; }; +static inline char *xdp_umem_get_data(struct xdp_umem *umem, u32 idx) +{ + u64 pg, off; + char *data; + + pg = idx >> umem->nfpplog2; + off = (idx & umem->nfpp_mask) << umem->frame_size_log2; + + data = page_address(umem->pgs[pg]); + return data + off; +} + +static inline char *xdp_umem_get_data_with_headroom(struct xdp_umem *umem, + u32 idx) +{ + return xdp_umem_get_data(umem, idx) + umem->frame_headroom; +} + bool xdp_umem_validate_queues(struct xdp_umem *umem); int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr); void xdp_get_umem(struct xdp_umem *umem); diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index bf2c97b..4e1e6c5 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -41,6 +41,74 @@ static struct xdp_sock *xdp_sk(struct sock *sk) return (struct xdp_sock *)sk; } +static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) +{ + u32 *id, len = xdp->data_end - xdp->data; + void *buffer; + int err = 0; + + if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index) + return -EINVAL; + + id = xskq_peek_id(xs->umem->fq); + if (!id) + return -ENOSPC; + + buffer = xdp_umem_get_data_with_headroom(xs->umem, *id); + memcpy(buffer, xdp->data, len); + err = xskq_produce_batch_desc(xs->rx, *id, len, + xs->umem->frame_headroom); + if (!err) + xskq_discard_id(xs->umem->fq); + + return err; +} + +int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) +{ + int err; + + err = __xsk_rcv(xs, xdp); + if (likely(!err)) + xdp_return_buff(xdp); + else + xs->rx_dropped++; + + return err; +} + +void xsk_flush(struct xdp_sock *xs) +{ + xskq_produce_flush_desc(xs->rx); + xs->sk.sk_data_ready(&xs->sk); +} + +int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) +{ + int err; + + err = __xsk_rcv(xs, xdp); + if (!err) + xsk_flush(xs); + else + xs->rx_dropped++; + + return err; +} + +static unsigned int xsk_poll(struct file *file, struct socket *sock, + struct poll_table_struct *wait) +{ + unsigned int mask = datagram_poll(file, sock, wait); + struct sock *sk = sock->sk; + struct xdp_sock *xs = xdp_sk(sk); + + if (xs->rx && !xskq_empty_desc(xs->rx)) + mask |= POLLIN | POLLRDNORM; + + return mask; +} + static int xsk_init_queue(u32 entries, struct xsk_queue **queue, bool umem_queue) { @@ -179,6 +247,9 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) } else if (!xs->umem || !xdp_umem_validate_queues(xs->umem)) { err = -EINVAL; goto out_unlock; + } else { + /* This xsk has its own umem. */ + xskq_set_umem(xs->umem->fq, &xs->umem->props); } /* Rebind? */ @@ -330,7 +401,7 @@ static const struct proto_ops xsk_proto_ops = { .socketpair = sock_no_socketpair, .accept = sock_no_accept, .getname = sock_no_getname, - .poll = sock_no_poll, + .poll = xsk_poll, .ioctl = sock_no_ioctl, .listen = sock_no_listen, .shutdown = sock_no_shutdown, diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 9ddd2ee..0a9b92b 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -20,6 +20,8 @@ #include "xdp_umem_props.h" +#define RX_BATCH_SIZE 16 + struct xsk_queue { struct xdp_umem_props umem_props; u32 ring_mask; @@ -32,8 +34,118 @@ struct xsk_queue { u64 invalid_descs; }; +/* Common functions operating for both RXTX and umem queues */ + +static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt) +{ + u32 entries = q->prod_tail - q->cons_tail; + + if (entries == 0) { + /* Refresh the local pointer */ + q->prod_tail = READ_ONCE(q->ring->producer); + entries = q->prod_tail - q->cons_tail; + } + + return (entries > dcnt) ? dcnt : entries; +} + +static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt) +{ + u32 free_entries = q->nentries - (producer - q->cons_tail); + + if (free_entries >= dcnt) + return free_entries; + + /* Refresh the local tail pointer */ + q->cons_tail = READ_ONCE(q->ring->consumer); + return q->nentries - (producer - q->cons_tail); +} + +/* UMEM queue */ + +static inline bool xskq_is_valid_id(struct xsk_queue *q, u32 idx) +{ + if (unlikely(idx >= q->umem_props.nframes)) { + q->invalid_descs++; + return false; + } + return true; +} + +static inline u32 *xskq_validate_id(struct xsk_queue *q) +{ + while (q->cons_tail != q->cons_head) { + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; + unsigned int idx = q->cons_tail & q->ring_mask; + + if (xskq_is_valid_id(q, ring->desc[idx])) + return &ring->desc[idx]; + + q->cons_tail++; + } + + return NULL; +} + +static inline u32 *xskq_peek_id(struct xsk_queue *q) +{ + struct xdp_umem_ring *ring; + + if (q->cons_tail == q->cons_head) { + WRITE_ONCE(q->ring->consumer, q->cons_tail); + q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); + + /* Order consumer and data */ + smp_rmb(); + + return xskq_validate_id(q); + } + + ring = (struct xdp_umem_ring *)q->ring; + return &ring->desc[q->cons_tail & q->ring_mask]; +} + +static inline void xskq_discard_id(struct xsk_queue *q) +{ + q->cons_tail++; + (void)xskq_validate_id(q); +} + +/* Rx queue */ + +static inline int xskq_produce_batch_desc(struct xsk_queue *q, + u32 id, u32 len, u16 offset) +{ + struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; + unsigned int idx; + + if (xskq_nb_free(q, q->prod_head, 1) == 0) + return -ENOSPC; + + idx = (q->prod_head++) & q->ring_mask; + ring->desc[idx].idx = id; + ring->desc[idx].len = len; + ring->desc[idx].offset = offset; + + return 0; +} + +static inline void xskq_produce_flush_desc(struct xsk_queue *q) +{ + /* Order producer and data */ + smp_wmb(); + + q->prod_tail = q->prod_head, + WRITE_ONCE(q->ring->producer, q->prod_tail); +} + +static inline bool xskq_empty_desc(struct xsk_queue *q) +{ + return (xskq_nb_free(q, q->prod_tail, 1) == q->nentries); +} + void xskq_set_umem(struct xsk_queue *q, struct xdp_umem_props *umem_props); struct xsk_queue *xskq_create(u32 nentries, bool umem_queue); -void xskq_destroy(struct xsk_queue *q); +void xskq_destroy(struct xsk_queue *q_ops); #endif /* _LINUX_XSK_QUEUE_H */ |