diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2014-12-11 14:27:06 -0800 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2014-12-11 14:27:06 -0800 |
commit | 70e71ca0af244f48a5dcf56dc435243792e3a495 (patch) | |
tree | f7d9c4c4d9a857a00043e9bf6aa2d6f533a34778 /net/tipc | |
parent | bae41e45b7400496b9bf0c70c6004419d9987819 (diff) | |
parent | 00c83b01d58068dfeb2e1351cca6fccf2a83fa8f (diff) | |
download | op-kernel-dev-70e71ca0af244f48a5dcf56dc435243792e3a495.zip op-kernel-dev-70e71ca0af244f48a5dcf56dc435243792e3a495.tar.gz |
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next
Pull networking updates from David Miller:
1) New offloading infrastructure and example 'rocker' driver for
offloading of switching and routing to hardware.
This work was done by a large group of dedicated individuals, not
limited to: Scott Feldman, Jiri Pirko, Thomas Graf, John Fastabend,
Jamal Hadi Salim, Andy Gospodarek, Florian Fainelli, Roopa Prabhu
2) Start making the networking operate on IOV iterators instead of
modifying iov objects in-situ during transfers. Thanks to Al Viro
and Herbert Xu.
3) A set of new netlink interfaces for the TIPC stack, from Richard
Alpe.
4) Remove unnecessary looping during ipv6 routing lookups, from Martin
KaFai Lau.
5) Add PAUSE frame generation support to gianfar driver, from Matei
Pavaluca.
6) Allow for larger reordering levels in TCP, which are easily
achievable in the real world right now, from Eric Dumazet.
7) Add a variable of napi_schedule that doesn't need to disable cpu
interrupts, from Eric Dumazet.
8) Use a doubly linked list to optimize neigh_parms_release(), from
Nicolas Dichtel.
9) Various enhancements to the kernel BPF verifier, and allow eBPF
programs to actually be attached to sockets. From Alexei
Starovoitov.
10) Support TSO/LSO in sunvnet driver, from David L Stevens.
11) Allow controlling ECN usage via routing metrics, from Florian
Westphal.
12) Remote checksum offload, from Tom Herbert.
13) Add split-header receive, BQL, and xmit_more support to amd-xgbe
driver, from Thomas Lendacky.
14) Add MPLS support to openvswitch, from Simon Horman.
15) Support wildcard tunnel endpoints in ipv6 tunnels, from Steffen
Klassert.
16) Do gro flushes on a per-device basis using a timer, from Eric
Dumazet. This tries to resolve the conflicting goals between the
desired handling of bulk vs. RPC-like traffic.
17) Allow userspace to ask for the CPU upon what a packet was
received/steered, via SO_INCOMING_CPU. From Eric Dumazet.
18) Limit GSO packets to half the current congestion window, from Eric
Dumazet.
19) Add a generic helper so that all drivers set their RSS keys in a
consistent way, from Eric Dumazet.
20) Add xmit_more support to enic driver, from Govindarajulu
Varadarajan.
21) Add VLAN packet scheduler action, from Jiri Pirko.
22) Support configurable RSS hash functions via ethtool, from Eyal
Perry.
* git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next: (1820 commits)
Fix race condition between vxlan_sock_add and vxlan_sock_release
net/macb: fix compilation warning for print_hex_dump() called with skb->mac_header
net/mlx4: Add support for A0 steering
net/mlx4: Refactor QUERY_PORT
net/mlx4_core: Add explicit error message when rule doesn't meet configuration
net/mlx4: Add A0 hybrid steering
net/mlx4: Add mlx4_bitmap zone allocator
net/mlx4: Add a check if there are too many reserved QPs
net/mlx4: Change QP allocation scheme
net/mlx4_core: Use tasklet for user-space CQ completion events
net/mlx4_core: Mask out host side virtualization features for guests
net/mlx4_en: Set csum level for encapsulated packets
be2net: Export tunnel offloads only when a VxLAN tunnel is created
gianfar: Fix dma check map error when DMA_API_DEBUG is enabled
cxgb4/csiostor: Don't use MASTER_MUST for fw_hello call
net: fec: only enable mdio interrupt before phy device link up
net: fec: clear all interrupt events to support i.MX6SX
net: fec: reset fep link status in suspend function
net: sock: fix access via invalid file descriptor
net: introduce helper macro for_each_cmsghdr
...
Diffstat (limited to 'net/tipc')
-rw-r--r-- | net/tipc/Makefile | 4 | ||||
-rw-r--r-- | net/tipc/bcast.c | 230 | ||||
-rw-r--r-- | net/tipc/bcast.h | 6 | ||||
-rw-r--r-- | net/tipc/bearer.c | 447 | ||||
-rw-r--r-- | net/tipc/bearer.h | 16 | ||||
-rw-r--r-- | net/tipc/core.h | 2 | ||||
-rw-r--r-- | net/tipc/link.c | 981 | ||||
-rw-r--r-- | net/tipc/link.h | 55 | ||||
-rw-r--r-- | net/tipc/msg.c | 133 | ||||
-rw-r--r-- | net/tipc/msg.h | 16 | ||||
-rw-r--r-- | net/tipc/name_distr.c | 181 | ||||
-rw-r--r-- | net/tipc/name_distr.h | 1 | ||||
-rw-r--r-- | net/tipc/name_table.c | 373 | ||||
-rw-r--r-- | net/tipc/name_table.h | 30 | ||||
-rw-r--r-- | net/tipc/net.c | 106 | ||||
-rw-r--r-- | net/tipc/net.h | 8 | ||||
-rw-r--r-- | net/tipc/netlink.c | 133 | ||||
-rw-r--r-- | net/tipc/netlink.h (renamed from net/tipc/node_subscr.h) | 35 | ||||
-rw-r--r-- | net/tipc/node.c | 108 | ||||
-rw-r--r-- | net/tipc/node.h | 16 | ||||
-rw-r--r-- | net/tipc/node_subscr.c | 96 | ||||
-rw-r--r-- | net/tipc/socket.c | 419 | ||||
-rw-r--r-- | net/tipc/socket.h | 3 | ||||
-rw-r--r-- | net/tipc/subscr.c | 1 |
24 files changed, 2486 insertions, 914 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile index b8a13ca..333e459 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -7,8 +7,8 @@ obj-$(CONFIG_TIPC) := tipc.o tipc-y += addr.o bcast.o bearer.o config.o \ core.o link.o discover.o msg.o \ name_distr.o subscr.o name_table.o net.o \ - netlink.o node.o node_subscr.o \ - socket.o log.o eth_media.o server.o + netlink.o node.o socket.o log.o eth_media.o \ + server.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o tipc-$(CONFIG_SYSCTL) += sysctl.o diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index b8670bf..96ceefe 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -217,12 +217,13 @@ struct tipc_node *tipc_bclink_retransmit_to(void) */ static void bclink_retransmit_pkt(u32 after, u32 to) { - struct sk_buff *buf; + struct sk_buff *skb; - buf = bcl->first_out; - while (buf && less_eq(buf_seqno(buf), after)) - buf = buf->next; - tipc_link_retransmit(bcl, buf, mod(to - after)); + skb_queue_walk(&bcl->outqueue, skb) { + if (more(buf_seqno(skb), after)) + break; + } + tipc_link_retransmit(bcl, skb, mod(to - after)); } /** @@ -232,8 +233,11 @@ static void bclink_retransmit_pkt(u32 after, u32 to) */ void tipc_bclink_wakeup_users(void) { - while (skb_queue_len(&bclink->link.waiting_sks)) - tipc_sk_rcv(skb_dequeue(&bclink->link.waiting_sks)); + struct sk_buff *skb; + + while ((skb = skb_dequeue(&bclink->link.waiting_sks))) + tipc_sk_rcv(skb); + } /** @@ -245,14 +249,14 @@ void tipc_bclink_wakeup_users(void) */ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) { - struct sk_buff *crs; + struct sk_buff *skb, *tmp; struct sk_buff *next; unsigned int released = 0; tipc_bclink_lock(); /* Bail out if tx queue is empty (no clean up is required) */ - crs = bcl->first_out; - if (!crs) + skb = skb_peek(&bcl->outqueue); + if (!skb) goto exit; /* Determine which messages need to be acknowledged */ @@ -271,43 +275,43 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) * Bail out if specified sequence number does not correspond * to a message that has been sent and not yet acknowledged */ - if (less(acked, buf_seqno(crs)) || + if (less(acked, buf_seqno(skb)) || less(bcl->fsm_msg_cnt, acked) || less_eq(acked, n_ptr->bclink.acked)) goto exit; } /* Skip over packets that node has previously acknowledged */ - while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked)) - crs = crs->next; + skb_queue_walk(&bcl->outqueue, skb) { + if (more(buf_seqno(skb), n_ptr->bclink.acked)) + break; + } /* Update packets that node is now acknowledging */ + skb_queue_walk_from_safe(&bcl->outqueue, skb, tmp) { + if (more(buf_seqno(skb), acked)) + break; - while (crs && less_eq(buf_seqno(crs), acked)) { - next = crs->next; - - if (crs != bcl->next_out) - bcbuf_decr_acks(crs); - else { - bcbuf_set_acks(crs, 0); + next = tipc_skb_queue_next(&bcl->outqueue, skb); + if (skb != bcl->next_out) { + bcbuf_decr_acks(skb); + } else { + bcbuf_set_acks(skb, 0); bcl->next_out = next; bclink_set_last_sent(); } - if (bcbuf_acks(crs) == 0) { - bcl->first_out = next; - bcl->out_queue_size--; - kfree_skb(crs); + if (bcbuf_acks(skb) == 0) { + __skb_unlink(skb, &bcl->outqueue); + kfree_skb(skb); released = 1; } - crs = next; } n_ptr->bclink.acked = acked; /* Try resolving broadcast link congestion, if necessary */ - if (unlikely(bcl->next_out)) { - tipc_link_push_queue(bcl); + tipc_link_push_packets(bcl); bclink_set_last_sent(); } if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks))) @@ -327,19 +331,16 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) struct sk_buff *buf; /* Ignore "stale" link state info */ - if (less_eq(last_sent, n_ptr->bclink.last_in)) return; /* Update link synchronization state; quit if in sync */ - bclink_update_last_sent(n_ptr, last_sent); if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) return; /* Update out-of-sync state; quit if loss is still unconfirmed */ - if ((++n_ptr->bclink.oos_state) == 1) { if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) return; @@ -347,15 +348,15 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) } /* Don't NACK if one has been recently sent (or seen) */ - if (n_ptr->bclink.oos_state & 0x1) return; /* Send NACK */ - buf = tipc_buf_acquire(INT_H_SIZE); if (buf) { struct tipc_msg *msg = buf_msg(buf); + struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue); + u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent; tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, n_ptr->addr); @@ -363,9 +364,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) msg_set_mc_netid(msg, tipc_net_id); msg_set_bcast_ack(msg, n_ptr->bclink.last_in); msg_set_bcgap_after(msg, n_ptr->bclink.last_in); - msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head - ? buf_seqno(n_ptr->bclink.deferred_head) - 1 - : n_ptr->bclink.last_sent); + msg_set_bcgap_to(msg, to); tipc_bclink_lock(); tipc_bearer_send(MAX_BEARERS, buf, NULL); @@ -402,20 +401,20 @@ static void bclink_peek_nack(struct tipc_msg *msg) /* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster * and to identified node local sockets - * @buf: chain of buffers containing message + * @list: chain of buffers containing message * Consumes the buffer chain, except when returning -ELINKCONG * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ -int tipc_bclink_xmit(struct sk_buff *buf) +int tipc_bclink_xmit(struct sk_buff_head *list) { int rc = 0; int bc = 0; - struct sk_buff *clbuf; + struct sk_buff *skb; /* Prepare clone of message for local node */ - clbuf = tipc_msg_reassemble(buf); - if (unlikely(!clbuf)) { - kfree_skb_list(buf); + skb = tipc_msg_reassemble(list); + if (unlikely(!skb)) { + __skb_queue_purge(list); return -EHOSTUNREACH; } @@ -423,11 +422,13 @@ int tipc_bclink_xmit(struct sk_buff *buf) if (likely(bclink)) { tipc_bclink_lock(); if (likely(bclink->bcast_nodes.count)) { - rc = __tipc_link_xmit(bcl, buf); + rc = __tipc_link_xmit(bcl, list); if (likely(!rc)) { + u32 len = skb_queue_len(&bcl->outqueue); + bclink_set_last_sent(); bcl->stats.queue_sz_counts++; - bcl->stats.accu_queue_sz += bcl->out_queue_size; + bcl->stats.accu_queue_sz += len; } bc = 1; } @@ -435,13 +436,13 @@ int tipc_bclink_xmit(struct sk_buff *buf) } if (unlikely(!bc)) - kfree_skb_list(buf); + __skb_queue_purge(list); /* Deliver message clone */ if (likely(!rc)) - tipc_sk_mcast_rcv(clbuf); + tipc_sk_mcast_rcv(skb); else - kfree_skb(clbuf); + kfree_skb(skb); return rc; } @@ -462,7 +463,6 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) * Unicast an ACK periodically, ensuring that * all nodes in the cluster don't ACK at the same time */ - if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { tipc_link_proto_xmit(node->active_links[node->addr & 1], STATE_MSG, 0, 0, 0, 0, 0); @@ -484,7 +484,6 @@ void tipc_bclink_rcv(struct sk_buff *buf) int deferred = 0; /* Screen out unwanted broadcast messages */ - if (msg_mc_netid(msg) != tipc_net_id) goto exit; @@ -497,7 +496,6 @@ void tipc_bclink_rcv(struct sk_buff *buf) goto unlock; /* Handle broadcast protocol message */ - if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { if (msg_type(msg) != STATE_MSG) goto unlock; @@ -518,14 +516,12 @@ void tipc_bclink_rcv(struct sk_buff *buf) } /* Handle in-sequence broadcast message */ - seqno = msg_seqno(msg); next_in = mod(node->bclink.last_in + 1); if (likely(seqno == next_in)) { receive: /* Deliver message to destination */ - if (likely(msg_isdata(msg))) { tipc_bclink_lock(); bclink_accept_pkt(node, seqno); @@ -574,7 +570,6 @@ receive: buf = NULL; /* Determine new synchronization state */ - tipc_node_lock(node); if (unlikely(!tipc_node_is_up(node))) goto unlock; @@ -582,33 +577,26 @@ receive: if (node->bclink.last_in == node->bclink.last_sent) goto unlock; - if (!node->bclink.deferred_head) { + if (skb_queue_empty(&node->bclink.deferred_queue)) { node->bclink.oos_state = 1; goto unlock; } - msg = buf_msg(node->bclink.deferred_head); + msg = buf_msg(skb_peek(&node->bclink.deferred_queue)); seqno = msg_seqno(msg); next_in = mod(next_in + 1); if (seqno != next_in) goto unlock; /* Take in-sequence message from deferred queue & deliver it */ - - buf = node->bclink.deferred_head; - node->bclink.deferred_head = buf->next; - buf->next = NULL; - node->bclink.deferred_size--; + buf = __skb_dequeue(&node->bclink.deferred_queue); goto receive; } /* Handle out-of-sequence broadcast message */ - if (less(next_in, seqno)) { - deferred = tipc_link_defer_pkt(&node->bclink.deferred_head, - &node->bclink.deferred_tail, + deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue, buf); - node->bclink.deferred_size += deferred; bclink_update_last_sent(node, seqno); buf = NULL; } @@ -767,6 +755,118 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action) tipc_bclink_unlock(); } +static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, + struct tipc_stats *stats) +{ + int i; + struct nlattr *nest; + + struct nla_map { + __u32 key; + __u32 val; + }; + + struct nla_map map[] = { + {TIPC_NLA_STATS_RX_INFO, stats->recv_info}, + {TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments}, + {TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented}, + {TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles}, + {TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled}, + {TIPC_NLA_STATS_TX_INFO, stats->sent_info}, + {TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments}, + {TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented}, + {TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles}, + {TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled}, + {TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks}, + {TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv}, + {TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks}, + {TIPC_NLA_STATS_TX_ACKS, stats->sent_acks}, + {TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted}, + {TIPC_NLA_STATS_DUPLICATES, stats->duplicates}, + {TIPC_NLA_STATS_LINK_CONGS, stats->link_congs}, + {TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz}, + {TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ? + (stats->accu_queue_sz / stats->queue_sz_counts) : 0} + }; + + nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS); + if (!nest) + return -EMSGSIZE; + + for (i = 0; i < ARRAY_SIZE(map); i++) + if (nla_put_u32(skb, map[i].key, map[i].val)) + goto msg_full; + + nla_nest_end(skb, nest); + + return 0; +msg_full: + nla_nest_cancel(skb, nest); + + return -EMSGSIZE; +} + +int tipc_nl_add_bc_link(struct tipc_nl_msg *msg) +{ + int err; + void *hdr; + struct nlattr *attrs; + struct nlattr *prop; + + if (!bcl) + return 0; + + tipc_bclink_lock(); + + hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family, + NLM_F_MULTI, TIPC_NL_LINK_GET); + if (!hdr) + return -EMSGSIZE; + + attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK); + if (!attrs) + goto msg_full; + + /* The broadcast link is always up */ + if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) + goto attr_msg_full; + + if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST)) + goto attr_msg_full; + if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name)) + goto attr_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->next_in_no)) + goto attr_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->next_out_no)) + goto attr_msg_full; + + prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP); + if (!prop) + goto attr_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0])) + goto prop_msg_full; + nla_nest_end(msg->skb, prop); + + err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats); + if (err) + goto attr_msg_full; + + tipc_bclink_unlock(); + nla_nest_end(msg->skb, attrs); + genlmsg_end(msg->skb, hdr); + + return 0; + +prop_msg_full: + nla_nest_cancel(msg->skb, prop); +attr_msg_full: + nla_nest_cancel(msg->skb, attrs); +msg_full: + tipc_bclink_unlock(); + genlmsg_cancel(msg->skb, hdr); + + return -EMSGSIZE; +} int tipc_bclink_stats(char *buf, const u32 buf_size) { @@ -851,7 +951,9 @@ int tipc_bclink_init(void) sprintf(bcbearer->media.name, "tipc-broadcast"); spin_lock_init(&bclink->lock); - __skb_queue_head_init(&bcl->waiting_sks); + __skb_queue_head_init(&bcl->outqueue); + __skb_queue_head_init(&bcl->deferred_queue); + skb_queue_head_init(&bcl->waiting_sks); bcl->next_out_no = 1; spin_lock_init(&bclink->node.lock); __skb_queue_head_init(&bclink->node.waiting_sks); diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index e7b0f85..644d791 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -37,6 +37,8 @@ #ifndef _TIPC_BCAST_H #define _TIPC_BCAST_H +#include "netlink.h" + #define MAX_NODES 4096 #define WSIZE 32 #define TIPC_BCLINK_RESET 1 @@ -98,6 +100,8 @@ int tipc_bclink_reset_stats(void); int tipc_bclink_set_queue_limits(u32 limit); void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action); uint tipc_bclink_get_mtu(void); -int tipc_bclink_xmit(struct sk_buff *buf); +int tipc_bclink_xmit(struct sk_buff_head *list); void tipc_bclink_wakeup_users(void); +int tipc_nl_add_bc_link(struct tipc_nl_msg *msg); + #endif diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 2644743..463db5b 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -1,7 +1,7 @@ /* * net/tipc/bearer.c: TIPC bearer code * - * Copyright (c) 1996-2006, 2013, Ericsson AB + * Copyright (c) 1996-2006, 2013-2014, Ericsson AB * Copyright (c) 2004-2006, 2010-2013, Wind River Systems * All rights reserved. * @@ -37,6 +37,7 @@ #include "core.h" #include "config.h" #include "bearer.h" +#include "link.h" #include "discover.h" #define MAX_ADDR_STR 60 @@ -49,6 +50,23 @@ static struct tipc_media * const media_info_array[] = { NULL }; +static const struct nla_policy +tipc_nl_bearer_policy[TIPC_NLA_BEARER_MAX + 1] = { + [TIPC_NLA_BEARER_UNSPEC] = { .type = NLA_UNSPEC }, + [TIPC_NLA_BEARER_NAME] = { + .type = NLA_STRING, + .len = TIPC_MAX_BEARER_NAME + }, + [TIPC_NLA_BEARER_PROP] = { .type = NLA_NESTED }, + [TIPC_NLA_BEARER_DOMAIN] = { .type = NLA_U32 } +}; + +static const struct nla_policy tipc_nl_media_policy[TIPC_NLA_MEDIA_MAX + 1] = { + [TIPC_NLA_MEDIA_UNSPEC] = { .type = NLA_UNSPEC }, + [TIPC_NLA_MEDIA_NAME] = { .type = NLA_STRING }, + [TIPC_NLA_MEDIA_PROP] = { .type = NLA_NESTED } +}; + struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1]; static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down); @@ -627,3 +645,430 @@ void tipc_bearer_stop(void) } } } + +/* Caller should hold rtnl_lock to protect the bearer */ +static int __tipc_nl_add_bearer(struct tipc_nl_msg *msg, + struct tipc_bearer *bearer) +{ + void *hdr; + struct nlattr *attrs; + struct nlattr *prop; + + hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family, + NLM_F_MULTI, TIPC_NL_BEARER_GET); + if (!hdr) + return -EMSGSIZE; + + attrs = nla_nest_start(msg->skb, TIPC_NLA_BEARER); + if (!attrs) + goto msg_full; + + if (nla_put_string(msg->skb, TIPC_NLA_BEARER_NAME, bearer->name)) + goto attr_msg_full; + + prop = nla_nest_start(msg->skb, TIPC_NLA_BEARER_PROP); + if (!prop) + goto prop_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, bearer->priority)) + goto prop_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, bearer->tolerance)) + goto prop_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bearer->window)) + goto prop_msg_full; + + nla_nest_end(msg->skb, prop); + nla_nest_end(msg->skb, attrs); + genlmsg_end(msg->skb, hdr); + + return 0; + +prop_msg_full: + nla_nest_cancel(msg->skb, prop); +attr_msg_full: + nla_nest_cancel(msg->skb, attrs); +msg_full: + genlmsg_cancel(msg->skb, hdr); + + return -EMSGSIZE; +} + +int tipc_nl_bearer_dump(struct sk_buff *skb, struct netlink_callback *cb) +{ + int err; + int i = cb->args[0]; + struct tipc_bearer *bearer; + struct tipc_nl_msg msg; + + if (i == MAX_BEARERS) + return 0; + + msg.skb = skb; + msg.portid = NETLINK_CB(cb->skb).portid; + msg.seq = cb->nlh->nlmsg_seq; + + rtnl_lock(); + for (i = 0; i < MAX_BEARERS; i++) { + bearer = rtnl_dereference(bearer_list[i]); + if (!bearer) + continue; + + err = __tipc_nl_add_bearer(&msg, bearer); + if (err) + break; + } + rtnl_unlock(); + + cb->args[0] = i; + return skb->len; +} + +int tipc_nl_bearer_get(struct sk_buff *skb, struct genl_info *info) +{ + int err; + char *name; + struct sk_buff *rep; + struct tipc_bearer *bearer; + struct tipc_nl_msg msg; + struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1]; + + if (!info->attrs[TIPC_NLA_BEARER]) + return -EINVAL; + + err = nla_parse_nested(attrs, TIPC_NLA_BEARER_MAX, + info->attrs[TIPC_NLA_BEARER], + tipc_nl_bearer_policy); + if (err) + return err; + + if (!attrs[TIPC_NLA_BEARER_NAME]) + return -EINVAL; + name = nla_data(attrs[TIPC_NLA_BEARER_NAME]); + + rep = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL); + if (!rep) + return -ENOMEM; + + msg.skb = rep; + msg.portid = info->snd_portid; + msg.seq = info->snd_seq; + + rtnl_lock(); + bearer = tipc_bearer_find(name); + if (!bearer) { + err = -EINVAL; + goto err_out; + } + + err = __tipc_nl_add_bearer(&msg, bearer); + if (err) + goto err_out; + rtnl_unlock(); + + return genlmsg_reply(rep, info); +err_out: + rtnl_unlock(); + nlmsg_free(rep); + + return err; +} + +int tipc_nl_bearer_disable(struct sk_buff *skb, struct genl_info *info) +{ + int err; + char *name; + struct tipc_bearer *bearer; + struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1]; + + if (!info->attrs[TIPC_NLA_BEARER]) + return -EINVAL; + + err = nla_parse_nested(attrs, TIPC_NLA_BEARER_MAX, + info->attrs[TIPC_NLA_BEARER], + tipc_nl_bearer_policy); + if (err) + return err; + + if (!attrs[TIPC_NLA_BEARER_NAME]) + return -EINVAL; + + name = nla_data(attrs[TIPC_NLA_BEARER_NAME]); + + rtnl_lock(); + bearer = tipc_bearer_find(name); + if (!bearer) { + rtnl_unlock(); + return -EINVAL; + } + + bearer_disable(bearer, false); + rtnl_unlock(); + + return 0; +} + +int tipc_nl_bearer_enable(struct sk_buff *skb, struct genl_info *info) +{ + int err; + char *bearer; + struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1]; + u32 domain; + u32 prio; + + prio = TIPC_MEDIA_LINK_PRI; + domain = tipc_own_addr & TIPC_CLUSTER_MASK; + + if (!info->attrs[TIPC_NLA_BEARER]) + return -EINVAL; + + err = nla_parse_nested(attrs, TIPC_NLA_BEARER_MAX, + info->attrs[TIPC_NLA_BEARER], + tipc_nl_bearer_policy); + if (err) + return err; + + if (!attrs[TIPC_NLA_BEARER_NAME]) + return -EINVAL; + + bearer = nla_data(attrs[TIPC_NLA_BEARER_NAME]); + + if (attrs[TIPC_NLA_BEARER_DOMAIN]) + domain = nla_get_u32(attrs[TIPC_NLA_BEARER_DOMAIN]); + + if (attrs[TIPC_NLA_BEARER_PROP]) { + struct nlattr *props[TIPC_NLA_PROP_MAX + 1]; + + err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_BEARER_PROP], + props); + if (err) + return err; + + if (props[TIPC_NLA_PROP_PRIO]) + prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]); + } + + rtnl_lock(); + err = tipc_enable_bearer(bearer, domain, prio); + if (err) { + rtnl_unlock(); + return err; + } + rtnl_unlock(); + + return 0; +} + +int tipc_nl_bearer_set(struct sk_buff *skb, struct genl_info *info) +{ + int err; + char *name; + struct tipc_bearer *b; + struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1]; + + if (!info->attrs[TIPC_NLA_BEARER]) + return -EINVAL; + + err = nla_parse_nested(attrs, TIPC_NLA_BEARER_MAX, + info->attrs[TIPC_NLA_BEARER], + tipc_nl_bearer_policy); + if (err) + return err; + + if (!attrs[TIPC_NLA_BEARER_NAME]) + return -EINVAL; + name = nla_data(attrs[TIPC_NLA_BEARER_NAME]); + + rtnl_lock(); + b = tipc_bearer_find(name); + if (!b) { + rtnl_unlock(); + return -EINVAL; + } + + if (attrs[TIPC_NLA_BEARER_PROP]) { + struct nlattr *props[TIPC_NLA_PROP_MAX + 1]; + + err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_BEARER_PROP], + props); + if (err) { + rtnl_unlock(); + return err; + } + + if (props[TIPC_NLA_PROP_TOL]) + b->tolerance = nla_get_u32(props[TIPC_NLA_PROP_TOL]); + if (props[TIPC_NLA_PROP_PRIO]) + b->priority = nla_get_u32(props[TIPC_NLA_PROP_PRIO]); + if (props[TIPC_NLA_PROP_WIN]) + b->window = nla_get_u32(props[TIPC_NLA_PROP_WIN]); + } + rtnl_unlock(); + + return 0; +} + +static int __tipc_nl_add_media(struct tipc_nl_msg *msg, + struct tipc_media *media) +{ + void *hdr; + struct nlattr *attrs; + struct nlattr *prop; + + hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family, + NLM_F_MULTI, TIPC_NL_MEDIA_GET); + if (!hdr) + return -EMSGSIZE; + + attrs = nla_nest_start(msg->skb, TIPC_NLA_MEDIA); + if (!attrs) + goto msg_full; + + if (nla_put_string(msg->skb, TIPC_NLA_MEDIA_NAME, media->name)) + goto attr_msg_full; + + prop = nla_nest_start(msg->skb, TIPC_NLA_MEDIA_PROP); + if (!prop) + goto prop_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, media->priority)) + goto prop_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, media->tolerance)) + goto prop_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, media->window)) + goto prop_msg_full; + + nla_nest_end(msg->skb, prop); + nla_nest_end(msg->skb, attrs); + genlmsg_end(msg->skb, hdr); + + return 0; + +prop_msg_full: + nla_nest_cancel(msg->skb, prop); +attr_msg_full: + nla_nest_cancel(msg->skb, attrs); +msg_full: + genlmsg_cancel(msg->skb, hdr); + + return -EMSGSIZE; +} + +int tipc_nl_media_dump(struct sk_buff *skb, struct netlink_callback *cb) +{ + int err; + int i = cb->args[0]; + struct tipc_nl_msg msg; + + if (i == MAX_MEDIA) + return 0; + + msg.skb = skb; + msg.portid = NETLINK_CB(cb->skb).portid; + msg.seq = cb->nlh->nlmsg_seq; + + rtnl_lock(); + for (; media_info_array[i] != NULL; i++) { + err = __tipc_nl_add_media(&msg, media_info_array[i]); + if (err) + break; + } + rtnl_unlock(); + + cb->args[0] = i; + return skb->len; +} + +int tipc_nl_media_get(struct sk_buff *skb, struct genl_info *info) +{ + int err; + char *name; + struct tipc_nl_msg msg; + struct tipc_media *media; + struct sk_buff *rep; + struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1]; + + if (!info->attrs[TIPC_NLA_MEDIA]) + return -EINVAL; + + err = nla_parse_nested(attrs, TIPC_NLA_MEDIA_MAX, + info->attrs[TIPC_NLA_MEDIA], + tipc_nl_media_policy); + if (err) + return err; + + if (!attrs[TIPC_NLA_MEDIA_NAME]) + return -EINVAL; + name = nla_data(attrs[TIPC_NLA_MEDIA_NAME]); + + rep = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL); + if (!rep) + return -ENOMEM; + + msg.skb = rep; + msg.portid = info->snd_portid; + msg.seq = info->snd_seq; + + rtnl_lock(); + media = tipc_media_find(name); + if (!media) { + err = -EINVAL; + goto err_out; + } + + err = __tipc_nl_add_media(&msg, media); + if (err) + goto err_out; + rtnl_unlock(); + + return genlmsg_reply(rep, info); +err_out: + rtnl_unlock(); + nlmsg_free(rep); + + return err; +} + +int tipc_nl_media_set(struct sk_buff *skb, struct genl_info *info) +{ + int err; + char *name; + struct tipc_media *m; + struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1]; + + if (!info->attrs[TIPC_NLA_MEDIA]) + return -EINVAL; + + err = nla_parse_nested(attrs, TIPC_NLA_MEDIA_MAX, + info->attrs[TIPC_NLA_MEDIA], + tipc_nl_media_policy); + + if (!attrs[TIPC_NLA_MEDIA_NAME]) + return -EINVAL; + name = nla_data(attrs[TIPC_NLA_MEDIA_NAME]); + + rtnl_lock(); + m = tipc_media_find(name); + if (!m) { + rtnl_unlock(); + return -EINVAL; + } + + if (attrs[TIPC_NLA_MEDIA_PROP]) { + struct nlattr *props[TIPC_NLA_PROP_MAX + 1]; + + err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_MEDIA_PROP], + props); + if (err) { + rtnl_unlock(); + return err; + } + + if (props[TIPC_NLA_PROP_TOL]) + m->tolerance = nla_get_u32(props[TIPC_NLA_PROP_TOL]); + if (props[TIPC_NLA_PROP_PRIO]) + m->priority = nla_get_u32(props[TIPC_NLA_PROP_PRIO]); + if (props[TIPC_NLA_PROP_WIN]) + m->window = nla_get_u32(props[TIPC_NLA_PROP_WIN]); + } + rtnl_unlock(); + + return 0; +} diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 78fccc4..2c1230a 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -1,7 +1,7 @@ /* * net/tipc/bearer.h: Include file for TIPC bearer code * - * Copyright (c) 1996-2006, 2013, Ericsson AB + * Copyright (c) 1996-2006, 2013-2014, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -38,6 +38,8 @@ #define _TIPC_BEARER_H #include "bcast.h" +#include "netlink.h" +#include <net/genetlink.h> #define MAX_BEARERS 2 #define MAX_MEDIA 2 @@ -163,7 +165,7 @@ extern struct tipc_bearer __rcu *bearer_list[]; * TIPC routines available to supported media types */ -void tipc_rcv(struct sk_buff *buf, struct tipc_bearer *tb_ptr); +void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *tb_ptr); int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority); int tipc_disable_bearer(const char *name); @@ -176,6 +178,16 @@ extern struct tipc_media eth_media_info; extern struct tipc_media ib_media_info; #endif +int tipc_nl_bearer_disable(struct sk_buff *skb, struct genl_info *info); +int tipc_nl_bearer_enable(struct sk_buff *skb, struct genl_info *info); +int tipc_nl_bearer_dump(struct sk_buff *skb, struct netlink_callback *cb); +int tipc_nl_bearer_get(struct sk_buff *skb, struct genl_info *info); +int tipc_nl_bearer_set(struct sk_buff *skb, struct genl_info *info); + +int tipc_nl_media_dump(struct sk_buff *skb, struct netlink_callback *cb); +int tipc_nl_media_get(struct sk_buff *skb, struct genl_info *info); +int tipc_nl_media_set(struct sk_buff *skb, struct genl_info *info); + int tipc_media_set_priority(const char *name, u32 new_value); int tipc_media_set_window(const char *name, u32 new_value); void tipc_media_addr_printf(char *buf, int len, struct tipc_media_addr *a); diff --git a/net/tipc/core.h b/net/tipc/core.h index f773b14..8460213 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -41,6 +41,7 @@ #include <linux/tipc.h> #include <linux/tipc_config.h> +#include <linux/tipc_netlink.h> #include <linux/types.h> #include <linux/kernel.h> #include <linux/errno.h> @@ -191,6 +192,7 @@ struct tipc_skb_cb { struct sk_buff *tail; bool deferred; bool wakeup_pending; + bool bundling; u16 chain_sz; u16 chain_imp; }; diff --git a/net/tipc/link.c b/net/tipc/link.c index 1db162a..23bcc11 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -36,10 +36,12 @@ #include "core.h" #include "link.h" +#include "bcast.h" #include "socket.h" #include "name_distr.h" #include "discover.h" #include "config.h" +#include "netlink.h" #include <linux/pkt_sched.h> @@ -50,6 +52,30 @@ static const char *link_co_err = "Link changeover error, "; static const char *link_rst_msg = "Resetting link "; static const char *link_unk_evt = "Unknown link event "; +static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = { + [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC }, + [TIPC_NLA_LINK_NAME] = { + .type = NLA_STRING, + .len = TIPC_MAX_LINK_NAME + }, + [TIPC_NLA_LINK_MTU] = { .type = NLA_U32 }, + [TIPC_NLA_LINK_BROADCAST] = { .type = NLA_FLAG }, + [TIPC_NLA_LINK_UP] = { .type = NLA_FLAG }, + [TIPC_NLA_LINK_ACTIVE] = { .type = NLA_FLAG }, + [TIPC_NLA_LINK_PROP] = { .type = NLA_NESTED }, + [TIPC_NLA_LINK_STATS] = { .type = NLA_NESTED }, + [TIPC_NLA_LINK_RX] = { .type = NLA_U32 }, + [TIPC_NLA_LINK_TX] = { .type = NLA_U32 } +}; + +/* Properties valid for media, bearar and link */ +static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = { + [TIPC_NLA_PROP_UNSPEC] = { .type = NLA_UNSPEC }, + [TIPC_NLA_PROP_PRIO] = { .type = NLA_U32 }, + [TIPC_NLA_PROP_TOL] = { .type = NLA_U32 }, + [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 } +}; + /* * Out-of-range value for link session numbers */ @@ -123,18 +149,6 @@ static void link_init_max_pkt(struct tipc_link *l_ptr) l_ptr->max_pkt_probes = 0; } -static u32 link_next_sent(struct tipc_link *l_ptr) -{ - if (l_ptr->next_out) - return buf_seqno(l_ptr->next_out); - return mod(l_ptr->next_out_no); -} - -static u32 link_last_sent(struct tipc_link *l_ptr) -{ - return mod(link_next_sent(l_ptr) - 1); -} - /* * Simple non-static link routines (i.e. referenced outside this file) */ @@ -157,14 +171,17 @@ int tipc_link_is_active(struct tipc_link *l_ptr) */ static void link_timeout(struct tipc_link *l_ptr) { + struct sk_buff *skb; + tipc_node_lock(l_ptr->owner); /* update counters used in statistical profiling of send traffic */ - l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size; + l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue); l_ptr->stats.queue_sz_counts++; - if (l_ptr->first_out) { - struct tipc_msg *msg = buf_msg(l_ptr->first_out); + skb = skb_peek(&l_ptr->outqueue); + if (skb) { + struct tipc_msg *msg = buf_msg(skb); u32 length = msg_size(msg); if ((msg_user(msg) == MSG_FRAGMENTER) && @@ -192,11 +209,10 @@ static void link_timeout(struct tipc_link *l_ptr) } /* do all other link processing performed on a periodic basis */ - link_state_event(l_ptr, TIMEOUT_EVT); if (l_ptr->next_out) - tipc_link_push_queue(l_ptr); + tipc_link_push_packets(l_ptr); tipc_node_unlock(l_ptr->owner); } @@ -224,9 +240,10 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, char addr_string[16]; u32 peer = n_ptr->addr; - if (n_ptr->link_cnt >= 2) { + if (n_ptr->link_cnt >= MAX_BEARERS) { tipc_addr_string_fill(addr_string, n_ptr->addr); - pr_err("Attempt to establish third link to %s\n", addr_string); + pr_err("Attempt to establish %uth link to %s. Max %u allowed.\n", + n_ptr->link_cnt, addr_string, MAX_BEARERS); return NULL; } @@ -274,7 +291,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, link_init_max_pkt(l_ptr); l_ptr->next_out_no = 1; - __skb_queue_head_init(&l_ptr->waiting_sks); + __skb_queue_head_init(&l_ptr->outqueue); + __skb_queue_head_init(&l_ptr->deferred_queue); + skb_queue_head_init(&l_ptr->waiting_sks); link_reset_statistics(l_ptr); @@ -339,7 +358,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport, return false; TIPC_SKB_CB(buf)->chain_sz = chain_sz; TIPC_SKB_CB(buf)->chain_imp = imp; - __skb_queue_tail(&link->waiting_sks, buf); + skb_queue_tail(&link->waiting_sks, buf); link->stats.link_congs++; return true; } @@ -352,30 +371,19 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport, */ static void link_prepare_wakeup(struct tipc_link *link) { - struct sk_buff_head *wq = &link->waiting_sks; - struct sk_buff *buf; - uint pend_qsz = link->out_queue_size; + uint pend_qsz = skb_queue_len(&link->outqueue); + struct sk_buff *skb, *tmp; - for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) { - if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp]) + skb_queue_walk_safe(&link->waiting_sks, skb, tmp) { + if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp]) break; - pend_qsz += TIPC_SKB_CB(buf)->chain_sz; - __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq)); + pend_qsz += TIPC_SKB_CB(skb)->chain_sz; + skb_unlink(skb, &link->waiting_sks); + skb_queue_tail(&link->owner->waiting_sks, skb); } } /** - * link_release_outqueue - purge link's outbound message queue - * @l_ptr: pointer to link - */ -static void link_release_outqueue(struct tipc_link *l_ptr) -{ - kfree_skb_list(l_ptr->first_out); - l_ptr->first_out = NULL; - l_ptr->out_queue_size = 0; -} - -/** * tipc_link_reset_fragments - purge link's inbound message fragments queue * @l_ptr: pointer to link */ @@ -391,11 +399,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr) */ void tipc_link_purge_queues(struct tipc_link *l_ptr) { - kfree_skb_list(l_ptr->oldest_deferred_in); - kfree_skb_list(l_ptr->first_out); + __skb_queue_purge(&l_ptr->deferred_queue); + __skb_queue_purge(&l_ptr->outqueue); tipc_link_reset_fragments(l_ptr); - kfree_skb(l_ptr->proto_msg_queue); - l_ptr->proto_msg_queue = NULL; } void tipc_link_reset(struct tipc_link *l_ptr) @@ -427,25 +433,16 @@ void tipc_link_reset(struct tipc_link *l_ptr) } /* Clean up all queues: */ - link_release_outqueue(l_ptr); - kfree_skb(l_ptr->proto_msg_queue); - l_ptr->proto_msg_queue = NULL; - kfree_skb_list(l_ptr->oldest_deferred_in); + __skb_queue_purge(&l_ptr->outqueue); + __skb_queue_purge(&l_ptr->deferred_queue); if (!skb_queue_empty(&l_ptr->waiting_sks)) { skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks); owner->action_flags |= TIPC_WAKEUP_USERS; } - l_ptr->retransm_queue_head = 0; - l_ptr->retransm_queue_size = 0; - l_ptr->last_out = NULL; - l_ptr->first_out = NULL; l_ptr->next_out = NULL; l_ptr->unacked_window = 0; l_ptr->checkpoint = 1; l_ptr->next_out_no = 1; - l_ptr->deferred_inqueue_sz = 0; - l_ptr->oldest_deferred_in = NULL; - l_ptr->newest_deferred_in = NULL; l_ptr->fsm_msg_cnt = 0; l_ptr->stale_count = 0; link_reset_statistics(l_ptr); @@ -667,9 +664,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) * - For all other messages we discard the buffer and return -EHOSTUNREACH * - For TIPC internal messages we also reset the link */ -static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) +static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list) { - struct tipc_msg *msg = buf_msg(buf); + struct sk_buff *skb = skb_peek(list); + struct tipc_msg *msg = buf_msg(skb); uint imp = tipc_msg_tot_importance(msg); u32 oport = msg_tot_origport(msg); @@ -682,30 +680,30 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) goto drop; if (unlikely(msg_reroute_cnt(msg))) goto drop; - if (TIPC_SKB_CB(buf)->wakeup_pending) + if (TIPC_SKB_CB(skb)->wakeup_pending) return -ELINKCONG; - if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp)) + if (link_schedule_user(link, oport, skb_queue_len(list), imp)) return -ELINKCONG; drop: - kfree_skb_list(buf); + __skb_queue_purge(list); return -EHOSTUNREACH; } /** * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked * @link: link to use - * @buf: chain of buffers containing message + * @list: chain of buffers containing message + * * Consumes the buffer chain, except when returning -ELINKCONG * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket * user data messages) or -EHOSTUNREACH (all other messages/senders) * Only the socket functions tipc_send_stream() and tipc_send_packet() need * to act on the return value, since they may need to do more send attempts. */ -int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf) +int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list) { - struct tipc_msg *msg = buf_msg(buf); + struct tipc_msg *msg = buf_msg(skb_peek(list)); uint psz = msg_size(msg); - uint qsz = link->out_queue_size; uint sndlim = link->queue_limit[0]; uint imp = tipc_msg_tot_importance(msg); uint mtu = link->max_pkt; @@ -713,71 +711,83 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf) uint seqno = link->next_out_no; uint bc_last_in = link->owner->bclink.last_in; struct tipc_media_addr *addr = &link->media_addr; - struct sk_buff *next = buf->next; + struct sk_buff_head *outqueue = &link->outqueue; + struct sk_buff *skb, *tmp; /* Match queue limits against msg importance: */ - if (unlikely(qsz >= link->queue_limit[imp])) - return tipc_link_cong(link, buf); + if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp])) + return tipc_link_cong(link, list); /* Has valid packet limit been used ? */ if (unlikely(psz > mtu)) { - kfree_skb_list(buf); + __skb_queue_purge(list); return -EMSGSIZE; } /* Prepare each packet for sending, and add to outqueue: */ - while (buf) { - next = buf->next; - msg = buf_msg(buf); + skb_queue_walk_safe(list, skb, tmp) { + __skb_unlink(skb, list); + msg = buf_msg(skb); msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); msg_set_bcast_ack(msg, bc_last_in); - if (!link->first_out) { - link->first_out = buf; - } else if (qsz < sndlim) { - link->last_out->next = buf; - } else if (tipc_msg_bundle(link->last_out, buf, mtu)) { + if (skb_queue_len(outqueue) < sndlim) { + __skb_queue_tail(outqueue, skb); + tipc_bearer_send(link->bearer_id, skb, addr); + link->next_out = NULL; + link->unacked_window = 0; + } else if (tipc_msg_bundle(outqueue, skb, mtu)) { link->stats.sent_bundled++; - buf = next; - next = buf->next; continue; - } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) { + } else if (tipc_msg_make_bundle(outqueue, skb, mtu, + link->addr)) { link->stats.sent_bundled++; link->stats.sent_bundles++; - link->last_out->next = buf; if (!link->next_out) - link->next_out = buf; + link->next_out = skb_peek_tail(outqueue); } else { - link->last_out->next = buf; + __skb_queue_tail(outqueue, skb); if (!link->next_out) - link->next_out = buf; - } - - /* Send packet if possible: */ - if (likely(++qsz <= sndlim)) { - tipc_bearer_send(link->bearer_id, buf, addr); - link->next_out = next; - link->unacked_window = 0; + link->next_out = skb; } seqno++; - link->last_out = buf; - buf = next; } link->next_out_no = seqno; - link->out_queue_size = qsz; return 0; } +static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) +{ + __skb_queue_head_init(list); + __skb_queue_tail(list, skb); +} + +static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb) +{ + struct sk_buff_head head; + + skb2list(skb, &head); + return __tipc_link_xmit(link, &head); +} + +int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector) +{ + struct sk_buff_head head; + + skb2list(skb, &head); + return tipc_link_xmit(&head, dnode, selector); +} + /** * tipc_link_xmit() is the general link level function for message sending - * @buf: chain of buffers containing message + * @list: chain of buffers containing message * @dsz: amount of user data to be sent * @dnode: address of destination node * @selector: a number used for deterministic link selection * Consumes the buffer chain, except when returning -ELINKCONG * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ -int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) +int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector) { struct tipc_link *link = NULL; struct tipc_node *node; @@ -788,17 +798,22 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) tipc_node_lock(node); link = node->active_links[selector & 1]; if (link) - rc = __tipc_link_xmit(link, buf); + rc = __tipc_link_xmit(link, list); tipc_node_unlock(node); } if (link) return rc; - if (likely(in_own_node(dnode))) - return tipc_sk_rcv(buf); + if (likely(in_own_node(dnode))) { + /* As a node local message chain never contains more than one + * buffer, we just need to dequeue one SKB buffer from the + * head list. + */ + return tipc_sk_rcv(__skb_dequeue(list)); + } + __skb_queue_purge(list); - kfree_skb_list(buf); return rc; } @@ -812,17 +827,17 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) */ static void tipc_link_sync_xmit(struct tipc_link *link) { - struct sk_buff *buf; + struct sk_buff *skb; struct tipc_msg *msg; - buf = tipc_buf_acquire(INT_H_SIZE); - if (!buf) + skb = tipc_buf_acquire(INT_H_SIZE); + if (!skb) return; - msg = buf_msg(buf); + msg = buf_msg(skb); tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr); msg_set_last_bcast(msg, link->owner->bclink.acked); - __tipc_link_xmit(link, buf); + __tipc_link_xmit_skb(link, skb); } /* @@ -842,85 +857,46 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) kfree_skb(buf); } +struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list, + const struct sk_buff *skb) +{ + if (skb_queue_is_last(list, skb)) + return NULL; + return skb->next; +} + /* - * tipc_link_push_packet: Push one unsent packet to the media + * tipc_link_push_packets - push unsent packets to bearer + * + * Push out the unsent messages of a link where congestion + * has abated. Node is locked. + * + * Called with node locked */ -static u32 tipc_link_push_packet(struct tipc_link *l_ptr) -{ - struct sk_buff *buf = l_ptr->first_out; - u32 r_q_size = l_ptr->retransm_queue_size; - u32 r_q_head = l_ptr->retransm_queue_head; - - /* Step to position where retransmission failed, if any, */ - /* consider that buffers may have been released in meantime */ - if (r_q_size && buf) { - u32 last = lesser(mod(r_q_head + r_q_size), - link_last_sent(l_ptr)); - u32 first = buf_seqno(buf); - - while (buf && less(first, r_q_head)) { - first = mod(first + 1); - buf = buf->next; - } - l_ptr->retransm_queue_head = r_q_head = first; - l_ptr->retransm_queue_size = r_q_size = mod(last - first); - } - - /* Continue retransmission now, if there is anything: */ - if (r_q_size && buf) { - msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); - msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); - l_ptr->retransm_queue_head = mod(++r_q_head); - l_ptr->retransm_queue_size = --r_q_size; - l_ptr->stats.retransmitted++; - return 0; - } - - /* Send deferred protocol message, if any: */ - buf = l_ptr->proto_msg_queue; - if (buf) { - msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); - msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); - l_ptr->unacked_window = 0; - kfree_skb(buf); - l_ptr->proto_msg_queue = NULL; - return 0; - } +void tipc_link_push_packets(struct tipc_link *l_ptr) +{ + struct sk_buff_head *outqueue = &l_ptr->outqueue; + struct sk_buff *skb = l_ptr->next_out; + struct tipc_msg *msg; + u32 next, first; - /* Send one deferred data message, if send window not full: */ - buf = l_ptr->next_out; - if (buf) { - struct tipc_msg *msg = buf_msg(buf); - u32 next = msg_seqno(msg); - u32 first = buf_seqno(l_ptr->first_out); + skb_queue_walk_from(outqueue, skb) { + msg = buf_msg(skb); + next = msg_seqno(msg); + first = buf_seqno(skb_peek(outqueue)); if (mod(next - first) < l_ptr->queue_limit[0]) { msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->bearer_id, buf, - &l_ptr->media_addr); if (msg_user(msg) == MSG_BUNDLER) - msg_set_type(msg, BUNDLE_CLOSED); - l_ptr->next_out = buf->next; - return 0; + TIPC_SKB_CB(skb)->bundling = false; + tipc_bearer_send(l_ptr->bearer_id, skb, + &l_ptr->media_addr); + l_ptr->next_out = tipc_skb_queue_next(outqueue, skb); + } else { + break; } } - return 1; -} - -/* - * push_queue(): push out the unsent messages of a link where - * congestion has abated. Node is locked - */ -void tipc_link_push_queue(struct tipc_link *l_ptr) -{ - u32 res; - - do { - res = tipc_link_push_packet(l_ptr); - } while (!res); } void tipc_link_reset_all(struct tipc_node *node) @@ -984,20 +960,20 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, } } -void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, +void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, u32 retransmits) { struct tipc_msg *msg; - if (!buf) + if (!skb) return; - msg = buf_msg(buf); + msg = buf_msg(skb); /* Detect repeated retransmit failures */ if (l_ptr->last_retransmitted == msg_seqno(msg)) { if (++l_ptr->stale_count > 100) { - link_retransmit_failure(l_ptr, buf); + link_retransmit_failure(l_ptr, skb); return; } } else { @@ -1005,38 +981,29 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, l_ptr->stale_count = 1; } - while (retransmits && (buf != l_ptr->next_out) && buf) { - msg = buf_msg(buf); + skb_queue_walk_from(&l_ptr->outqueue, skb) { + if (!retransmits || skb == l_ptr->next_out) + break; + msg = buf_msg(skb); msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); - buf = buf->next; + tipc_bearer_send(l_ptr->bearer_id, skb, &l_ptr->media_addr); retransmits--; l_ptr->stats.retransmitted++; } - - l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0; } -/** - * link_insert_deferred_queue - insert deferred messages back into receive chain - */ -static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr, - struct sk_buff *buf) +static void link_retrieve_defq(struct tipc_link *link, + struct sk_buff_head *list) { u32 seq_no; - if (l_ptr->oldest_deferred_in == NULL) - return buf; + if (skb_queue_empty(&link->deferred_queue)) + return; - seq_no = buf_seqno(l_ptr->oldest_deferred_in); - if (seq_no == mod(l_ptr->next_in_no)) { - l_ptr->newest_deferred_in->next = buf; - buf = l_ptr->oldest_deferred_in; - l_ptr->oldest_deferred_in = NULL; - l_ptr->deferred_inqueue_sz = 0; - } - return buf; + seq_no = buf_seqno(skb_peek(&link->deferred_queue)); + if (seq_no == mod(link->next_in_no)) + skb_queue_splice_tail_init(&link->deferred_queue, list); } /** @@ -1096,43 +1063,42 @@ static int link_recv_buf_validate(struct sk_buff *buf) /** * tipc_rcv - process TIPC packets/messages arriving from off-node - * @head: pointer to message buffer chain + * @skb: TIPC packet * @b_ptr: pointer to bearer message arrived on * * Invoked with no locks held. Bearer pointer must point to a valid bearer * structure (i.e. cannot be NULL), but bearer can be inactive. */ -void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) +void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr) { - while (head) { - struct tipc_node *n_ptr; - struct tipc_link *l_ptr; - struct sk_buff *crs; - struct sk_buff *buf = head; - struct tipc_msg *msg; - u32 seq_no; - u32 ackd; - u32 released = 0; + struct sk_buff_head head; + struct tipc_node *n_ptr; + struct tipc_link *l_ptr; + struct sk_buff *skb1, *tmp; + struct tipc_msg *msg; + u32 seq_no; + u32 ackd; + u32 released; - head = head->next; - buf->next = NULL; + skb2list(skb, &head); + while ((skb = __skb_dequeue(&head))) { /* Ensure message is well-formed */ - if (unlikely(!link_recv_buf_validate(buf))) + if (unlikely(!link_recv_buf_validate(skb))) goto discard; /* Ensure message data is a single contiguous unit */ - if (unlikely(skb_linearize(buf))) + if (unlikely(skb_linearize(skb))) goto discard; /* Handle arrival of a non-unicast link message */ - msg = buf_msg(buf); + msg = buf_msg(skb); if (unlikely(msg_non_seq(msg))) { if (msg_user(msg) == LINK_CONFIG) - tipc_disc_rcv(buf, b_ptr); + tipc_disc_rcv(skb, b_ptr); else - tipc_bclink_rcv(buf); + tipc_bclink_rcv(skb); continue; } @@ -1171,22 +1137,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (n_ptr->bclink.recv_permitted) tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); - crs = l_ptr->first_out; - while ((crs != l_ptr->next_out) && - less_eq(buf_seqno(crs), ackd)) { - struct sk_buff *next = crs->next; - kfree_skb(crs); - crs = next; - released++; - } - if (released) { - l_ptr->first_out = crs; - l_ptr->out_queue_size -= released; + released = 0; + skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) { + if (skb1 == l_ptr->next_out || + more(buf_seqno(skb1), ackd)) + break; + __skb_unlink(skb1, &l_ptr->outqueue); + kfree_skb(skb1); + released = 1; } /* Try sending any messages link endpoint has pending */ if (unlikely(l_ptr->next_out)) - tipc_link_push_queue(l_ptr); + tipc_link_push_packets(l_ptr); if (released && !skb_queue_empty(&l_ptr->waiting_sks)) { link_prepare_wakeup(l_ptr); @@ -1196,8 +1159,8 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) /* Process the incoming packet */ if (unlikely(!link_working_working(l_ptr))) { if (msg_user(msg) == LINK_PROTOCOL) { - tipc_link_proto_rcv(l_ptr, buf); - head = link_insert_deferred_queue(l_ptr, head); + tipc_link_proto_rcv(l_ptr, skb); + link_retrieve_defq(l_ptr, &head); tipc_node_unlock(n_ptr); continue; } @@ -1207,8 +1170,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (link_working_working(l_ptr)) { /* Re-insert buffer in front of queue */ - buf->next = head; - head = buf; + __skb_queue_head(&head, skb); tipc_node_unlock(n_ptr); continue; } @@ -1217,33 +1179,33 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) /* Link is now in state WORKING_WORKING */ if (unlikely(seq_no != mod(l_ptr->next_in_no))) { - link_handle_out_of_seq_msg(l_ptr, buf); - head = link_insert_deferred_queue(l_ptr, head); + link_handle_out_of_seq_msg(l_ptr, skb); + link_retrieve_defq(l_ptr, &head); tipc_node_unlock(n_ptr); continue; } l_ptr->next_in_no++; - if (unlikely(l_ptr->oldest_deferred_in)) - head = link_insert_deferred_queue(l_ptr, head); + if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue))) + link_retrieve_defq(l_ptr, &head); if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { l_ptr->stats.sent_acks++; tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); } - if (tipc_link_prepare_input(l_ptr, &buf)) { + if (tipc_link_prepare_input(l_ptr, &skb)) { tipc_node_unlock(n_ptr); continue; } tipc_node_unlock(n_ptr); - msg = buf_msg(buf); - if (tipc_link_input(l_ptr, buf) != 0) + + if (tipc_link_input(l_ptr, skb) != 0) goto discard; continue; unlock_discard: tipc_node_unlock(n_ptr); discard: - kfree_skb(buf); + kfree_skb(skb); } } @@ -1326,48 +1288,37 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf) * * Returns increase in queue length (i.e. 0 or 1) */ -u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, - struct sk_buff *buf) +u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb) { - struct sk_buff *queue_buf; - struct sk_buff **prev; - u32 seq_no = buf_seqno(buf); - - buf->next = NULL; + struct sk_buff *skb1; + u32 seq_no = buf_seqno(skb); /* Empty queue ? */ - if (*head == NULL) { - *head = *tail = buf; + if (skb_queue_empty(list)) { + __skb_queue_tail(list, skb); return 1; } /* Last ? */ - if (less(buf_seqno(*tail), seq_no)) { - (*tail)->next = buf; - *tail = buf; + if (less(buf_seqno(skb_peek_tail(list)), seq_no)) { + __skb_queue_tail(list, skb); return 1; } /* Locate insertion point in queue, then insert; discard if duplicate */ - prev = head; - queue_buf = *head; - for (;;) { - u32 curr_seqno = buf_seqno(queue_buf); + skb_queue_walk(list, skb1) { + u32 curr_seqno = buf_seqno(skb1); if (seq_no == curr_seqno) { - kfree_skb(buf); + kfree_skb(skb); return 0; } if (less(seq_no, curr_seqno)) break; - - prev = &queue_buf->next; - queue_buf = queue_buf->next; } - buf->next = queue_buf; - *prev = buf; + __skb_queue_before(list, skb1, skb); return 1; } @@ -1397,15 +1348,14 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, return; } - if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in, - &l_ptr->newest_deferred_in, buf)) { - l_ptr->deferred_inqueue_sz++; + if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) { l_ptr->stats.deferred_recv++; TIPC_SKB_CB(buf)->deferred = true; - if ((l_ptr->deferred_inqueue_sz % 16) == 1) + if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1) tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); - } else + } else { l_ptr->stats.duplicates++; + } } /* @@ -1419,12 +1369,6 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, u32 msg_size = sizeof(l_ptr->proto_msg); int r_flag; - /* Discard any previous message that was deferred due to congestion */ - if (l_ptr->proto_msg_queue) { - kfree_skb(l_ptr->proto_msg_queue); - l_ptr->proto_msg_queue = NULL; - } - /* Don't send protocol message during link changeover */ if (l_ptr->exp_msg_count) return; @@ -1447,8 +1391,8 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, if (l_ptr->next_out) next_sent = buf_seqno(l_ptr->next_out); msg_set_next_sent(msg, next_sent); - if (l_ptr->oldest_deferred_in) { - u32 rec = buf_seqno(l_ptr->oldest_deferred_in); + if (!skb_queue_empty(&l_ptr->deferred_queue)) { + u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue)); gap = mod(rec - mod(l_ptr->next_in_no)); } msg_set_seq_gap(msg, gap); @@ -1636,7 +1580,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) } if (msg_seq_gap(msg)) { l_ptr->stats.recv_nacks++; - tipc_link_retransmit(l_ptr, l_ptr->first_out, + tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue), msg_seq_gap(msg)); } break; @@ -1655,7 +1599,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, u32 selector) { struct tipc_link *tunnel; - struct sk_buff *buf; + struct sk_buff *skb; u32 length = msg_size(msg); tunnel = l_ptr->owner->active_links[selector & 1]; @@ -1664,14 +1608,14 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, return; } msg_set_size(tunnel_hdr, length + INT_H_SIZE); - buf = tipc_buf_acquire(length + INT_H_SIZE); - if (!buf) { + skb = tipc_buf_acquire(length + INT_H_SIZE); + if (!skb) { pr_warn("%sunable to send tunnel msg\n", link_co_err); return; } - skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); - __tipc_link_xmit(tunnel, buf); + skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length); + __tipc_link_xmit_skb(tunnel, skb); } @@ -1683,10 +1627,10 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, */ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) { - u32 msgcount = l_ptr->out_queue_size; - struct sk_buff *crs = l_ptr->first_out; + u32 msgcount = skb_queue_len(&l_ptr->outqueue); struct tipc_link *tunnel = l_ptr->owner->active_links[0]; struct tipc_msg tunnel_hdr; + struct sk_buff *skb; int split_bundles; if (!tunnel) @@ -1697,14 +1641,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); msg_set_msgcnt(&tunnel_hdr, msgcount); - if (!l_ptr->first_out) { - struct sk_buff *buf; - - buf = tipc_buf_acquire(INT_H_SIZE); - if (buf) { - skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE); + if (skb_queue_empty(&l_ptr->outqueue)) { + skb = tipc_buf_acquire(INT_H_SIZE); + if (skb) { + skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); msg_set_size(&tunnel_hdr, INT_H_SIZE); - __tipc_link_xmit(tunnel, buf); + __tipc_link_xmit_skb(tunnel, skb); } else { pr_warn("%sunable to send changeover msg\n", link_co_err); @@ -1715,8 +1657,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) split_bundles = (l_ptr->owner->active_links[0] != l_ptr->owner->active_links[1]); - while (crs) { - struct tipc_msg *msg = buf_msg(crs); + skb_queue_walk(&l_ptr->outqueue, skb) { + struct tipc_msg *msg = buf_msg(skb); if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { struct tipc_msg *m = msg_get_wrapped(msg); @@ -1734,7 +1676,6 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, msg_link_selector(msg)); } - crs = crs->next; } } @@ -1750,17 +1691,16 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *tunnel) { - struct sk_buff *iter; + struct sk_buff *skb; struct tipc_msg tunnel_hdr; tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr); - msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size); + msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue)); msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); - iter = l_ptr->first_out; - while (iter) { - struct sk_buff *outbuf; - struct tipc_msg *msg = buf_msg(iter); + skb_queue_walk(&l_ptr->outqueue, skb) { + struct sk_buff *outskb; + struct tipc_msg *msg = buf_msg(skb); u32 length = msg_size(msg); if (msg_user(msg) == MSG_BUNDLER) @@ -1768,19 +1708,18 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */ msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); msg_set_size(&tunnel_hdr, length + INT_H_SIZE); - outbuf = tipc_buf_acquire(length + INT_H_SIZE); - if (outbuf == NULL) { + outskb = tipc_buf_acquire(length + INT_H_SIZE); + if (outskb == NULL) { pr_warn("%sunable to send duplicate msg\n", link_co_err); return; } - skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, + skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data, length); - __tipc_link_xmit(tunnel, outbuf); + __tipc_link_xmit_skb(tunnel, outskb); if (!tipc_link_is_up(l_ptr)) return; - iter = iter->next; } } @@ -2375,3 +2314,435 @@ static void link_print(struct tipc_link *l_ptr, const char *str) else pr_cont("\n"); } + +/* Parse and validate nested (link) properties valid for media, bearer and link + */ +int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]) +{ + int err; + + err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, prop, + tipc_nl_prop_policy); + if (err) + return err; + + if (props[TIPC_NLA_PROP_PRIO]) { + u32 prio; + + prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]); + if (prio > TIPC_MAX_LINK_PRI) + return -EINVAL; + } + + if (props[TIPC_NLA_PROP_TOL]) { + u32 tol; + + tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]); + if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL)) + return -EINVAL; + } + + if (props[TIPC_NLA_PROP_WIN]) { + u32 win; + + win = nla_get_u32(props[TIPC_NLA_PROP_WIN]); + if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN)) + return -EINVAL; + } + + return 0; +} + +int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info) +{ + int err; + int res = 0; + int bearer_id; + char *name; + struct tipc_link *link; + struct tipc_node *node; + struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1]; + + if (!info->attrs[TIPC_NLA_LINK]) + return -EINVAL; + + err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX, + info->attrs[TIPC_NLA_LINK], + tipc_nl_link_policy); + if (err) + return err; + + if (!attrs[TIPC_NLA_LINK_NAME]) + return -EINVAL; + + name = nla_data(attrs[TIPC_NLA_LINK_NAME]); + + node = tipc_link_find_owner(name, &bearer_id); + if (!node) + return -EINVAL; + + tipc_node_lock(node); + + link = node->links[bearer_id]; + if (!link) { + res = -EINVAL; + goto out; + } + + if (attrs[TIPC_NLA_LINK_PROP]) { + struct nlattr *props[TIPC_NLA_PROP_MAX + 1]; + + err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], + props); + if (err) { + res = err; + goto out; + } + + if (props[TIPC_NLA_PROP_TOL]) { + u32 tol; + + tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]); + link_set_supervision_props(link, tol); + tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0, 0); + } + if (props[TIPC_NLA_PROP_PRIO]) { + u32 prio; + + prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]); + link->priority = prio; + tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio, 0); + } + if (props[TIPC_NLA_PROP_WIN]) { + u32 win; + + win = nla_get_u32(props[TIPC_NLA_PROP_WIN]); + tipc_link_set_queue_limits(link, win); + } + } + +out: + tipc_node_unlock(node); + + return res; +} + +static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s) +{ + int i; + struct nlattr *stats; + + struct nla_map { + u32 key; + u32 val; + }; + + struct nla_map map[] = { + {TIPC_NLA_STATS_RX_INFO, s->recv_info}, + {TIPC_NLA_STATS_RX_FRAGMENTS, s->recv_fragments}, + {TIPC_NLA_STATS_RX_FRAGMENTED, s->recv_fragmented}, + {TIPC_NLA_STATS_RX_BUNDLES, s->recv_bundles}, + {TIPC_NLA_STATS_RX_BUNDLED, s->recv_bundled}, + {TIPC_NLA_STATS_TX_INFO, s->sent_info}, + {TIPC_NLA_STATS_TX_FRAGMENTS, s->sent_fragments}, + {TIPC_NLA_STATS_TX_FRAGMENTED, s->sent_fragmented}, + {TIPC_NLA_STATS_TX_BUNDLES, s->sent_bundles}, + {TIPC_NLA_STATS_TX_BUNDLED, s->sent_bundled}, + {TIPC_NLA_STATS_MSG_PROF_TOT, (s->msg_length_counts) ? + s->msg_length_counts : 1}, + {TIPC_NLA_STATS_MSG_LEN_CNT, s->msg_length_counts}, + {TIPC_NLA_STATS_MSG_LEN_TOT, s->msg_lengths_total}, + {TIPC_NLA_STATS_MSG_LEN_P0, s->msg_length_profile[0]}, + {TIPC_NLA_STATS_MSG_LEN_P1, s->msg_length_profile[1]}, + {TIPC_NLA_STATS_MSG_LEN_P2, s->msg_length_profile[2]}, + {TIPC_NLA_STATS_MSG_LEN_P3, s->msg_length_profile[3]}, + {TIPC_NLA_STATS_MSG_LEN_P4, s->msg_length_profile[4]}, + {TIPC_NLA_STATS_MSG_LEN_P5, s->msg_length_profile[5]}, + {TIPC_NLA_STATS_MSG_LEN_P6, s->msg_length_profile[6]}, + {TIPC_NLA_STATS_RX_STATES, s->recv_states}, + {TIPC_NLA_STATS_RX_PROBES, s->recv_probes}, + {TIPC_NLA_STATS_RX_NACKS, s->recv_nacks}, + {TIPC_NLA_STATS_RX_DEFERRED, s->deferred_recv}, + {TIPC_NLA_STATS_TX_STATES, s->sent_states}, + {TIPC_NLA_STATS_TX_PROBES, s->sent_probes}, + {TIPC_NLA_STATS_TX_NACKS, s->sent_nacks}, + {TIPC_NLA_STATS_TX_ACKS, s->sent_acks}, + {TIPC_NLA_STATS_RETRANSMITTED, s->retransmitted}, + {TIPC_NLA_STATS_DUPLICATES, s->duplicates}, + {TIPC_NLA_STATS_LINK_CONGS, s->link_congs}, + {TIPC_NLA_STATS_MAX_QUEUE, s->max_queue_sz}, + {TIPC_NLA_STATS_AVG_QUEUE, s->queue_sz_counts ? + (s->accu_queue_sz / s->queue_sz_counts) : 0} + }; + + stats = nla_nest_start(skb, TIPC_NLA_LINK_STATS); + if (!stats) + return -EMSGSIZE; + + for (i = 0; i < ARRAY_SIZE(map); i++) + if (nla_put_u32(skb, map[i].key, map[i].val)) + goto msg_full; + + nla_nest_end(skb, stats); + + return 0; +msg_full: + nla_nest_cancel(skb, stats); + + return -EMSGSIZE; +} + +/* Caller should hold appropriate locks to protect the link */ +static int __tipc_nl_add_link(struct tipc_nl_msg *msg, struct tipc_link *link) +{ + int err; + void *hdr; + struct nlattr *attrs; + struct nlattr *prop; + + hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family, + NLM_F_MULTI, TIPC_NL_LINK_GET); + if (!hdr) + return -EMSGSIZE; + + attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK); + if (!attrs) + goto msg_full; + + if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, link->name)) + goto attr_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST, + tipc_cluster_mask(tipc_own_addr))) + goto attr_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->max_pkt)) + goto attr_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->next_in_no)) + goto attr_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, link->next_out_no)) + goto attr_msg_full; + + if (tipc_link_is_up(link)) + if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) + goto attr_msg_full; + if (tipc_link_is_active(link)) + if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE)) + goto attr_msg_full; + + prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP); + if (!prop) + goto attr_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority)) + goto prop_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance)) + goto prop_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, + link->queue_limit[TIPC_LOW_IMPORTANCE])) + goto prop_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority)) + goto prop_msg_full; + nla_nest_end(msg->skb, prop); + + err = __tipc_nl_add_stats(msg->skb, &link->stats); + if (err) + goto attr_msg_full; + + nla_nest_end(msg->skb, attrs); + genlmsg_end(msg->skb, hdr); + + return 0; + +prop_msg_full: + nla_nest_cancel(msg->skb, prop); +attr_msg_full: + nla_nest_cancel(msg->skb, attrs); +msg_full: + genlmsg_cancel(msg->skb, hdr); + + return -EMSGSIZE; +} + +/* Caller should hold node lock */ +static int __tipc_nl_add_node_links(struct tipc_nl_msg *msg, + struct tipc_node *node, + u32 *prev_link) +{ + u32 i; + int err; + + for (i = *prev_link; i < MAX_BEARERS; i++) { + *prev_link = i; + + if (!node->links[i]) + continue; + + err = __tipc_nl_add_link(msg, node->links[i]); + if (err) + return err; + } + *prev_link = 0; + + return 0; +} + +int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb) +{ + struct tipc_node *node; + struct tipc_nl_msg msg; + u32 prev_node = cb->args[0]; + u32 prev_link = cb->args[1]; + int done = cb->args[2]; + int err; + + if (done) + return 0; + + msg.skb = skb; + msg.portid = NETLINK_CB(cb->skb).portid; + msg.seq = cb->nlh->nlmsg_seq; + + rcu_read_lock(); + + if (prev_node) { + node = tipc_node_find(prev_node); + if (!node) { + /* We never set seq or call nl_dump_check_consistent() + * this means that setting prev_seq here will cause the + * consistence check to fail in the netlink callback + * handler. Resulting in the last NLMSG_DONE message + * having the NLM_F_DUMP_INTR flag set. + */ + cb->prev_seq = 1; + goto out; + } + + list_for_each_entry_continue_rcu(node, &tipc_node_list, list) { + tipc_node_lock(node); + err = __tipc_nl_add_node_links(&msg, node, &prev_link); + tipc_node_unlock(node); + if (err) + goto out; + + prev_node = node->addr; + } + } else { + err = tipc_nl_add_bc_link(&msg); + if (err) + goto out; + + list_for_each_entry_rcu(node, &tipc_node_list, list) { + tipc_node_lock(node); + err = __tipc_nl_add_node_links(&msg, node, &prev_link); + tipc_node_unlock(node); + if (err) + goto out; + + prev_node = node->addr; + } + } + done = 1; +out: + rcu_read_unlock(); + + cb->args[0] = prev_node; + cb->args[1] = prev_link; + cb->args[2] = done; + + return skb->len; +} + +int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info) +{ + struct sk_buff *ans_skb; + struct tipc_nl_msg msg; + struct tipc_link *link; + struct tipc_node *node; + char *name; + int bearer_id; + int err; + + if (!info->attrs[TIPC_NLA_LINK_NAME]) + return -EINVAL; + + name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]); + node = tipc_link_find_owner(name, &bearer_id); + if (!node) + return -EINVAL; + + ans_skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL); + if (!ans_skb) + return -ENOMEM; + + msg.skb = ans_skb; + msg.portid = info->snd_portid; + msg.seq = info->snd_seq; + + tipc_node_lock(node); + link = node->links[bearer_id]; + if (!link) { + err = -EINVAL; + goto err_out; + } + + err = __tipc_nl_add_link(&msg, link); + if (err) + goto err_out; + + tipc_node_unlock(node); + + return genlmsg_reply(ans_skb, info); + +err_out: + tipc_node_unlock(node); + nlmsg_free(ans_skb); + + return err; +} + +int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info) +{ + int err; + char *link_name; + unsigned int bearer_id; + struct tipc_link *link; + struct tipc_node *node; + struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1]; + + if (!info->attrs[TIPC_NLA_LINK]) + return -EINVAL; + + err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX, + info->attrs[TIPC_NLA_LINK], + tipc_nl_link_policy); + if (err) + return err; + + if (!attrs[TIPC_NLA_LINK_NAME]) + return -EINVAL; + + link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]); + + if (strcmp(link_name, tipc_bclink_name) == 0) { + err = tipc_bclink_reset_stats(); + if (err) + return err; + return 0; + } + + node = tipc_link_find_owner(link_name, &bearer_id); + if (!node) + return -EINVAL; + + tipc_node_lock(node); + + link = node->links[bearer_id]; + if (!link) { + tipc_node_unlock(node); + return -EINVAL; + } + + link_reset_statistics(link); + + tipc_node_unlock(node); + + return 0; +} diff --git a/net/tipc/link.h b/net/tipc/link.h index b567a34..55812e8 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -37,6 +37,7 @@ #ifndef _TIPC_LINK_H #define _TIPC_LINK_H +#include <net/genetlink.h> #include "msg.h" #include "node.h" @@ -118,20 +119,13 @@ struct tipc_stats { * @max_pkt: current maximum packet size for this link * @max_pkt_target: desired maximum packet size for this link * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target) - * @out_queue_size: # of messages in outbound message queue - * @first_out: ptr to first outbound message in queue - * @last_out: ptr to last outbound message in queue + * @outqueue: outbound message queue * @next_out_no: next sequence number to use for outbound messages * @last_retransmitted: sequence number of most recently retransmitted message * @stale_count: # of identical retransmit requests made by peer * @next_in_no: next sequence number to expect for inbound messages - * @deferred_inqueue_sz: # of messages in inbound message queue - * @oldest_deferred_in: ptr to first inbound message in queue - * @newest_deferred_in: ptr to last inbound message in queue + * @deferred_queue: deferred queue saved OOS b'cast message received from node * @unacked_window: # of inbound messages rx'd without ack'ing back to peer - * @proto_msg_queue: ptr to (single) outbound control message - * @retransm_queue_size: number of messages to retransmit - * @retransm_queue_head: sequence number of first message to retransmit * @next_out: ptr to first unsent outbound message in queue * @waiting_sks: linked list of sockets waiting for link congestion to abate * @long_msg_seq_no: next identifier to use for outbound fragmented messages @@ -175,24 +169,17 @@ struct tipc_link { u32 max_pkt_probes; /* Sending */ - u32 out_queue_size; - struct sk_buff *first_out; - struct sk_buff *last_out; + struct sk_buff_head outqueue; u32 next_out_no; u32 last_retransmitted; u32 stale_count; /* Reception */ u32 next_in_no; - u32 deferred_inqueue_sz; - struct sk_buff *oldest_deferred_in; - struct sk_buff *newest_deferred_in; + struct sk_buff_head deferred_queue; u32 unacked_window; /* Congestion handling */ - struct sk_buff *proto_msg_queue; - u32 retransm_queue_size; - u32 retransm_queue_head; struct sk_buff *next_out; struct sk_buff_head waiting_sks; @@ -226,18 +213,26 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, void tipc_link_reset_all(struct tipc_node *node); void tipc_link_reset(struct tipc_link *l_ptr); void tipc_link_reset_list(unsigned int bearer_id); -int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); -int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf); +int tipc_link_xmit_skb(struct sk_buff *skb, u32 dest, u32 selector); +int tipc_link_xmit(struct sk_buff_head *list, u32 dest, u32 selector); +int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list); u32 tipc_link_get_max_pkt(u32 dest, u32 selector); void tipc_link_bundle_rcv(struct sk_buff *buf); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); -void tipc_link_push_queue(struct tipc_link *l_ptr); -u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, - struct sk_buff *buf); +void tipc_link_push_packets(struct tipc_link *l_ptr); +u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf); void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *start, u32 retransmits); +struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list, + const struct sk_buff *skb); + +int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb); +int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info); +int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info); +int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info); +int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]); /* * Link sequence number manipulation routines (uses modulo 2**16 arithmetic) @@ -252,18 +247,14 @@ static inline u32 mod(u32 x) return x & 0xffffu; } -static inline int between(u32 lower, u32 upper, u32 n) +static inline int less_eq(u32 left, u32 right) { - if ((lower < n) && (n < upper)) - return 1; - if ((upper < lower) && ((n > lower) || (n < upper))) - return 1; - return 0; + return mod(right - left) < 32768u; } -static inline int less_eq(u32 left, u32 right) +static inline int more(u32 left, u32 right) { - return mod(right - left) < 32768u; + return !less_eq(left, right); } static inline int less(u32 left, u32 right) @@ -302,7 +293,7 @@ static inline int link_reset_reset(struct tipc_link *l_ptr) static inline int link_congested(struct tipc_link *l_ptr) { - return l_ptr->out_queue_size >= l_ptr->queue_limit[0]; + return skb_queue_len(&l_ptr->outqueue) >= l_ptr->queue_limit[0]; } #endif diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 74745a4..a687b30 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -91,7 +91,7 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, * @*headbuf: in: NULL for first frag, otherwise value returned from prev call * out: set when successful non-complete reassembly, otherwise NULL * @*buf: in: the buffer to append. Always defined - * out: head buf after sucessful complete reassembly, otherwise NULL + * out: head buf after successful complete reassembly, otherwise NULL * Returns 1 when reassembly complete, otherwise 0 */ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) @@ -162,15 +162,16 @@ err: /** * tipc_msg_build - create buffer chain containing specified header and data * @mhdr: Message header, to be prepended to data - * @iov: User data + * @m: User message * @offset: Posision in iov to start copying from * @dsz: Total length of user data * @pktmax: Max packet size that can be used - * @chain: Buffer or chain of buffers to be returned to caller + * @list: Buffer or chain of buffers to be returned to caller + * * Returns message data size or errno: -ENOMEM, -EFAULT */ -int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, - int offset, int dsz, int pktmax , struct sk_buff **chain) +int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, + int dsz, int pktmax, struct sk_buff_head *list) { int mhsz = msg_hdr_sz(mhdr); int msz = mhsz + dsz; @@ -179,22 +180,22 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, int pktrem = pktmax; int drem = dsz; struct tipc_msg pkthdr; - struct sk_buff *buf, *prev; + struct sk_buff *skb; char *pktpos; int rc; - uint chain_sz = 0; + msg_set_size(mhdr, msz); /* No fragmentation needed? */ if (likely(msz <= pktmax)) { - buf = tipc_buf_acquire(msz); - *chain = buf; - if (unlikely(!buf)) + skb = tipc_buf_acquire(msz); + if (unlikely(!skb)) return -ENOMEM; - skb_copy_to_linear_data(buf, mhdr, mhsz); - pktpos = buf->data + mhsz; - TIPC_SKB_CB(buf)->chain_sz = 1; - if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz)) + __skb_queue_tail(list, skb); + skb_copy_to_linear_data(skb, mhdr, mhsz); + pktpos = skb->data + mhsz; + if (!dsz || !memcpy_fromiovecend(pktpos, m->msg_iter.iov, offset, + dsz)) return dsz; rc = -EFAULT; goto error; @@ -207,15 +208,15 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, msg_set_fragm_no(&pkthdr, pktno); /* Prepare first fragment */ - *chain = buf = tipc_buf_acquire(pktmax); - if (!buf) + skb = tipc_buf_acquire(pktmax); + if (!skb) return -ENOMEM; - chain_sz = 1; - pktpos = buf->data; - skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); + __skb_queue_tail(list, skb); + pktpos = skb->data; + skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE); pktpos += INT_H_SIZE; pktrem -= INT_H_SIZE; - skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz); + skb_copy_to_linear_data_offset(skb, INT_H_SIZE, mhdr, mhsz); pktpos += mhsz; pktrem -= mhsz; @@ -223,7 +224,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, if (drem < pktrem) pktrem = drem; - if (memcpy_fromiovecend(pktpos, iov, offset, pktrem)) { + if (memcpy_fromiovecend(pktpos, m->msg_iter.iov, offset, pktrem)) { rc = -EFAULT; goto error; } @@ -238,43 +239,41 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, pktsz = drem + INT_H_SIZE; else pktsz = pktmax; - prev = buf; - buf = tipc_buf_acquire(pktsz); - if (!buf) { + skb = tipc_buf_acquire(pktsz); + if (!skb) { rc = -ENOMEM; goto error; } - chain_sz++; - prev->next = buf; + __skb_queue_tail(list, skb); msg_set_type(&pkthdr, FRAGMENT); msg_set_size(&pkthdr, pktsz); msg_set_fragm_no(&pkthdr, ++pktno); - skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); - pktpos = buf->data + INT_H_SIZE; + skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE); + pktpos = skb->data + INT_H_SIZE; pktrem = pktsz - INT_H_SIZE; } while (1); - TIPC_SKB_CB(*chain)->chain_sz = chain_sz; - msg_set_type(buf_msg(buf), LAST_FRAGMENT); + msg_set_type(buf_msg(skb), LAST_FRAGMENT); return dsz; error: - kfree_skb_list(*chain); - *chain = NULL; + __skb_queue_purge(list); + __skb_queue_head_init(list); return rc; } /** * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one - * @bbuf: the existing buffer ("bundle") - * @buf: buffer to be appended + * @list: the buffer chain of the existing buffer ("bundle") + * @skb: buffer to be appended * @mtu: max allowable size for the bundle buffer * Consumes buffer if successful * Returns true if bundling could be performed, otherwise false */ -bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu) +bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu) { - struct tipc_msg *bmsg = buf_msg(bbuf); - struct tipc_msg *msg = buf_msg(buf); + struct sk_buff *bskb = skb_peek_tail(list); + struct tipc_msg *bmsg = buf_msg(bskb); + struct tipc_msg *msg = buf_msg(skb); unsigned int bsz = msg_size(bmsg); unsigned int msz = msg_size(msg); u32 start = align(bsz); @@ -289,35 +288,36 @@ bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu) return false; if (likely(msg_user(bmsg) != MSG_BUNDLER)) return false; - if (likely(msg_type(bmsg) != BUNDLE_OPEN)) + if (likely(!TIPC_SKB_CB(bskb)->bundling)) return false; - if (unlikely(skb_tailroom(bbuf) < (pad + msz))) + if (unlikely(skb_tailroom(bskb) < (pad + msz))) return false; if (unlikely(max < (start + msz))) return false; - skb_put(bbuf, pad + msz); - skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz); + skb_put(bskb, pad + msz); + skb_copy_to_linear_data_offset(bskb, start, skb->data, msz); msg_set_size(bmsg, start + msz); msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1); - bbuf->next = buf->next; - kfree_skb(buf); + kfree_skb(skb); return true; } /** * tipc_msg_make_bundle(): Create bundle buf and append message to its tail - * @buf: buffer to be appended and replaced - * @mtu: max allowable size for the bundle buffer, inclusive header + * @list: the buffer chain + * @skb: buffer to be appended and replaced + * @mtu: max allowable size for the bundle buffer, inclusive header * @dnode: destination node for message. (Not always present in header) * Replaces buffer if successful - * Returns true if sucess, otherwise false + * Returns true if success, otherwise false */ -bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode) +bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb, + u32 mtu, u32 dnode) { - struct sk_buff *bbuf; + struct sk_buff *bskb; struct tipc_msg *bmsg; - struct tipc_msg *msg = buf_msg(*buf); + struct tipc_msg *msg = buf_msg(skb); u32 msz = msg_size(msg); u32 max = mtu - INT_H_SIZE; @@ -330,20 +330,19 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode) if (msz > (max / 2)) return false; - bbuf = tipc_buf_acquire(max); - if (!bbuf) + bskb = tipc_buf_acquire(max); + if (!bskb) return false; - skb_trim(bbuf, INT_H_SIZE); - bmsg = buf_msg(bbuf); - tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode); + skb_trim(bskb, INT_H_SIZE); + bmsg = buf_msg(bskb); + tipc_msg_init(bmsg, MSG_BUNDLER, 0, INT_H_SIZE, dnode); msg_set_seqno(bmsg, msg_seqno(msg)); msg_set_ack(bmsg, msg_ack(msg)); msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); - bbuf->next = (*buf)->next; - tipc_msg_bundle(bbuf, *buf, mtu); - *buf = bbuf; - return true; + TIPC_SKB_CB(bskb)->bundling = true; + __skb_queue_tail(list, bskb); + return tipc_msg_bundle(list, skb, mtu); } /** @@ -429,22 +428,23 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode) /* tipc_msg_reassemble() - clone a buffer chain of fragments and * reassemble the clones into one message */ -struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain) +struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list) { - struct sk_buff *buf = chain; - struct sk_buff *frag = buf; + struct sk_buff *skb; + struct sk_buff *frag = NULL; struct sk_buff *head = NULL; int hdr_sz; /* Copy header if single buffer */ - if (!buf->next) { - hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf)); - return __pskb_copy(buf, hdr_sz, GFP_ATOMIC); + if (skb_queue_len(list) == 1) { + skb = skb_peek(list); + hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb)); + return __pskb_copy(skb, hdr_sz, GFP_ATOMIC); } /* Clone all fragments and reassemble */ - while (buf) { - frag = skb_clone(buf, GFP_ATOMIC); + skb_queue_walk(list, skb) { + frag = skb_clone(skb, GFP_ATOMIC); if (!frag) goto error; frag->next = NULL; @@ -452,7 +452,6 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain) break; if (!head) goto error; - buf = buf->next; } return frag; error: diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 0ea7b69..d5c83d7 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -464,11 +464,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m) #define FRAGMENT 1 #define LAST_FRAGMENT 2 -/* Bundling protocol message types - */ -#define BUNDLE_OPEN 0 -#define BUNDLE_CLOSED 1 - /* * Link management protocol message types */ @@ -739,13 +734,14 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); -bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu); +bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu); -bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode); +bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb, + u32 mtu, u32 dnode); -int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, - int offset, int dsz, int mtu , struct sk_buff **chain); +int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, + int dsz, int mtu, struct sk_buff_head *list); -struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain); +struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); #endif diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 376d2bb..ba6083d 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -38,39 +38,6 @@ #include "link.h" #include "name_distr.h" -/** - * struct publ_list - list of publications made by this node - * @list: circular list of publications - * @list_size: number of entries in list - */ -struct publ_list { - struct list_head list; - u32 size; -}; - -static struct publ_list publ_zone = { - .list = LIST_HEAD_INIT(publ_zone.list), - .size = 0, -}; - -static struct publ_list publ_cluster = { - .list = LIST_HEAD_INIT(publ_cluster.list), - .size = 0, -}; - -static struct publ_list publ_node = { - .list = LIST_HEAD_INIT(publ_node.list), - .size = 0, -}; - -static struct publ_list *publ_lists[] = { - NULL, - &publ_zone, /* publ_lists[TIPC_ZONE_SCOPE] */ - &publ_cluster, /* publ_lists[TIPC_CLUSTER_SCOPE] */ - &publ_node /* publ_lists[TIPC_NODE_SCOPE] */ -}; - - int sysctl_tipc_named_timeout __read_mostly = 2000; /** @@ -114,9 +81,9 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest) return buf; } -void named_cluster_distribute(struct sk_buff *buf) +void named_cluster_distribute(struct sk_buff *skb) { - struct sk_buff *obuf; + struct sk_buff *oskb; struct tipc_node *node; u32 dnode; @@ -127,15 +94,15 @@ void named_cluster_distribute(struct sk_buff *buf) continue; if (!tipc_node_active_links(node)) continue; - obuf = skb_copy(buf, GFP_ATOMIC); - if (!obuf) + oskb = skb_copy(skb, GFP_ATOMIC); + if (!oskb) break; - msg_set_destnode(buf_msg(obuf), dnode); - tipc_link_xmit(obuf, dnode, dnode); + msg_set_destnode(buf_msg(oskb), dnode); + tipc_link_xmit_skb(oskb, dnode, dnode); } rcu_read_unlock(); - kfree_skb(buf); + kfree_skb(skb); } /** @@ -146,8 +113,8 @@ struct sk_buff *tipc_named_publish(struct publication *publ) struct sk_buff *buf; struct distr_item *item; - list_add_tail(&publ->local_list, &publ_lists[publ->scope]->list); - publ_lists[publ->scope]->size++; + list_add_tail_rcu(&publ->local_list, + &tipc_nametbl->publ_list[publ->scope]); if (publ->scope == TIPC_NODE_SCOPE) return NULL; @@ -172,7 +139,6 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ) struct distr_item *item; list_del(&publ->local_list); - publ_lists[publ->scope]->size--; if (publ->scope == TIPC_NODE_SCOPE) return NULL; @@ -190,32 +156,28 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ) /** * named_distribute - prepare name info for bulk distribution to another node - * @msg_list: list of messages (buffers) to be returned from this function + * @list: list of messages (buffers) to be returned from this function * @dnode: node to be updated * @pls: linked list of publication items to be packed into buffer chain */ -static void named_distribute(struct list_head *msg_list, u32 dnode, - struct publ_list *pls) +static void named_distribute(struct sk_buff_head *list, u32 dnode, + struct list_head *pls) { struct publication *publ; - struct sk_buff *buf = NULL; + struct sk_buff *skb = NULL; struct distr_item *item = NULL; - uint dsz = pls->size * ITEM_SIZE; uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE; - uint rem = dsz; - uint msg_rem = 0; + uint msg_rem = msg_dsz; - list_for_each_entry(publ, &pls->list, local_list) { + list_for_each_entry(publ, pls, local_list) { /* Prepare next buffer: */ - if (!buf) { - msg_rem = min_t(uint, rem, msg_dsz); - rem -= msg_rem; - buf = named_prepare_buf(PUBLICATION, msg_rem, dnode); - if (!buf) { + if (!skb) { + skb = named_prepare_buf(PUBLICATION, msg_rem, dnode); + if (!skb) { pr_warn("Bulk publication failure\n"); return; } - item = (struct distr_item *)msg_data(buf_msg(buf)); + item = (struct distr_item *)msg_data(buf_msg(skb)); } /* Pack publication into message: */ @@ -225,10 +187,16 @@ static void named_distribute(struct list_head *msg_list, u32 dnode, /* Append full buffer to list: */ if (!msg_rem) { - list_add_tail((struct list_head *)buf, msg_list); - buf = NULL; + __skb_queue_tail(list, skb); + skb = NULL; + msg_rem = msg_dsz; } } + if (skb) { + msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem)); + skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem)); + __skb_queue_tail(list, skb); + } } /** @@ -236,36 +204,68 @@ static void named_distribute(struct list_head *msg_list, u32 dnode, */ void tipc_named_node_up(u32 dnode) { - LIST_HEAD(msg_list); - struct sk_buff *buf_chain; - - read_lock_bh(&tipc_nametbl_lock); - named_distribute(&msg_list, dnode, &publ_cluster); - named_distribute(&msg_list, dnode, &publ_zone); - read_unlock_bh(&tipc_nametbl_lock); - - /* Convert circular list to linear list and send: */ - buf_chain = (struct sk_buff *)msg_list.next; - ((struct sk_buff *)msg_list.prev)->next = NULL; - tipc_link_xmit(buf_chain, dnode, dnode); + struct sk_buff_head head; + + __skb_queue_head_init(&head); + + rcu_read_lock(); + named_distribute(&head, dnode, + &tipc_nametbl->publ_list[TIPC_CLUSTER_SCOPE]); + named_distribute(&head, dnode, + &tipc_nametbl->publ_list[TIPC_ZONE_SCOPE]); + rcu_read_unlock(); + + tipc_link_xmit(&head, dnode, dnode); +} + +static void tipc_publ_subscribe(struct publication *publ, u32 addr) +{ + struct tipc_node *node; + + if (in_own_node(addr)) + return; + + node = tipc_node_find(addr); + if (!node) { + pr_warn("Node subscription rejected, unknown node 0x%x\n", + addr); + return; + } + + tipc_node_lock(node); + list_add_tail(&publ->nodesub_list, &node->publ_list); + tipc_node_unlock(node); +} + +static void tipc_publ_unsubscribe(struct publication *publ, u32 addr) +{ + struct tipc_node *node; + + node = tipc_node_find(addr); + if (!node) + return; + + tipc_node_lock(node); + list_del_init(&publ->nodesub_list); + tipc_node_unlock(node); } /** - * named_purge_publ - remove publication associated with a failed node + * tipc_publ_purge - remove publication associated with a failed node * * Invoked for each publication issued by a newly failed node. * Removes publication structure from name table & deletes it. */ -static void named_purge_publ(struct publication *publ) +static void tipc_publ_purge(struct publication *publ, u32 addr) { struct publication *p; - write_lock_bh(&tipc_nametbl_lock); + spin_lock_bh(&tipc_nametbl_lock); p = tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node, publ->ref, publ->key); if (p) - tipc_nodesub_unsubscribe(&p->subscr); - write_unlock_bh(&tipc_nametbl_lock); + tipc_publ_unsubscribe(p, addr); + spin_unlock_bh(&tipc_nametbl_lock); if (p != publ) { pr_err("Unable to remove publication from failed node\n" @@ -274,7 +274,15 @@ static void named_purge_publ(struct publication *publ) publ->key); } - kfree(p); + kfree_rcu(p, rcu); +} + +void tipc_publ_notify(struct list_head *nsub_list, u32 addr) +{ + struct publication *publ, *tmp; + + list_for_each_entry_safe(publ, tmp, nsub_list, nodesub_list) + tipc_publ_purge(publ, addr); } /** @@ -294,9 +302,7 @@ static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype) TIPC_CLUSTER_SCOPE, node, ntohl(i->ref), ntohl(i->key)); if (publ) { - tipc_nodesub_subscribe(&publ->subscr, node, publ, - (net_ev_handler) - named_purge_publ); + tipc_publ_subscribe(publ, node); return true; } } else if (dtype == WITHDRAWAL) { @@ -304,8 +310,8 @@ static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype) node, ntohl(i->ref), ntohl(i->key)); if (publ) { - tipc_nodesub_unsubscribe(&publ->subscr); - kfree(publ); + tipc_publ_unsubscribe(publ, node); + kfree_rcu(publ, rcu); return true; } } else { @@ -370,14 +376,14 @@ void tipc_named_rcv(struct sk_buff *buf) u32 count = msg_data_sz(msg) / ITEM_SIZE; u32 node = msg_orignode(msg); - write_lock_bh(&tipc_nametbl_lock); + spin_lock_bh(&tipc_nametbl_lock); while (count--) { if (!tipc_update_nametbl(item, node, msg_type(msg))) tipc_named_add_backlog(item, msg_type(msg), node); item++; } tipc_named_process_backlog(); - write_unlock_bh(&tipc_nametbl_lock); + spin_unlock_bh(&tipc_nametbl_lock); kfree_skb(buf); } @@ -393,11 +399,12 @@ void tipc_named_reinit(void) struct publication *publ; int scope; - write_lock_bh(&tipc_nametbl_lock); + spin_lock_bh(&tipc_nametbl_lock); for (scope = TIPC_ZONE_SCOPE; scope <= TIPC_NODE_SCOPE; scope++) - list_for_each_entry(publ, &publ_lists[scope]->list, local_list) + list_for_each_entry_rcu(publ, &tipc_nametbl->publ_list[scope], + local_list) publ->node = tipc_own_addr; - write_unlock_bh(&tipc_nametbl_lock); + spin_unlock_bh(&tipc_nametbl_lock); } diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h index b9e75fe..cef55ce 100644 --- a/net/tipc/name_distr.h +++ b/net/tipc/name_distr.h @@ -74,5 +74,6 @@ void tipc_named_node_up(u32 dnode); void tipc_named_rcv(struct sk_buff *buf); void tipc_named_reinit(void); void tipc_named_process_backlog(void); +void tipc_publ_notify(struct list_head *nsub_list, u32 addr); #endif diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 3a6a0a7..c8df022 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -1,8 +1,8 @@ /* * net/tipc/name_table.c: TIPC name table code * - * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2004-2008, 2010-2011, Wind River Systems + * Copyright (c) 2000-2006, 2014, Ericsson AB + * Copyright (c) 2004-2008, 2010-2014, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -42,6 +42,12 @@ #define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ +static const struct nla_policy +tipc_nl_name_table_policy[TIPC_NLA_NAME_TABLE_MAX + 1] = { + [TIPC_NLA_NAME_TABLE_UNSPEC] = { .type = NLA_UNSPEC }, + [TIPC_NLA_NAME_TABLE_PUBL] = { .type = NLA_NESTED } +}; + /** * struct name_info - name sequence publication info * @node_list: circular list of publications made by own node @@ -86,6 +92,7 @@ struct sub_seq { * @ns_list: links to adjacent name sequences in hash chain * @subscriptions: list of subscriptions for this 'type' * @lock: spinlock controlling access to publication lists of all sub-sequences + * @rcu: RCU callback head used for deferred freeing */ struct name_seq { u32 type; @@ -95,21 +102,11 @@ struct name_seq { struct hlist_node ns_list; struct list_head subscriptions; spinlock_t lock; + struct rcu_head rcu; }; -/** - * struct name_table - table containing all existing port name publications - * @types: pointer to fixed-sized array of name sequence lists, - * accessed via hashing on 'type'; name sequence lists are *not* sorted - * @local_publ_count: number of publications issued by this node - */ -struct name_table { - struct hlist_head *types; - u32 local_publ_count; -}; - -static struct name_table table; -DEFINE_RWLOCK(tipc_nametbl_lock); +struct name_table *tipc_nametbl; +DEFINE_SPINLOCK(tipc_nametbl_lock); static int hash(int x) { @@ -136,9 +133,7 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper, publ->node = node; publ->ref = port_ref; publ->key = key; - INIT_LIST_HEAD(&publ->local_list); INIT_LIST_HEAD(&publ->pport_list); - INIT_LIST_HEAD(&publ->subscr.nodesub_list); return publ; } @@ -173,22 +168,10 @@ static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_hea nseq->alloc = 1; INIT_HLIST_NODE(&nseq->ns_list); INIT_LIST_HEAD(&nseq->subscriptions); - hlist_add_head(&nseq->ns_list, seq_head); + hlist_add_head_rcu(&nseq->ns_list, seq_head); return nseq; } -/* - * nameseq_delete_empty - deletes a name sequence structure if now unused - */ -static void nameseq_delete_empty(struct name_seq *seq) -{ - if (!seq->first_free && list_empty(&seq->subscriptions)) { - hlist_del_init(&seq->ns_list); - kfree(seq->sseqs); - kfree(seq); - } -} - /** * nameseq_find_subseq - find sub-sequence (if any) matching a name instance * @@ -469,8 +452,8 @@ static struct name_seq *nametbl_find_seq(u32 type) struct hlist_head *seq_head; struct name_seq *ns; - seq_head = &table.types[hash(type)]; - hlist_for_each_entry(ns, seq_head, ns_list) { + seq_head = &tipc_nametbl->seq_hlist[hash(type)]; + hlist_for_each_entry_rcu(ns, seq_head, ns_list) { if (ns->type == type) return ns; } @@ -481,7 +464,9 @@ static struct name_seq *nametbl_find_seq(u32 type) struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper, u32 scope, u32 node, u32 port, u32 key) { + struct publication *publ; struct name_seq *seq = nametbl_find_seq(type); + int index = hash(type); if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE) || (lower > upper)) { @@ -491,12 +476,16 @@ struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper, } if (!seq) - seq = tipc_nameseq_create(type, &table.types[hash(type)]); + seq = tipc_nameseq_create(type, + &tipc_nametbl->seq_hlist[index]); if (!seq) return NULL; - return tipc_nameseq_insert_publ(seq, type, lower, upper, + spin_lock_bh(&seq->lock); + publ = tipc_nameseq_insert_publ(seq, type, lower, upper, scope, node, port, key); + spin_unlock_bh(&seq->lock); + return publ; } struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower, @@ -508,8 +497,16 @@ struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower, if (!seq) return NULL; + spin_lock_bh(&seq->lock); publ = tipc_nameseq_remove_publ(seq, lower, node, ref, key); - nameseq_delete_empty(seq); + if (!seq->first_free && list_empty(&seq->subscriptions)) { + hlist_del_init_rcu(&seq->ns_list); + kfree(seq->sseqs); + spin_unlock_bh(&seq->lock); + kfree_rcu(seq, rcu); + return publ; + } + spin_unlock_bh(&seq->lock); return publ; } @@ -538,14 +535,14 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode) if (!tipc_in_scope(*destnode, tipc_own_addr)) return 0; - read_lock_bh(&tipc_nametbl_lock); + rcu_read_lock(); seq = nametbl_find_seq(type); if (unlikely(!seq)) goto not_found; + spin_lock_bh(&seq->lock); sseq = nameseq_find_subseq(seq, instance); if (unlikely(!sseq)) - goto not_found; - spin_lock_bh(&seq->lock); + goto no_match; info = sseq->info; /* Closest-First Algorithm */ @@ -595,7 +592,7 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode) no_match: spin_unlock_bh(&seq->lock); not_found: - read_unlock_bh(&tipc_nametbl_lock); + rcu_read_unlock(); *destnode = node; return ref; } @@ -621,13 +618,12 @@ int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit, struct name_info *info; int res = 0; - read_lock_bh(&tipc_nametbl_lock); + rcu_read_lock(); seq = nametbl_find_seq(type); if (!seq) goto exit; spin_lock_bh(&seq->lock); - sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); sseq_stop = seq->sseqs + seq->first_free; for (; sseq != sseq_stop; sseq++) { @@ -645,10 +641,9 @@ int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit, if (info->cluster_list_size != info->node_list_size) res = 1; } - spin_unlock_bh(&seq->lock); exit: - read_unlock_bh(&tipc_nametbl_lock); + rcu_read_unlock(); return res; } @@ -661,22 +656,23 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, struct publication *publ; struct sk_buff *buf = NULL; - if (table.local_publ_count >= TIPC_MAX_PUBLICATIONS) { + spin_lock_bh(&tipc_nametbl_lock); + if (tipc_nametbl->local_publ_count >= TIPC_MAX_PUBLICATIONS) { pr_warn("Publication failed, local publication limit reached (%u)\n", TIPC_MAX_PUBLICATIONS); + spin_unlock_bh(&tipc_nametbl_lock); return NULL; } - write_lock_bh(&tipc_nametbl_lock); publ = tipc_nametbl_insert_publ(type, lower, upper, scope, tipc_own_addr, port_ref, key); if (likely(publ)) { - table.local_publ_count++; + tipc_nametbl->local_publ_count++; buf = tipc_named_publish(publ); /* Any pending external events? */ tipc_named_process_backlog(); } - write_unlock_bh(&tipc_nametbl_lock); + spin_unlock_bh(&tipc_nametbl_lock); if (buf) named_cluster_distribute(buf); @@ -689,27 +685,28 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key) { struct publication *publ; - struct sk_buff *buf; + struct sk_buff *skb = NULL; - write_lock_bh(&tipc_nametbl_lock); + spin_lock_bh(&tipc_nametbl_lock); publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key); if (likely(publ)) { - table.local_publ_count--; - buf = tipc_named_withdraw(publ); + tipc_nametbl->local_publ_count--; + skb = tipc_named_withdraw(publ); /* Any pending external events? */ tipc_named_process_backlog(); - write_unlock_bh(&tipc_nametbl_lock); list_del_init(&publ->pport_list); - kfree(publ); + kfree_rcu(publ, rcu); + } else { + pr_err("Unable to remove local publication\n" + "(type=%u, lower=%u, ref=%u, key=%u)\n", + type, lower, ref, key); + } + spin_unlock_bh(&tipc_nametbl_lock); - if (buf) - named_cluster_distribute(buf); + if (skb) { + named_cluster_distribute(skb); return 1; } - write_unlock_bh(&tipc_nametbl_lock); - pr_err("Unable to remove local publication\n" - "(type=%u, lower=%u, ref=%u, key=%u)\n", - type, lower, ref, key); return 0; } @@ -719,12 +716,14 @@ int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key) void tipc_nametbl_subscribe(struct tipc_subscription *s) { u32 type = s->seq.type; + int index = hash(type); struct name_seq *seq; - write_lock_bh(&tipc_nametbl_lock); + spin_lock_bh(&tipc_nametbl_lock); seq = nametbl_find_seq(type); if (!seq) - seq = tipc_nameseq_create(type, &table.types[hash(type)]); + seq = tipc_nameseq_create(type, + &tipc_nametbl->seq_hlist[index]); if (seq) { spin_lock_bh(&seq->lock); tipc_nameseq_subscribe(seq, s); @@ -733,7 +732,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s) pr_warn("Failed to create subscription for {%u,%u,%u}\n", s->seq.type, s->seq.lower, s->seq.upper); } - write_unlock_bh(&tipc_nametbl_lock); + spin_unlock_bh(&tipc_nametbl_lock); } /** @@ -743,18 +742,23 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s) { struct name_seq *seq; - write_lock_bh(&tipc_nametbl_lock); + spin_lock_bh(&tipc_nametbl_lock); seq = nametbl_find_seq(s->seq.type); if (seq != NULL) { spin_lock_bh(&seq->lock); list_del_init(&s->nameseq_list); - spin_unlock_bh(&seq->lock); - nameseq_delete_empty(seq); + if (!seq->first_free && list_empty(&seq->subscriptions)) { + hlist_del_init_rcu(&seq->ns_list); + kfree(seq->sseqs); + spin_unlock_bh(&seq->lock); + kfree_rcu(seq, rcu); + } else { + spin_unlock_bh(&seq->lock); + } } - write_unlock_bh(&tipc_nametbl_lock); + spin_unlock_bh(&tipc_nametbl_lock); } - /** * subseq_list - print specified sub-sequence contents into the given buffer */ @@ -876,8 +880,8 @@ static int nametbl_list(char *buf, int len, u32 depth_info, lowbound = 0; upbound = ~0; for (i = 0; i < TIPC_NAMETBL_SIZE; i++) { - seq_head = &table.types[i]; - hlist_for_each_entry(seq, seq_head, ns_list) { + seq_head = &tipc_nametbl->seq_hlist[i]; + hlist_for_each_entry_rcu(seq, seq_head, ns_list) { ret += nameseq_list(seq, buf + ret, len - ret, depth, seq->type, lowbound, upbound, i); @@ -892,8 +896,8 @@ static int nametbl_list(char *buf, int len, u32 depth_info, } ret += nametbl_header(buf + ret, len - ret, depth); i = hash(type); - seq_head = &table.types[i]; - hlist_for_each_entry(seq, seq_head, ns_list) { + seq_head = &tipc_nametbl->seq_hlist[i]; + hlist_for_each_entry_rcu(seq, seq_head, ns_list) { if (seq->type == type) { ret += nameseq_list(seq, buf + ret, len - ret, depth, type, @@ -925,11 +929,11 @@ struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space) pb = TLV_DATA(rep_tlv); pb_len = ULTRA_STRING_MAX_LEN; argv = (struct tipc_name_table_query *)TLV_DATA(req_tlv_area); - read_lock_bh(&tipc_nametbl_lock); + rcu_read_lock(); str_len = nametbl_list(pb, pb_len, ntohl(argv->depth), ntohl(argv->type), ntohl(argv->lowbound), ntohl(argv->upbound)); - read_unlock_bh(&tipc_nametbl_lock); + rcu_read_unlock(); str_len += 1; /* for "\0" */ skb_put(buf, TLV_SPACE(str_len)); TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); @@ -939,12 +943,18 @@ struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space) int tipc_nametbl_init(void) { - table.types = kcalloc(TIPC_NAMETBL_SIZE, sizeof(struct hlist_head), - GFP_ATOMIC); - if (!table.types) + int i; + + tipc_nametbl = kzalloc(sizeof(*tipc_nametbl), GFP_ATOMIC); + if (!tipc_nametbl) return -ENOMEM; - table.local_publ_count = 0; + for (i = 0; i < TIPC_NAMETBL_SIZE; i++) + INIT_HLIST_HEAD(&tipc_nametbl->seq_hlist[i]); + + INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_ZONE_SCOPE]); + INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_CLUSTER_SCOPE]); + INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_NODE_SCOPE]); return 0; } @@ -959,17 +969,19 @@ static void tipc_purge_publications(struct name_seq *seq) struct sub_seq *sseq; struct name_info *info; - if (!seq->sseqs) { - nameseq_delete_empty(seq); - return; - } + spin_lock_bh(&seq->lock); sseq = seq->sseqs; info = sseq->info; list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) { tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node, publ->ref, publ->key); - kfree(publ); + kfree_rcu(publ, rcu); } + hlist_del_init_rcu(&seq->ns_list); + kfree(seq->sseqs); + spin_unlock_bh(&seq->lock); + + kfree_rcu(seq, rcu); } void tipc_nametbl_stop(void) @@ -977,21 +989,202 @@ void tipc_nametbl_stop(void) u32 i; struct name_seq *seq; struct hlist_head *seq_head; - struct hlist_node *safe; /* Verify name table is empty and purge any lingering * publications, then release the name table */ - write_lock_bh(&tipc_nametbl_lock); + spin_lock_bh(&tipc_nametbl_lock); for (i = 0; i < TIPC_NAMETBL_SIZE; i++) { - if (hlist_empty(&table.types[i])) + if (hlist_empty(&tipc_nametbl->seq_hlist[i])) continue; - seq_head = &table.types[i]; - hlist_for_each_entry_safe(seq, safe, seq_head, ns_list) { + seq_head = &tipc_nametbl->seq_hlist[i]; + hlist_for_each_entry_rcu(seq, seq_head, ns_list) { tipc_purge_publications(seq); } } - kfree(table.types); - table.types = NULL; - write_unlock_bh(&tipc_nametbl_lock); + spin_unlock_bh(&tipc_nametbl_lock); + + synchronize_net(); + kfree(tipc_nametbl); + +} + +static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, + struct name_seq *seq, + struct sub_seq *sseq, u32 *last_publ) +{ + void *hdr; + struct nlattr *attrs; + struct nlattr *publ; + struct publication *p; + + if (*last_publ) { + list_for_each_entry(p, &sseq->info->zone_list, zone_list) + if (p->key == *last_publ) + break; + if (p->key != *last_publ) + return -EPIPE; + } else { + p = list_first_entry(&sseq->info->zone_list, struct publication, + zone_list); + } + + list_for_each_entry_from(p, &sseq->info->zone_list, zone_list) { + *last_publ = p->key; + + hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, + &tipc_genl_v2_family, NLM_F_MULTI, + TIPC_NL_NAME_TABLE_GET); + if (!hdr) + return -EMSGSIZE; + + attrs = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE); + if (!attrs) + goto msg_full; + + publ = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE_PUBL); + if (!publ) + goto attr_msg_full; + + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, seq->type)) + goto publ_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sseq->lower)) + goto publ_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sseq->upper)) + goto publ_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_SCOPE, p->scope)) + goto publ_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_NODE, p->node)) + goto publ_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_REF, p->ref)) + goto publ_msg_full; + if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key)) + goto publ_msg_full; + + nla_nest_end(msg->skb, publ); + nla_nest_end(msg->skb, attrs); + genlmsg_end(msg->skb, hdr); + } + *last_publ = 0; + + return 0; + +publ_msg_full: + nla_nest_cancel(msg->skb, publ); +attr_msg_full: + nla_nest_cancel(msg->skb, attrs); +msg_full: + genlmsg_cancel(msg->skb, hdr); + + return -EMSGSIZE; +} + +static int __tipc_nl_subseq_list(struct tipc_nl_msg *msg, struct name_seq *seq, + u32 *last_lower, u32 *last_publ) +{ + struct sub_seq *sseq; + struct sub_seq *sseq_start; + int err; + + if (*last_lower) { + sseq_start = nameseq_find_subseq(seq, *last_lower); + if (!sseq_start) + return -EPIPE; + } else { + sseq_start = seq->sseqs; + } + + for (sseq = sseq_start; sseq != &seq->sseqs[seq->first_free]; sseq++) { + err = __tipc_nl_add_nametable_publ(msg, seq, sseq, last_publ); + if (err) { + *last_lower = sseq->lower; + return err; + } + } + *last_lower = 0; + + return 0; +} + +static int __tipc_nl_seq_list(struct tipc_nl_msg *msg, u32 *last_type, + u32 *last_lower, u32 *last_publ) +{ + struct hlist_head *seq_head; + struct name_seq *seq = NULL; + int err; + int i; + + if (*last_type) + i = hash(*last_type); + else + i = 0; + + for (; i < TIPC_NAMETBL_SIZE; i++) { + seq_head = &tipc_nametbl->seq_hlist[i]; + + if (*last_type) { + seq = nametbl_find_seq(*last_type); + if (!seq) + return -EPIPE; + } else { + hlist_for_each_entry_rcu(seq, seq_head, ns_list) + break; + if (!seq) + continue; + } + + hlist_for_each_entry_from_rcu(seq, ns_list) { + spin_lock_bh(&seq->lock); + err = __tipc_nl_subseq_list(msg, seq, last_lower, + last_publ); + + if (err) { + *last_type = seq->type; + spin_unlock_bh(&seq->lock); + return err; + } + spin_unlock_bh(&seq->lock); + } + *last_type = 0; + } + return 0; +} + +int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) +{ + int err; + int done = cb->args[3]; + u32 last_type = cb->args[0]; + u32 last_lower = cb->args[1]; + u32 last_publ = cb->args[2]; + struct tipc_nl_msg msg; + + if (done) + return 0; + + msg.skb = skb; + msg.portid = NETLINK_CB(cb->skb).portid; + msg.seq = cb->nlh->nlmsg_seq; + + rcu_read_lock(); + err = __tipc_nl_seq_list(&msg, &last_type, &last_lower, &last_publ); + if (!err) { + done = 1; + } else if (err != -EMSGSIZE) { + /* We never set seq or call nl_dump_check_consistent() this + * means that setting prev_seq here will cause the consistence + * check to fail in the netlink callback handler. Resulting in + * the NLMSG_DONE message having the NLM_F_DUMP_INTR flag set if + * we got an error. + */ + cb->prev_seq = 1; + } + rcu_read_unlock(); + + cb->args[0] = last_type; + cb->args[1] = last_lower; + cb->args[2] = last_publ; + cb->args[3] = done; + + return skb->len; } diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index f02f48b..5f0dee9 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -1,7 +1,7 @@ /* * net/tipc/name_table.h: Include file for TIPC name table code * - * Copyright (c) 2000-2006, Ericsson AB + * Copyright (c) 2000-2006, 2014, Ericsson AB * Copyright (c) 2004-2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -37,15 +37,15 @@ #ifndef _TIPC_NAME_TABLE_H #define _TIPC_NAME_TABLE_H -#include "node_subscr.h" - struct tipc_subscription; struct tipc_port_list; /* * TIPC name types reserved for internal TIPC use (both current and planned) */ -#define TIPC_ZM_SRV 3 /* zone master service name type */ +#define TIPC_ZM_SRV 3 /* zone master service name type */ +#define TIPC_PUBL_SCOPE_NUM (TIPC_NODE_SCOPE + 1) +#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ /** * struct publication - info about a published (name or) name sequence @@ -56,12 +56,13 @@ struct tipc_port_list; * @node: network address of publishing port's node * @ref: publishing port * @key: publication key - * @subscr: subscription to "node down" event (for off-node publications only) + * @nodesub_list: subscription to "node down" event (off-node publication only) * @local_list: adjacent entries in list of publications made by this node * @pport_list: adjacent entries in list of publications made by this port * @node_list: adjacent matching name seq publications with >= node scope * @cluster_list: adjacent matching name seq publications with >= cluster scope * @zone_list: adjacent matching name seq publications with >= zone scope + * @rcu: RCU callback head used for deferred freeing * * Note that the node list, cluster list, and zone list are circular lists. */ @@ -73,16 +74,31 @@ struct publication { u32 node; u32 ref; u32 key; - struct tipc_node_subscr subscr; + struct list_head nodesub_list; struct list_head local_list; struct list_head pport_list; struct list_head node_list; struct list_head cluster_list; struct list_head zone_list; + struct rcu_head rcu; +}; + +/** + * struct name_table - table containing all existing port name publications + * @seq_hlist: name sequence hash lists + * @publ_list: pulication lists + * @local_publ_count: number of publications issued by this node + */ +struct name_table { + struct hlist_head seq_hlist[TIPC_NAMETBL_SIZE]; + struct list_head publ_list[TIPC_PUBL_SCOPE_NUM]; + u32 local_publ_count; }; +extern spinlock_t tipc_nametbl_lock; +extern struct name_table *tipc_nametbl; -extern rwlock_t tipc_nametbl_lock; +int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space); u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *node); diff --git a/net/tipc/net.c b/net/tipc/net.c index 93b9944..cf13df3 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -42,6 +42,11 @@ #include "node.h" #include "config.h" +static const struct nla_policy tipc_nl_net_policy[TIPC_NLA_NET_MAX + 1] = { + [TIPC_NLA_NET_UNSPEC] = { .type = NLA_UNSPEC }, + [TIPC_NLA_NET_ID] = { .type = NLA_U32 } +}; + /* * The TIPC locking policy is designed to ensure a very fine locking * granularity, permitting complete parallel access to individual @@ -138,3 +143,104 @@ void tipc_net_stop(void) pr_info("Left network mode\n"); } + +static int __tipc_nl_add_net(struct tipc_nl_msg *msg) +{ + void *hdr; + struct nlattr *attrs; + + hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family, + NLM_F_MULTI, TIPC_NL_NET_GET); + if (!hdr) + return -EMSGSIZE; + + attrs = nla_nest_start(msg->skb, TIPC_NLA_NET); + if (!attrs) + goto msg_full; + + if (nla_put_u32(msg->skb, TIPC_NLA_NET_ID, tipc_net_id)) + goto attr_msg_full; + + nla_nest_end(msg->skb, attrs); + genlmsg_end(msg->skb, hdr); + + return 0; + +attr_msg_full: + nla_nest_cancel(msg->skb, attrs); +msg_full: + genlmsg_cancel(msg->skb, hdr); + + return -EMSGSIZE; +} + +int tipc_nl_net_dump(struct sk_buff *skb, struct netlink_callback *cb) +{ + int err; + int done = cb->args[0]; + struct tipc_nl_msg msg; + + if (done) + return 0; + + msg.skb = skb; + msg.portid = NETLINK_CB(cb->skb).portid; + msg.seq = cb->nlh->nlmsg_seq; + + err = __tipc_nl_add_net(&msg); + if (err) + goto out; + + done = 1; +out: + cb->args[0] = done; + + return skb->len; +} + +int tipc_nl_net_set(struct sk_buff *skb, struct genl_info *info) +{ + int err; + struct nlattr *attrs[TIPC_NLA_NET_MAX + 1]; + + if (!info->attrs[TIPC_NLA_NET]) + return -EINVAL; + + err = nla_parse_nested(attrs, TIPC_NLA_NET_MAX, + info->attrs[TIPC_NLA_NET], + tipc_nl_net_policy); + if (err) + return err; + + if (attrs[TIPC_NLA_NET_ID]) { + u32 val; + + /* Can't change net id once TIPC has joined a network */ + if (tipc_own_addr) + return -EPERM; + + val = nla_get_u32(attrs[TIPC_NLA_NET_ID]); + if (val < 1 || val > 9999) + return -EINVAL; + + tipc_net_id = val; + } + + if (attrs[TIPC_NLA_NET_ADDR]) { + u32 addr; + + /* Can't change net addr once TIPC has joined a network */ + if (tipc_own_addr) + return -EPERM; + + addr = nla_get_u32(attrs[TIPC_NLA_NET_ADDR]); + if (!tipc_addr_node_valid(addr)) + return -EINVAL; + + rtnl_lock(); + tipc_net_start(addr); + rtnl_unlock(); + } + + return 0; +} diff --git a/net/tipc/net.h b/net/tipc/net.h index 59ef338..a81c1b9 100644 --- a/net/tipc/net.h +++ b/net/tipc/net.h @@ -1,7 +1,7 @@ /* * net/tipc/net.h: Include file for TIPC network routing code * - * Copyright (c) 1995-2006, Ericsson AB + * Copyright (c) 1995-2006, 2014, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -37,7 +37,13 @@ #ifndef _TIPC_NET_H #define _TIPC_NET_H +#include <net/genetlink.h> + int tipc_net_start(u32 addr); + void tipc_net_stop(void); +int tipc_nl_net_dump(struct sk_buff *skb, struct netlink_callback *cb); +int tipc_nl_net_set(struct sk_buff *skb, struct genl_info *info); + #endif diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c index ad844d3..b891e39 100644 --- a/net/tipc/netlink.c +++ b/net/tipc/netlink.c @@ -1,7 +1,7 @@ /* * net/tipc/netlink.c: TIPC configuration handling * - * Copyright (c) 2005-2006, Ericsson AB + * Copyright (c) 2005-2006, 2014, Ericsson AB * Copyright (c) 2005-2007, Wind River Systems * All rights reserved. * @@ -36,6 +36,12 @@ #include "core.h" #include "config.h" +#include "socket.h" +#include "name_table.h" +#include "bearer.h" +#include "link.h" +#include "node.h" +#include "net.h" #include <net/genetlink.h> static int handle_cmd(struct sk_buff *skb, struct genl_info *info) @@ -68,6 +74,19 @@ static int handle_cmd(struct sk_buff *skb, struct genl_info *info) return 0; } +static const struct nla_policy tipc_nl_policy[TIPC_NLA_MAX + 1] = { + [TIPC_NLA_UNSPEC] = { .type = NLA_UNSPEC, }, + [TIPC_NLA_BEARER] = { .type = NLA_NESTED, }, + [TIPC_NLA_SOCK] = { .type = NLA_NESTED, }, + [TIPC_NLA_PUBL] = { .type = NLA_NESTED, }, + [TIPC_NLA_LINK] = { .type = NLA_NESTED, }, + [TIPC_NLA_MEDIA] = { .type = NLA_NESTED, }, + [TIPC_NLA_NODE] = { .type = NLA_NESTED, }, + [TIPC_NLA_NET] = { .type = NLA_NESTED, }, + [TIPC_NLA_NAME_TABLE] = { .type = NLA_NESTED, } +}; + +/* Legacy ASCII API */ static struct genl_family tipc_genl_family = { .id = GENL_ID_GENERATE, .name = TIPC_GENL_NAME, @@ -76,6 +95,7 @@ static struct genl_family tipc_genl_family = { .maxattr = 0, }; +/* Legacy ASCII API */ static struct genl_ops tipc_genl_ops[] = { { .cmd = TIPC_GENL_CMD, @@ -83,12 +103,122 @@ static struct genl_ops tipc_genl_ops[] = { }, }; +/* Users of the legacy API (tipc-config) can't handle that we add operations, + * so we have a separate genl handling for the new API. + */ +struct genl_family tipc_genl_v2_family = { + .id = GENL_ID_GENERATE, + .name = TIPC_GENL_V2_NAME, + .version = TIPC_GENL_V2_VERSION, + .hdrsize = 0, + .maxattr = TIPC_NLA_MAX, +}; + +static const struct genl_ops tipc_genl_v2_ops[] = { + { + .cmd = TIPC_NL_BEARER_DISABLE, + .doit = tipc_nl_bearer_disable, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_BEARER_ENABLE, + .doit = tipc_nl_bearer_enable, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_BEARER_GET, + .doit = tipc_nl_bearer_get, + .dumpit = tipc_nl_bearer_dump, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_BEARER_SET, + .doit = tipc_nl_bearer_set, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_SOCK_GET, + .dumpit = tipc_nl_sk_dump, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_PUBL_GET, + .dumpit = tipc_nl_publ_dump, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_LINK_GET, + .doit = tipc_nl_link_get, + .dumpit = tipc_nl_link_dump, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_LINK_SET, + .doit = tipc_nl_link_set, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_LINK_RESET_STATS, + .doit = tipc_nl_link_reset_stats, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_MEDIA_GET, + .doit = tipc_nl_media_get, + .dumpit = tipc_nl_media_dump, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_MEDIA_SET, + .doit = tipc_nl_media_set, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_NODE_GET, + .dumpit = tipc_nl_node_dump, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_NET_GET, + .dumpit = tipc_nl_net_dump, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_NET_SET, + .doit = tipc_nl_net_set, + .policy = tipc_nl_policy, + }, + { + .cmd = TIPC_NL_NAME_TABLE_GET, + .dumpit = tipc_nl_name_table_dump, + .policy = tipc_nl_policy, + } +}; + +int tipc_nlmsg_parse(const struct nlmsghdr *nlh, struct nlattr ***attr) +{ + u32 maxattr = tipc_genl_v2_family.maxattr; + + *attr = tipc_genl_v2_family.attrbuf; + if (!*attr) + return -EOPNOTSUPP; + + return nlmsg_parse(nlh, GENL_HDRLEN, *attr, maxattr, tipc_nl_policy); +} + int tipc_netlink_start(void) { int res; res = genl_register_family_with_ops(&tipc_genl_family, tipc_genl_ops); if (res) { + pr_err("Failed to register legacy interface\n"); + return res; + } + + res = genl_register_family_with_ops(&tipc_genl_v2_family, + tipc_genl_v2_ops); + if (res) { pr_err("Failed to register netlink interface\n"); return res; } @@ -98,4 +228,5 @@ int tipc_netlink_start(void) void tipc_netlink_stop(void) { genl_unregister_family(&tipc_genl_family); + genl_unregister_family(&tipc_genl_v2_family); } diff --git a/net/tipc/node_subscr.h b/net/tipc/netlink.h index d91b8cc..1425c68 100644 --- a/net/tipc/node_subscr.h +++ b/net/tipc/netlink.h @@ -1,8 +1,7 @@ /* - * net/tipc/node_subscr.h: Include file for TIPC "node down" subscription handling + * net/tipc/netlink.h: Include file for TIPC netlink code * - * Copyright (c) 1995-2006, Ericsson AB - * Copyright (c) 2005, 2010-2011, Wind River Systems + * Copyright (c) 2014, Ericsson AB * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,30 +33,16 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#ifndef _TIPC_NODE_SUBSCR_H -#define _TIPC_NODE_SUBSCR_H +#ifndef _TIPC_NETLINK_H +#define _TIPC_NETLINK_H -#include "addr.h" +extern struct genl_family tipc_genl_v2_family; +int tipc_nlmsg_parse(const struct nlmsghdr *nlh, struct nlattr ***buf); -typedef void (*net_ev_handler) (void *usr_handle); - -/** - * struct tipc_node_subscr - "node down" subscription entry - * @node: ptr to node structure of interest (or NULL, if none) - * @handle_node_down: routine to invoke when node fails - * @usr_handle: argument to pass to routine when node fails - * @nodesub_list: adjacent entries in list of subscriptions for the node - */ -struct tipc_node_subscr { - struct tipc_node *node; - net_ev_handler handle_node_down; - void *usr_handle; - struct list_head nodesub_list; +struct tipc_nl_msg { + struct sk_buff *skb; + u32 portid; + u32 seq; }; -void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr, - void *usr_handle, net_ev_handler handle_down); -void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub); -void tipc_nodesub_notify(struct list_head *nsub_list); - #endif diff --git a/net/tipc/node.c b/net/tipc/node.c index 5781634..8d353ec 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -58,6 +58,12 @@ struct tipc_sock_conn { struct list_head list; }; +static const struct nla_policy tipc_nl_node_policy[TIPC_NLA_NODE_MAX + 1] = { + [TIPC_NLA_NODE_UNSPEC] = { .type = NLA_UNSPEC }, + [TIPC_NLA_NODE_ADDR] = { .type = NLA_U32 }, + [TIPC_NLA_NODE_UP] = { .type = NLA_FLAG } +}; + /* * A trivial power-of-two bitmask technique is used for speed, since this * operation is done for every incoming TIPC packet. The number of hash table @@ -107,9 +113,10 @@ struct tipc_node *tipc_node_create(u32 addr) spin_lock_init(&n_ptr->lock); INIT_HLIST_NODE(&n_ptr->hash); INIT_LIST_HEAD(&n_ptr->list); - INIT_LIST_HEAD(&n_ptr->nsub); + INIT_LIST_HEAD(&n_ptr->publ_list); INIT_LIST_HEAD(&n_ptr->conn_sks); - __skb_queue_head_init(&n_ptr->waiting_sks); + skb_queue_head_init(&n_ptr->waiting_sks); + __skb_queue_head_init(&n_ptr->bclink.deferred_queue); hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); @@ -375,8 +382,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Flush broadcast link info associated with lost node */ if (n_ptr->bclink.recv_permitted) { - kfree_skb_list(n_ptr->bclink.deferred_head); - n_ptr->bclink.deferred_size = 0; + __skb_queue_purge(&n_ptr->bclink.deferred_queue); if (n_ptr->bclink.reasm_buf) { kfree_skb(n_ptr->bclink.reasm_buf); @@ -568,7 +574,7 @@ void tipc_node_unlock(struct tipc_node *node) skb_queue_splice_init(&node->waiting_sks, &waiting_sks); if (flags & TIPC_NOTIFY_NODE_DOWN) { - list_replace_init(&node->nsub, &nsub_list); + list_replace_init(&node->publ_list, &nsub_list); list_replace_init(&node->conn_sks, &conn_sks); } node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN | @@ -585,7 +591,7 @@ void tipc_node_unlock(struct tipc_node *node) tipc_node_abort_sock_conns(&conn_sks); if (!list_empty(&nsub_list)) - tipc_nodesub_notify(&nsub_list); + tipc_publ_notify(&nsub_list, addr); if (flags & TIPC_WAKEUP_BCAST_USERS) tipc_bclink_wakeup_users(); @@ -601,3 +607,93 @@ void tipc_node_unlock(struct tipc_node *node) tipc_nametbl_withdraw(TIPC_LINK_STATE, addr, link_id, addr); } + +/* Caller should hold node lock for the passed node */ +static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node) +{ + void *hdr; + struct nlattr *attrs; + + hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family, + NLM_F_MULTI, TIPC_NL_NODE_GET); + if (!hdr) + return -EMSGSIZE; + + attrs = nla_nest_start(msg->skb, TIPC_NLA_NODE); + if (!attrs) + goto msg_full; + + if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr)) + goto attr_msg_full; + if (tipc_node_is_up(node)) + if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP)) + goto attr_msg_full; + + nla_nest_end(msg->skb, attrs); + genlmsg_end(msg->skb, hdr); + + return 0; + +attr_msg_full: + nla_nest_cancel(msg->skb, attrs); +msg_full: + genlmsg_cancel(msg->skb, hdr); + + return -EMSGSIZE; +} + +int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb) +{ + int err; + int done = cb->args[0]; + int last_addr = cb->args[1]; + struct tipc_node *node; + struct tipc_nl_msg msg; + + if (done) + return 0; + + msg.skb = skb; + msg.portid = NETLINK_CB(cb->skb).portid; + msg.seq = cb->nlh->nlmsg_seq; + + rcu_read_lock(); + + if (last_addr && !tipc_node_find(last_addr)) { + rcu_read_unlock(); + /* We never set seq or call nl_dump_check_consistent() this + * means that setting prev_seq here will cause the consistence + * check to fail in the netlink callback handler. Resulting in + * the NLMSG_DONE message having the NLM_F_DUMP_INTR flag set if + * the node state changed while we released the lock. + */ + cb->prev_seq = 1; + return -EPIPE; + } + + list_for_each_entry_rcu(node, &tipc_node_list, list) { + if (last_addr) { + if (node->addr == last_addr) + last_addr = 0; + else + continue; + } + + tipc_node_lock(node); + err = __tipc_nl_add_node(&msg, node); + if (err) { + last_addr = node->addr; + tipc_node_unlock(node); + goto out; + } + + tipc_node_unlock(node); + } + done = 1; +out: + cb->args[0] = done; + cb->args[1] = last_addr; + rcu_read_unlock(); + + return skb->len; +} diff --git a/net/tipc/node.h b/net/tipc/node.h index 04e9145..cbe0e95 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -1,7 +1,7 @@ /* * net/tipc/node.h: Include file for TIPC node management routines * - * Copyright (c) 2000-2006, Ericsson AB + * Copyright (c) 2000-2006, 2014, Ericsson AB * Copyright (c) 2005, 2010-2014, Wind River Systems * All rights reserved. * @@ -37,7 +37,6 @@ #ifndef _TIPC_NODE_H #define _TIPC_NODE_H -#include "node_subscr.h" #include "addr.h" #include "net.h" #include "bearer.h" @@ -72,9 +71,7 @@ enum { * @last_in: sequence # of last in-sequence b'cast message received from node * @last_sent: sequence # of last b'cast message sent by node * @oos_state: state tracker for handling OOS b'cast messages - * @deferred_size: number of OOS b'cast messages in deferred queue - * @deferred_head: oldest OOS b'cast message received from node - * @deferred_tail: newest OOS b'cast message received from node + * @deferred_queue: deferred queue saved OOS b'cast message received from node * @reasm_buf: broadcast reassembly queue head from node * @recv_permitted: true if node is allowed to receive b'cast messages */ @@ -84,8 +81,7 @@ struct tipc_node_bclink { u32 last_sent; u32 oos_state; u32 deferred_size; - struct sk_buff *deferred_head; - struct sk_buff *deferred_tail; + struct sk_buff_head deferred_queue; struct sk_buff *reasm_buf; bool recv_permitted; }; @@ -104,7 +100,7 @@ struct tipc_node_bclink { * @link_cnt: number of links to node * @signature: node instance identifier * @link_id: local and remote bearer ids of changing link, if any - * @nsub: list of "node down" subscriptions monitoring node + * @publ_list: list of publications * @rcu: rcu struct for tipc_node */ struct tipc_node { @@ -121,7 +117,7 @@ struct tipc_node { int working_links; u32 signature; u32 link_id; - struct list_head nsub; + struct list_head publ_list; struct sk_buff_head waiting_sks; struct list_head conn_sks; struct rcu_head rcu; @@ -145,6 +141,8 @@ void tipc_node_unlock(struct tipc_node *node); int tipc_node_add_conn(u32 dnode, u32 port, u32 peer_port); void tipc_node_remove_conn(u32 dnode, u32 port); +int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb); + static inline void tipc_node_lock(struct tipc_node *node) { spin_lock_bh(&node->lock); diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c deleted file mode 100644 index 2d13eea..0000000 --- a/net/tipc/node_subscr.c +++ /dev/null @@ -1,96 +0,0 @@ -/* - * net/tipc/node_subscr.c: TIPC "node down" subscription handling - * - * Copyright (c) 1995-2006, Ericsson AB - * Copyright (c) 2005, 2010-2011, Wind River Systems - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "core.h" -#include "node_subscr.h" -#include "node.h" - -/** - * tipc_nodesub_subscribe - create "node down" subscription for specified node - */ -void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr, - void *usr_handle, net_ev_handler handle_down) -{ - if (in_own_node(addr)) { - node_sub->node = NULL; - return; - } - - node_sub->node = tipc_node_find(addr); - if (!node_sub->node) { - pr_warn("Node subscription rejected, unknown node 0x%x\n", - addr); - return; - } - node_sub->handle_node_down = handle_down; - node_sub->usr_handle = usr_handle; - - tipc_node_lock(node_sub->node); - list_add_tail(&node_sub->nodesub_list, &node_sub->node->nsub); - tipc_node_unlock(node_sub->node); -} - -/** - * tipc_nodesub_unsubscribe - cancel "node down" subscription (if any) - */ -void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub) -{ - if (!node_sub->node) - return; - - tipc_node_lock(node_sub->node); - list_del_init(&node_sub->nodesub_list); - tipc_node_unlock(node_sub->node); -} - -/** - * tipc_nodesub_notify - notify subscribers that a node is unreachable - * - * Note: node is locked by caller - */ -void tipc_nodesub_notify(struct list_head *nsub_list) -{ - struct tipc_node_subscr *ns, *safe; - net_ev_handler handle_node_down; - - list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) { - handle_node_down = ns->handle_node_down; - if (handle_node_down) { - ns->handle_node_down = NULL; - handle_node_down(ns->usr_handle); - } - } -} diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 51bddc2..4731cad 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -121,6 +121,14 @@ static const struct proto_ops msg_ops; static struct proto tipc_proto; static struct proto tipc_proto_kern; +static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = { + [TIPC_NLA_SOCK_UNSPEC] = { .type = NLA_UNSPEC }, + [TIPC_NLA_SOCK_ADDR] = { .type = NLA_U32 }, + [TIPC_NLA_SOCK_REF] = { .type = NLA_U32 }, + [TIPC_NLA_SOCK_CON] = { .type = NLA_NESTED }, + [TIPC_NLA_SOCK_HAS_PUBL] = { .type = NLA_FLAG } +}; + /* * Revised TIPC socket locking policy: * @@ -236,12 +244,12 @@ static void tsk_advance_rx_queue(struct sock *sk) */ static void tsk_rej_rx_queue(struct sock *sk) { - struct sk_buff *buf; + struct sk_buff *skb; u32 dnode; - while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { - if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) - tipc_link_xmit(buf, dnode, 0); + while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { + if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT)) + tipc_link_xmit_skb(skb, dnode, 0); } } @@ -454,7 +462,7 @@ static int tipc_release(struct socket *sock) { struct sock *sk = sock->sk; struct tipc_sock *tsk; - struct sk_buff *buf; + struct sk_buff *skb; u32 dnode; /* @@ -473,11 +481,11 @@ static int tipc_release(struct socket *sock) */ dnode = tsk_peer_node(tsk); while (sock->state != SS_DISCONNECTING) { - buf = __skb_dequeue(&sk->sk_receive_queue); - if (buf == NULL) + skb = __skb_dequeue(&sk->sk_receive_queue); + if (skb == NULL) break; - if (TIPC_SKB_CB(buf)->handle != NULL) - kfree_skb(buf); + if (TIPC_SKB_CB(skb)->handle != NULL) + kfree_skb(skb); else { if ((sock->state == SS_CONNECTING) || (sock->state == SS_CONNECTED)) { @@ -485,8 +493,8 @@ static int tipc_release(struct socket *sock) tsk->connected = 0; tipc_node_remove_conn(dnode, tsk->ref); } - if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) - tipc_link_xmit(buf, dnode, 0); + if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT)) + tipc_link_xmit_skb(skb, dnode, 0); } } @@ -494,12 +502,12 @@ static int tipc_release(struct socket *sock) tipc_sk_ref_discard(tsk->ref); k_cancel_timer(&tsk->timer); if (tsk->connected) { - buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, + skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode, tipc_own_addr, tsk_peer_port(tsk), tsk->ref, TIPC_ERR_NO_PORT); - if (buf) - tipc_link_xmit(buf, dnode, tsk->ref); + if (skb) + tipc_link_xmit_skb(skb, dnode, tsk->ref); tipc_node_remove_conn(dnode, tsk->ref); } k_term_timer(&tsk->timer); @@ -692,7 +700,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, * tipc_sendmcast - send multicast message * @sock: socket structure * @seq: destination address - * @iov: message data to send + * @msg: message to send * @dsz: total length of message data * @timeo: timeout to wait for wakeup * @@ -700,11 +708,11 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, * Returns the number of bytes sent on success, or errno */ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, - struct iovec *iov, size_t dsz, long timeo) + struct msghdr *msg, size_t dsz, long timeo) { struct sock *sk = sock->sk; struct tipc_msg *mhdr = &tipc_sk(sk)->phdr; - struct sk_buff *buf; + struct sk_buff_head head; uint mtu; int rc; @@ -719,12 +727,13 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, new_mtu: mtu = tipc_bclink_get_mtu(); - rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); + __skb_queue_head_init(&head); + rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head); if (unlikely(rc < 0)) return rc; do { - rc = tipc_bclink_xmit(buf); + rc = tipc_bclink_xmit(&head); if (likely(rc >= 0)) { rc = dsz; break; @@ -736,7 +745,7 @@ new_mtu: tipc_sk(sk)->link_cong = 1; rc = tipc_wait_for_sndmsg(sock, &timeo); if (rc) - kfree_skb_list(buf); + __skb_queue_purge(&head); } while (!rc); return rc; } @@ -818,39 +827,6 @@ exit: return TIPC_OK; } -/** - * dest_name_check - verify user is permitted to send to specified port name - * @dest: destination address - * @m: descriptor for message to be sent - * - * Prevents restricted configuration commands from being issued by - * unauthorized users. - * - * Returns 0 if permission is granted, otherwise errno - */ -static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m) -{ - struct tipc_cfg_msg_hdr hdr; - - if (unlikely(dest->addrtype == TIPC_ADDR_ID)) - return 0; - if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES)) - return 0; - if (likely(dest->addr.name.name.type == TIPC_TOP_SRV)) - return 0; - if (likely(dest->addr.name.name.type != TIPC_CFG_SRV)) - return -EACCES; - - if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr))) - return -EMSGSIZE; - if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr))) - return -EFAULT; - if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN))) - return -EACCES; - - return 0; -} - static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) { struct sock *sk = sock->sk; @@ -897,13 +873,13 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *mhdr = &tsk->phdr; - struct iovec *iov = m->msg_iov; u32 dnode, dport; - struct sk_buff *buf; + struct sk_buff_head head; + struct sk_buff *skb; struct tipc_name_seq *seq = &dest->addr.nameseq; u32 mtu; long timeo; - int rc = -EINVAL; + int rc; if (unlikely(!dest)) return -EDESTADDRREQ; @@ -936,14 +912,11 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, tsk->conn_instance = dest->addr.name.name.instance; } } - rc = dest_name_check(dest, m); - if (rc) - goto exit; timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); if (dest->addrtype == TIPC_ADDR_MCAST) { - rc = tipc_sendmcast(sock, seq, iov, dsz, timeo); + rc = tipc_sendmcast(sock, seq, m, dsz, timeo); goto exit; } else if (dest->addrtype == TIPC_ADDR_NAME) { u32 type = dest->addr.name.name.type; @@ -974,13 +947,15 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, new_mtu: mtu = tipc_node_get_mtu(dnode, tsk->ref); - rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); + __skb_queue_head_init(&head); + rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head); if (rc < 0) goto exit; do { - TIPC_SKB_CB(buf)->wakeup_pending = tsk->link_cong; - rc = tipc_link_xmit(buf, dnode, tsk->ref); + skb = skb_peek(&head); + TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; + rc = tipc_link_xmit(&head, dnode, tsk->ref); if (likely(rc >= 0)) { if (sock->state != SS_READY) sock->state = SS_CONNECTING; @@ -994,7 +969,7 @@ new_mtu: tsk->link_cong = 1; rc = tipc_wait_for_sndmsg(sock, &timeo); if (rc) - kfree_skb_list(buf); + __skb_queue_purge(&head); } while (!rc); exit: if (iocb) @@ -1051,7 +1026,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *mhdr = &tsk->phdr; - struct sk_buff *buf; + struct sk_buff_head head; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); u32 ref = tsk->ref; int rc = -EINVAL; @@ -1086,12 +1061,13 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, next: mtu = tsk->max_pkt; send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); - rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf); + __skb_queue_head_init(&head); + rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head); if (unlikely(rc < 0)) goto exit; do { if (likely(!tsk_conn_cong(tsk))) { - rc = tipc_link_xmit(buf, dnode, ref); + rc = tipc_link_xmit(&head, dnode, ref); if (likely(!rc)) { tsk->sent_unacked++; sent += send; @@ -1109,7 +1085,7 @@ next: } rc = tipc_wait_for_sndpkt(sock, &timeo); if (rc) - kfree_skb_list(buf); + __skb_queue_purge(&head); } while (!rc); exit: if (iocb) @@ -1254,20 +1230,20 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg, static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) { - struct sk_buff *buf = NULL; + struct sk_buff *skb = NULL; struct tipc_msg *msg; u32 peer_port = tsk_peer_port(tsk); u32 dnode = tsk_peer_node(tsk); if (!tsk->connected) return; - buf = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, + skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, tipc_own_addr, peer_port, tsk->ref, TIPC_OK); - if (!buf) + if (!skb) return; - msg = buf_msg(buf); + msg = buf_msg(skb); msg_set_msgcnt(msg, ack); - tipc_link_xmit(buf, dnode, msg_link_selector(msg)); + tipc_link_xmit_skb(skb, dnode, msg_link_selector(msg)); } static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) @@ -1372,8 +1348,7 @@ restart: sz = buf_len; m->msg_flags |= MSG_TRUNC; } - res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg), - m->msg_iov, sz); + res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz); if (res) goto exit; res = sz; @@ -1473,8 +1448,8 @@ restart: needed = (buf_len - sz_copied); sz_to_copy = (sz <= needed) ? sz : needed; - res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset, - m->msg_iov, sz_to_copy); + res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset, + m, sz_to_copy); if (res) goto exit; @@ -1556,7 +1531,7 @@ static void tipc_data_ready(struct sock *sk) * @tsk: TIPC socket * @msg: message * - * Returns 0 (TIPC_OK) if everyting ok, -TIPC_ERR_NO_PORT otherwise + * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise */ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) { @@ -1723,20 +1698,20 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf) /** * tipc_backlog_rcv - handle incoming message from backlog queue * @sk: socket - * @buf: message + * @skb: message * * Caller must hold socket lock, but not port lock. * * Returns 0 */ -static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) +static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) { int rc; u32 onode; struct tipc_sock *tsk = tipc_sk(sk); - uint truesize = buf->truesize; + uint truesize = skb->truesize; - rc = filter_rcv(sk, buf); + rc = filter_rcv(sk, skb); if (likely(!rc)) { if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) @@ -1744,25 +1719,25 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) return 0; } - if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc)) + if ((rc < 0) && !tipc_msg_reverse(skb, &onode, -rc)) return 0; - tipc_link_xmit(buf, onode, 0); + tipc_link_xmit_skb(skb, onode, 0); return 0; } /** * tipc_sk_rcv - handle incoming message - * @buf: buffer containing arriving message + * @skb: buffer containing arriving message * Consumes buffer * Returns 0 if success, or errno: -EHOSTUNREACH */ -int tipc_sk_rcv(struct sk_buff *buf) +int tipc_sk_rcv(struct sk_buff *skb) { struct tipc_sock *tsk; struct sock *sk; - u32 dport = msg_destport(buf_msg(buf)); + u32 dport = msg_destport(buf_msg(skb)); int rc = TIPC_OK; uint limit; u32 dnode; @@ -1770,7 +1745,7 @@ int tipc_sk_rcv(struct sk_buff *buf) /* Validate destination and message */ tsk = tipc_sk_get(dport); if (unlikely(!tsk)) { - rc = tipc_msg_eval(buf, &dnode); + rc = tipc_msg_eval(skb, &dnode); goto exit; } sk = &tsk->sk; @@ -1779,12 +1754,12 @@ int tipc_sk_rcv(struct sk_buff *buf) spin_lock_bh(&sk->sk_lock.slock); if (!sock_owned_by_user(sk)) { - rc = filter_rcv(sk, buf); + rc = filter_rcv(sk, skb); } else { if (sk->sk_backlog.len == 0) atomic_set(&tsk->dupl_rcvcnt, 0); - limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt); - if (sk_add_backlog(sk, buf, limit)) + limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt); + if (sk_add_backlog(sk, skb, limit)) rc = -TIPC_ERR_OVERLOAD; } spin_unlock_bh(&sk->sk_lock.slock); @@ -1792,10 +1767,10 @@ int tipc_sk_rcv(struct sk_buff *buf) if (likely(!rc)) return 0; exit: - if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc)) + if ((rc < 0) && !tipc_msg_reverse(skb, &dnode, -rc)) return -EHOSTUNREACH; - tipc_link_xmit(buf, dnode, 0); + tipc_link_xmit_skb(skb, dnode, 0); return (rc < 0) ? -EHOSTUNREACH : 0; } @@ -2053,7 +2028,7 @@ static int tipc_shutdown(struct socket *sock, int how) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct sk_buff *buf; + struct sk_buff *skb; u32 dnode; int res; @@ -2068,23 +2043,23 @@ static int tipc_shutdown(struct socket *sock, int how) restart: /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ - buf = __skb_dequeue(&sk->sk_receive_queue); - if (buf) { - if (TIPC_SKB_CB(buf)->handle != NULL) { - kfree_skb(buf); + skb = __skb_dequeue(&sk->sk_receive_queue); + if (skb) { + if (TIPC_SKB_CB(skb)->handle != NULL) { + kfree_skb(skb); goto restart; } - if (tipc_msg_reverse(buf, &dnode, TIPC_CONN_SHUTDOWN)) - tipc_link_xmit(buf, dnode, tsk->ref); + if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN)) + tipc_link_xmit_skb(skb, dnode, tsk->ref); tipc_node_remove_conn(dnode, tsk->ref); } else { dnode = tsk_peer_node(tsk); - buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, + skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode, tipc_own_addr, tsk_peer_port(tsk), tsk->ref, TIPC_CONN_SHUTDOWN); - tipc_link_xmit(buf, dnode, tsk->ref); + tipc_link_xmit_skb(skb, dnode, tsk->ref); } tsk->connected = 0; sock->state = SS_DISCONNECTING; @@ -2113,7 +2088,7 @@ static void tipc_sk_timeout(unsigned long ref) { struct tipc_sock *tsk; struct sock *sk; - struct sk_buff *buf = NULL; + struct sk_buff *skb = NULL; u32 peer_port, peer_node; tsk = tipc_sk_get(ref); @@ -2131,20 +2106,20 @@ static void tipc_sk_timeout(unsigned long ref) if (tsk->probing_state == TIPC_CONN_PROBING) { /* Previous probe not answered -> self abort */ - buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, + skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, 0, tipc_own_addr, peer_node, ref, peer_port, TIPC_ERR_NO_PORT); } else { - buf = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, + skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0, peer_node, tipc_own_addr, peer_port, ref, TIPC_OK); tsk->probing_state = TIPC_CONN_PROBING; k_start_timer(&tsk->timer, tsk->probing_interval); } bh_unlock_sock(sk); - if (buf) - tipc_link_xmit(buf, peer_node, ref); + if (skb) + tipc_link_xmit_skb(skb, peer_node, ref); exit: tipc_sk_put(tsk); } @@ -2802,3 +2777,233 @@ void tipc_socket_stop(void) sock_unregister(tipc_family_ops.family); proto_unregister(&tipc_proto); } + +/* Caller should hold socket lock for the passed tipc socket. */ +static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk) +{ + u32 peer_node; + u32 peer_port; + struct nlattr *nest; + + peer_node = tsk_peer_node(tsk); + peer_port = tsk_peer_port(tsk); + + nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON); + + if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node)) + goto msg_full; + if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port)) + goto msg_full; + + if (tsk->conn_type != 0) { + if (nla_put_flag(skb, TIPC_NLA_CON_FLAG)) + goto msg_full; + if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type)) + goto msg_full; + if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance)) + goto msg_full; + } + nla_nest_end(skb, nest); + + return 0; + +msg_full: + nla_nest_cancel(skb, nest); + + return -EMSGSIZE; +} + +/* Caller should hold socket lock for the passed tipc socket. */ +static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb, + struct tipc_sock *tsk) +{ + int err; + void *hdr; + struct nlattr *attrs; + + hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq, + &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET); + if (!hdr) + goto msg_cancel; + + attrs = nla_nest_start(skb, TIPC_NLA_SOCK); + if (!attrs) + goto genlmsg_cancel; + if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->ref)) + goto attr_msg_cancel; + if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr)) + goto attr_msg_cancel; + + if (tsk->connected) { + err = __tipc_nl_add_sk_con(skb, tsk); + if (err) + goto attr_msg_cancel; + } else if (!list_empty(&tsk->publications)) { + if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL)) + goto attr_msg_cancel; + } + nla_nest_end(skb, attrs); + genlmsg_end(skb, hdr); + + return 0; + +attr_msg_cancel: + nla_nest_cancel(skb, attrs); +genlmsg_cancel: + genlmsg_cancel(skb, hdr); +msg_cancel: + return -EMSGSIZE; +} + +int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb) +{ + int err; + struct tipc_sock *tsk; + u32 prev_ref = cb->args[0]; + u32 ref = prev_ref; + + tsk = tipc_sk_get_next(&ref); + for (; tsk; tsk = tipc_sk_get_next(&ref)) { + lock_sock(&tsk->sk); + err = __tipc_nl_add_sk(skb, cb, tsk); + release_sock(&tsk->sk); + tipc_sk_put(tsk); + if (err) + break; + + prev_ref = ref; + } + + cb->args[0] = prev_ref; + + return skb->len; +} + +/* Caller should hold socket lock for the passed tipc socket. */ +static int __tipc_nl_add_sk_publ(struct sk_buff *skb, + struct netlink_callback *cb, + struct publication *publ) +{ + void *hdr; + struct nlattr *attrs; + + hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq, + &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET); + if (!hdr) + goto msg_cancel; + + attrs = nla_nest_start(skb, TIPC_NLA_PUBL); + if (!attrs) + goto genlmsg_cancel; + + if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key)) + goto attr_msg_cancel; + if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type)) + goto attr_msg_cancel; + if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower)) + goto attr_msg_cancel; + if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper)) + goto attr_msg_cancel; + + nla_nest_end(skb, attrs); + genlmsg_end(skb, hdr); + + return 0; + +attr_msg_cancel: + nla_nest_cancel(skb, attrs); +genlmsg_cancel: + genlmsg_cancel(skb, hdr); +msg_cancel: + return -EMSGSIZE; +} + +/* Caller should hold socket lock for the passed tipc socket. */ +static int __tipc_nl_list_sk_publ(struct sk_buff *skb, + struct netlink_callback *cb, + struct tipc_sock *tsk, u32 *last_publ) +{ + int err; + struct publication *p; + + if (*last_publ) { + list_for_each_entry(p, &tsk->publications, pport_list) { + if (p->key == *last_publ) + break; + } + if (p->key != *last_publ) { + /* We never set seq or call nl_dump_check_consistent() + * this means that setting prev_seq here will cause the + * consistence check to fail in the netlink callback + * handler. Resulting in the last NLMSG_DONE message + * having the NLM_F_DUMP_INTR flag set. + */ + cb->prev_seq = 1; + *last_publ = 0; + return -EPIPE; + } + } else { + p = list_first_entry(&tsk->publications, struct publication, + pport_list); + } + + list_for_each_entry_from(p, &tsk->publications, pport_list) { + err = __tipc_nl_add_sk_publ(skb, cb, p); + if (err) { + *last_publ = p->key; + return err; + } + } + *last_publ = 0; + + return 0; +} + +int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb) +{ + int err; + u32 tsk_ref = cb->args[0]; + u32 last_publ = cb->args[1]; + u32 done = cb->args[2]; + struct tipc_sock *tsk; + + if (!tsk_ref) { + struct nlattr **attrs; + struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1]; + + err = tipc_nlmsg_parse(cb->nlh, &attrs); + if (err) + return err; + + err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX, + attrs[TIPC_NLA_SOCK], + tipc_nl_sock_policy); + if (err) + return err; + + if (!sock[TIPC_NLA_SOCK_REF]) + return -EINVAL; + + tsk_ref = nla_get_u32(sock[TIPC_NLA_SOCK_REF]); + } + + if (done) + return 0; + + tsk = tipc_sk_get(tsk_ref); + if (!tsk) + return -EINVAL; + + lock_sock(&tsk->sk); + err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ); + if (!err) + done = 1; + release_sock(&tsk->sk); + tipc_sk_put(tsk); + + cb->args[0] = tsk_ref; + cb->args[1] = last_publ; + cb->args[2] = done; + + return skb->len; +} diff --git a/net/tipc/socket.h b/net/tipc/socket.h index baa43d0..d340893 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -36,6 +36,7 @@ #define _TIPC_SOCK_H #include <net/sock.h> +#include <net/genetlink.h> #define TIPC_CONNACK_INTV 256 #define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) @@ -47,5 +48,7 @@ void tipc_sk_mcast_rcv(struct sk_buff *buf); void tipc_sk_reinit(void); int tipc_sk_ref_table_init(u32 requested_size, u32 start); void tipc_sk_ref_table_stop(void); +int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb); +int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb); #endif diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 31b5cb2..0344206 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -305,7 +305,6 @@ static int subscr_subscribe(struct tipc_subscr *s, kfree(sub); return -EINVAL; } - INIT_LIST_HEAD(&sub->nameseq_list); list_add(&sub->subscription_list, &subscriber->subscription_list); sub->subscriber = subscriber; sub->swap = swap; |