summaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c132
1 files changed, 85 insertions, 47 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 611a04f..c1a4611 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -41,6 +41,7 @@
#include "node.h"
#include "link.h"
#include "config.h"
+#include "name_distr.h"
#include "socket.h"
#define SS_LISTENING -1 /* socket is listening */
@@ -785,10 +786,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
struct sk_buff *b;
uint i, last, dst = 0;
u32 scope = TIPC_CLUSTER_SCOPE;
+ struct sk_buff_head msgs;
if (in_own_node(net, msg_orignode(msg)))
scope = TIPC_NODE_SCOPE;
+ if (unlikely(!msg_mcast(msg))) {
+ pr_warn("Received non-multicast msg in multicast\n");
+ kfree_skb(buf);
+ goto exit;
+ }
/* Create destination port list: */
tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
msg_nameupper(msg), scope, &dports);
@@ -806,9 +813,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
continue;
}
msg_set_destport(msg, item->ports[i]);
- tipc_sk_rcv(net, b);
+ skb_queue_head_init(&msgs);
+ skb_queue_tail(&msgs, b);
+ tipc_sk_rcv(net, &msgs);
}
}
+exit:
tipc_port_list_free(&dports);
}
@@ -1760,71 +1770,99 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
}
/**
- * tipc_sk_enqueue_skb - enqueue buffer to socket or backlog queue
- * @sk: socket
- * @skb: pointer to message. Set to NULL if buffer is consumed.
- * @dnode: if buffer should be forwarded/returned, send to this node
+ * tipc_sk_enqueue - extract all buffers with destination 'dport' from
+ * inputq and try adding them to socket or backlog queue
+ * @inputq: list of incoming buffers with potentially different destinations
+ * @sk: socket where the buffers should be enqueued
+ * @dport: port number for the socket
+ * @_skb: returned buffer to be forwarded or rejected, if applicable
*
* Caller must hold socket lock
*
- * Returns TIPC_OK (0) or -tipc error code
+ * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
+ * or -TIPC_ERR_NO_PORT
*/
-static int tipc_sk_enqueue_skb(struct sock *sk, struct sk_buff **skb)
+static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
+ u32 dport, struct sk_buff **_skb)
{
unsigned int lim;
atomic_t *dcnt;
-
- if (unlikely(!*skb))
- return TIPC_OK;
- if (!sock_owned_by_user(sk))
- return filter_rcv(sk, skb);
- dcnt = &tipc_sk(sk)->dupl_rcvcnt;
- if (sk->sk_backlog.len)
- atomic_set(dcnt, 0);
- lim = rcvbuf_limit(sk, *skb) + atomic_read(dcnt);
- if (unlikely(sk_add_backlog(sk, *skb, lim)))
+ int err;
+ struct sk_buff *skb;
+ unsigned long time_limit = jiffies + 2;
+
+ while (skb_queue_len(inputq)) {
+ skb = tipc_skb_dequeue(inputq, dport);
+ if (unlikely(!skb))
+ return TIPC_OK;
+ /* Return if softirq window exhausted */
+ if (unlikely(time_after_eq(jiffies, time_limit)))
+ return TIPC_OK;
+ if (!sock_owned_by_user(sk)) {
+ err = filter_rcv(sk, &skb);
+ if (likely(!skb))
+ continue;
+ *_skb = skb;
+ return err;
+ }
+ dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+ if (sk->sk_backlog.len)
+ atomic_set(dcnt, 0);
+ lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+ if (likely(!sk_add_backlog(sk, skb, lim)))
+ continue;
+ *_skb = skb;
return -TIPC_ERR_OVERLOAD;
- *skb = NULL;
+ }
return TIPC_OK;
}
/**
- * tipc_sk_rcv - handle incoming message
- * @skb: buffer containing arriving message
- * Consumes buffer
- * Returns 0 if success, or errno: -EHOSTUNREACH
+ * tipc_sk_rcv - handle a chain of incoming buffers
+ * @inputq: buffer list containing the buffers
+ * Consumes all buffers in list until inputq is empty
+ * Note: may be called in multiple threads referring to the same queue
+ * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
+ * Only node local calls check the return value, sending single-buffer queues
*/
-int tipc_sk_rcv(struct net *net, struct sk_buff *skb)
+int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
{
+ u32 dnode, dport = 0;
+ int err = -TIPC_ERR_NO_PORT;
+ struct sk_buff *skb;
struct tipc_sock *tsk;
struct tipc_net *tn;
struct sock *sk;
- u32 dport = msg_destport(buf_msg(skb));
- int err = -TIPC_ERR_NO_PORT;
- u32 dnode;
- /* Find destination */
- tsk = tipc_sk_lookup(net, dport);
- if (likely(tsk)) {
- sk = &tsk->sk;
- spin_lock_bh(&sk->sk_lock.slock);
- err = tipc_sk_enqueue_skb(sk, &skb);
- spin_unlock_bh(&sk->sk_lock.slock);
- sock_put(sk);
- }
- if (likely(!skb))
- return 0;
- if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
- goto xmit;
- if (!err) {
- dnode = msg_destnode(buf_msg(skb));
- goto xmit;
- }
- tn = net_generic(net, tipc_net_id);
- if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
- return -EHOSTUNREACH;
+ while (skb_queue_len(inputq)) {
+ skb = NULL;
+ dport = tipc_skb_peek_port(inputq, dport);
+ tsk = tipc_sk_lookup(net, dport);
+ if (likely(tsk)) {
+ sk = &tsk->sk;
+ if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
+ err = tipc_sk_enqueue(inputq, sk, dport, &skb);
+ spin_unlock_bh(&sk->sk_lock.slock);
+ dport = 0;
+ }
+ sock_put(sk);
+ } else {
+ skb = tipc_skb_dequeue(inputq, dport);
+ }
+ if (likely(!skb))
+ continue;
+ if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
+ goto xmit;
+ if (!err) {
+ dnode = msg_destnode(buf_msg(skb));
+ goto xmit;
+ }
+ tn = net_generic(net, tipc_net_id);
+ if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
+ continue;
xmit:
- tipc_link_xmit_skb(net, skb, dnode, dport);
+ tipc_link_xmit_skb(net, skb, dnode, dport);
+ }
return err ? -EHOSTUNREACH : 0;
}
OpenPOWER on IntegriCloud