diff options
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r-- | net/tipc/socket.c | 132 |
1 files changed, 85 insertions, 47 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 611a04f..c1a4611 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -41,6 +41,7 @@ #include "node.h" #include "link.h" #include "config.h" +#include "name_distr.h" #include "socket.h" #define SS_LISTENING -1 /* socket is listening */ @@ -785,10 +786,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf) struct sk_buff *b; uint i, last, dst = 0; u32 scope = TIPC_CLUSTER_SCOPE; + struct sk_buff_head msgs; if (in_own_node(net, msg_orignode(msg))) scope = TIPC_NODE_SCOPE; + if (unlikely(!msg_mcast(msg))) { + pr_warn("Received non-multicast msg in multicast\n"); + kfree_skb(buf); + goto exit; + } /* Create destination port list: */ tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), msg_nameupper(msg), scope, &dports); @@ -806,9 +813,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf) continue; } msg_set_destport(msg, item->ports[i]); - tipc_sk_rcv(net, b); + skb_queue_head_init(&msgs); + skb_queue_tail(&msgs, b); + tipc_sk_rcv(net, &msgs); } } +exit: tipc_port_list_free(&dports); } @@ -1760,71 +1770,99 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) } /** - * tipc_sk_enqueue_skb - enqueue buffer to socket or backlog queue - * @sk: socket - * @skb: pointer to message. Set to NULL if buffer is consumed. - * @dnode: if buffer should be forwarded/returned, send to this node + * tipc_sk_enqueue - extract all buffers with destination 'dport' from + * inputq and try adding them to socket or backlog queue + * @inputq: list of incoming buffers with potentially different destinations + * @sk: socket where the buffers should be enqueued + * @dport: port number for the socket + * @_skb: returned buffer to be forwarded or rejected, if applicable * * Caller must hold socket lock * - * Returns TIPC_OK (0) or -tipc error code + * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD + * or -TIPC_ERR_NO_PORT */ -static int tipc_sk_enqueue_skb(struct sock *sk, struct sk_buff **skb) +static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, + u32 dport, struct sk_buff **_skb) { unsigned int lim; atomic_t *dcnt; - - if (unlikely(!*skb)) - return TIPC_OK; - if (!sock_owned_by_user(sk)) - return filter_rcv(sk, skb); - dcnt = &tipc_sk(sk)->dupl_rcvcnt; - if (sk->sk_backlog.len) - atomic_set(dcnt, 0); - lim = rcvbuf_limit(sk, *skb) + atomic_read(dcnt); - if (unlikely(sk_add_backlog(sk, *skb, lim))) + int err; + struct sk_buff *skb; + unsigned long time_limit = jiffies + 2; + + while (skb_queue_len(inputq)) { + skb = tipc_skb_dequeue(inputq, dport); + if (unlikely(!skb)) + return TIPC_OK; + /* Return if softirq window exhausted */ + if (unlikely(time_after_eq(jiffies, time_limit))) + return TIPC_OK; + if (!sock_owned_by_user(sk)) { + err = filter_rcv(sk, &skb); + if (likely(!skb)) + continue; + *_skb = skb; + return err; + } + dcnt = &tipc_sk(sk)->dupl_rcvcnt; + if (sk->sk_backlog.len) + atomic_set(dcnt, 0); + lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); + if (likely(!sk_add_backlog(sk, skb, lim))) + continue; + *_skb = skb; return -TIPC_ERR_OVERLOAD; - *skb = NULL; + } return TIPC_OK; } /** - * tipc_sk_rcv - handle incoming message - * @skb: buffer containing arriving message - * Consumes buffer - * Returns 0 if success, or errno: -EHOSTUNREACH + * tipc_sk_rcv - handle a chain of incoming buffers + * @inputq: buffer list containing the buffers + * Consumes all buffers in list until inputq is empty + * Note: may be called in multiple threads referring to the same queue + * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH + * Only node local calls check the return value, sending single-buffer queues */ -int tipc_sk_rcv(struct net *net, struct sk_buff *skb) +int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) { + u32 dnode, dport = 0; + int err = -TIPC_ERR_NO_PORT; + struct sk_buff *skb; struct tipc_sock *tsk; struct tipc_net *tn; struct sock *sk; - u32 dport = msg_destport(buf_msg(skb)); - int err = -TIPC_ERR_NO_PORT; - u32 dnode; - /* Find destination */ - tsk = tipc_sk_lookup(net, dport); - if (likely(tsk)) { - sk = &tsk->sk; - spin_lock_bh(&sk->sk_lock.slock); - err = tipc_sk_enqueue_skb(sk, &skb); - spin_unlock_bh(&sk->sk_lock.slock); - sock_put(sk); - } - if (likely(!skb)) - return 0; - if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) - goto xmit; - if (!err) { - dnode = msg_destnode(buf_msg(skb)); - goto xmit; - } - tn = net_generic(net, tipc_net_id); - if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) - return -EHOSTUNREACH; + while (skb_queue_len(inputq)) { + skb = NULL; + dport = tipc_skb_peek_port(inputq, dport); + tsk = tipc_sk_lookup(net, dport); + if (likely(tsk)) { + sk = &tsk->sk; + if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { + err = tipc_sk_enqueue(inputq, sk, dport, &skb); + spin_unlock_bh(&sk->sk_lock.slock); + dport = 0; + } + sock_put(sk); + } else { + skb = tipc_skb_dequeue(inputq, dport); + } + if (likely(!skb)) + continue; + if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) + goto xmit; + if (!err) { + dnode = msg_destnode(buf_msg(skb)); + goto xmit; + } + tn = net_generic(net, tipc_net_id); + if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) + continue; xmit: - tipc_link_xmit_skb(net, skb, dnode, dport); + tipc_link_xmit_skb(net, skb, dnode, dport); + } return err ? -EHOSTUNREACH : 0; } |