From 9426bbc6de99b8649d897b94e8f5916b58195643 Mon Sep 17 00:00:00 2001
From: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Date: Tue, 6 Mar 2018 07:22:34 -0800
Subject: rds: use list structure to track information for zerocopy completion
 notification

Commit 401910db4cd4 ("rds: deliver zerocopy completion notification
with data") removes support fo r zerocopy completion notification
on the sk_error_queue, thus we no longer need to track the cookie
information in sk_buff structures.

This commit removes the struct sk_buff_head rs_zcookie_queue by
a simpler list that results in a smaller memory footprint as well
as more efficient memory_allocation time.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Willem de Bruijn <willemb@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
---
 net/rds/af_rds.c  |  6 ++---
 net/rds/message.c | 77 +++++++++++++++++++++++++++++++++++--------------------
 net/rds/rds.h     | 23 ++++++++++++-----
 net/rds/recv.c    | 23 ++++++++++++-----
 4 files changed, 85 insertions(+), 44 deletions(-)

(limited to 'net/rds')

diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index f712610..ab751a1 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -77,7 +77,7 @@ static int rds_release(struct socket *sock)
 	rds_send_drop_to(rs, NULL);
 	rds_rdma_drop_keys(rs);
 	rds_notify_queue_get(rs, NULL);
-	__skb_queue_purge(&rs->rs_zcookie_queue);
+	rds_notify_msg_zcopy_purge(&rs->rs_zcookie_queue);
 
 	spin_lock_bh(&rds_sock_lock);
 	list_del_init(&rs->rs_item);
@@ -180,7 +180,7 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
 	}
 	if (!list_empty(&rs->rs_recv_queue) ||
 	    !list_empty(&rs->rs_notify_queue) ||
-	    !skb_queue_empty(&rs->rs_zcookie_queue))
+	    !list_empty(&rs->rs_zcookie_queue.zcookie_head))
 		mask |= (EPOLLIN | EPOLLRDNORM);
 	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
 		mask |= (EPOLLOUT | EPOLLWRNORM);
@@ -515,7 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
 	INIT_LIST_HEAD(&rs->rs_recv_queue);
 	INIT_LIST_HEAD(&rs->rs_notify_queue);
 	INIT_LIST_HEAD(&rs->rs_cong_list);
-	skb_queue_head_init(&rs->rs_zcookie_queue);
+	rds_message_zcopy_queue_init(&rs->rs_zcookie_queue);
 	spin_lock_init(&rs->rs_rdma_lock);
 	rs->rs_rdma_keys = RB_ROOT;
 	rs->rs_rx_traces = 0;
diff --git a/net/rds/message.c b/net/rds/message.c
index c36edbb..90dcdcf 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -48,7 +48,6 @@ static unsigned int	rds_exthdr_size[__RDS_EXTHDR_MAX] = {
 [RDS_EXTHDR_GEN_NUM]	= sizeof(u32),
 };
 
-
 void rds_message_addref(struct rds_message *rm)
 {
 	rdsdebug("addref rm %p ref %d\n", rm, refcount_read(&rm->m_refcount));
@@ -56,9 +55,9 @@ void rds_message_addref(struct rds_message *rm)
 }
 EXPORT_SYMBOL_GPL(rds_message_addref);
 
-static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
+static inline bool rds_zcookie_add(struct rds_msg_zcopy_info *info, u32 cookie)
 {
-	struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb;
+	struct rds_zcopy_cookies *ck = &info->zcookies;
 	int ncookies = ck->num;
 
 	if (ncookies == RDS_MAX_ZCOOKIES)
@@ -68,38 +67,61 @@ static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
 	return true;
 }
 
+struct rds_msg_zcopy_info *rds_info_from_znotifier(struct rds_znotifier *znotif)
+{
+	return container_of(znotif, struct rds_msg_zcopy_info, znotif);
+}
+
+void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *q)
+{
+	unsigned long flags;
+	LIST_HEAD(copy);
+	struct rds_msg_zcopy_info *info, *tmp;
+
+	spin_lock_irqsave(&q->lock, flags);
+	list_splice(&q->zcookie_head, &copy);
+	INIT_LIST_HEAD(&q->zcookie_head);
+	spin_unlock_irqrestore(&q->lock, flags);
+
+	list_for_each_entry_safe(info, tmp, &copy, rs_zcookie_next) {
+		list_del(&info->rs_zcookie_next);
+		kfree(info);
+	}
+}
+
 static void rds_rm_zerocopy_callback(struct rds_sock *rs,
 				     struct rds_znotifier *znotif)
 {
-	struct sk_buff *skb, *tail;
-	unsigned long flags;
-	struct sk_buff_head *q;
+	struct rds_msg_zcopy_info *info;
+	struct rds_msg_zcopy_queue *q;
 	u32 cookie = znotif->z_cookie;
 	struct rds_zcopy_cookies *ck;
+	struct list_head *head;
+	unsigned long flags;
 
+	mm_unaccount_pinned_pages(&znotif->z_mmp);
 	q = &rs->rs_zcookie_queue;
 	spin_lock_irqsave(&q->lock, flags);
-	tail = skb_peek_tail(q);
-
-	if (tail && skb_zcookie_add(tail, cookie)) {
-		spin_unlock_irqrestore(&q->lock, flags);
-		mm_unaccount_pinned_pages(&znotif->z_mmp);
-		consume_skb(rds_skb_from_znotifier(znotif));
-		/* caller invokes rds_wake_sk_sleep() */
-		return;
+	head = &q->zcookie_head;
+	if (!list_empty(head)) {
+		info = list_entry(head, struct rds_msg_zcopy_info,
+				  rs_zcookie_next);
+		if (info && rds_zcookie_add(info, cookie)) {
+			spin_unlock_irqrestore(&q->lock, flags);
+			kfree(rds_info_from_znotifier(znotif));
+			/* caller invokes rds_wake_sk_sleep() */
+			return;
+		}
 	}
 
-	skb = rds_skb_from_znotifier(znotif);
-	ck = (struct rds_zcopy_cookies *)skb->cb;
+	info = rds_info_from_znotifier(znotif);
+	ck = &info->zcookies;
 	memset(ck, 0, sizeof(*ck));
-	WARN_ON(!skb_zcookie_add(skb, cookie));
-
-	__skb_queue_tail(q, skb);
+	WARN_ON(!rds_zcookie_add(info, cookie));
+	list_add_tail(&q->zcookie_head, &info->rs_zcookie_next);
 
 	spin_unlock_irqrestore(&q->lock, flags);
 	/* caller invokes rds_wake_sk_sleep() */
-
-	mm_unaccount_pinned_pages(&znotif->z_mmp);
 }
 
 /*
@@ -340,7 +362,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
 	int ret = 0;
 	int length = iov_iter_count(from);
 	int total_copied = 0;
-	struct sk_buff *skb;
+	struct rds_msg_zcopy_info *info;
 
 	rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
 
@@ -350,12 +372,11 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
 	sg = rm->data.op_sg;
 	sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
 
-	skb = alloc_skb(0, GFP_KERNEL);
-	if (!skb)
+	info = kzalloc(sizeof(*info), GFP_KERNEL);
+	if (!info)
 		return -ENOMEM;
-	BUILD_BUG_ON(sizeof(skb->cb) < max_t(int, sizeof(struct rds_znotifier),
-					     sizeof(struct rds_zcopy_cookies)));
-	rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
+	INIT_LIST_HEAD(&info->rs_zcookie_next);
+	rm->data.op_mmp_znotifier = &info->znotif;
 	if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
 				    length)) {
 		ret = -ENOMEM;
@@ -389,7 +410,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
 	WARN_ON_ONCE(length != 0);
 	return ret;
 err:
-	consume_skb(skb);
+	kfree(info);
 	rm->data.op_mmp_znotifier = NULL;
 	return ret;
 }
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 33b1635..74cd27c 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -357,16 +357,27 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
 #define RDS_MSG_FLUSH		8
 
 struct rds_znotifier {
-	struct list_head	z_list;
 	struct mmpin		z_mmp;
 	u32			z_cookie;
 };
 
-#define	RDS_ZCOPY_SKB(__skb)	((struct rds_znotifier *)&((__skb)->cb[0]))
+struct rds_msg_zcopy_info {
+	struct list_head rs_zcookie_next;
+	union {
+		struct rds_znotifier znotif;
+		struct rds_zcopy_cookies zcookies;
+	};
+};
 
-static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z)
+struct rds_msg_zcopy_queue {
+	struct list_head zcookie_head;
+	spinlock_t lock; /* protects zcookie_head queue */
+};
+
+static inline void rds_message_zcopy_queue_init(struct rds_msg_zcopy_queue *q)
 {
-	return container_of((void *)z, struct sk_buff, cb);
+	spin_lock_init(&q->lock);
+	INIT_LIST_HEAD(&q->zcookie_head);
 }
 
 struct rds_message {
@@ -603,8 +614,7 @@ struct rds_sock {
 	/* Socket receive path trace points*/
 	u8			rs_rx_traces;
 	u8			rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
-
-	struct sk_buff_head	rs_zcookie_queue;
+	struct rds_msg_zcopy_queue rs_zcookie_queue;
 };
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
@@ -803,6 +813,7 @@ void rds_message_addref(struct rds_message *rm);
 void rds_message_put(struct rds_message *rm);
 void rds_message_wait(struct rds_message *rm);
 void rds_message_unmapped(struct rds_message *rm);
+void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *info);
 
 static inline void rds_message_make_checksum(struct rds_header *hdr)
 {
diff --git a/net/rds/recv.c b/net/rds/recv.c
index d507477..de50e21 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -579,9 +579,10 @@ out:
 
 static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
 {
-	struct sk_buff *skb;
-	struct sk_buff_head *q = &rs->rs_zcookie_queue;
+	struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
+	struct rds_msg_zcopy_info *info = NULL;
 	struct rds_zcopy_cookies *done;
+	unsigned long flags;
 
 	if (!msg->msg_control)
 		return false;
@@ -590,16 +591,24 @@ static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
 	    msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
 		return false;
 
-	skb = skb_dequeue(q);
-	if (!skb)
+	spin_lock_irqsave(&q->lock, flags);
+	if (!list_empty(&q->zcookie_head)) {
+		info = list_entry(q->zcookie_head.next,
+				  struct rds_msg_zcopy_info, rs_zcookie_next);
+		list_del(&info->rs_zcookie_next);
+	}
+	spin_unlock_irqrestore(&q->lock, flags);
+	if (!info)
 		return false;
-	done = (struct rds_zcopy_cookies *)skb->cb;
+	done = &info->zcookies;
 	if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
 		     done)) {
-		skb_queue_head(q, skb);
+		spin_lock_irqsave(&q->lock, flags);
+		list_add(&info->rs_zcookie_next, &q->zcookie_head);
+		spin_unlock_irqrestore(&q->lock, flags);
 		return false;
 	}
-	consume_skb(skb);
+	kfree(info);
 	return true;
 }
 
-- 
cgit v1.1