diff options
-rw-r--r-- | include/uapi/linux/tipc.h | 14 | ||||
-rw-r--r-- | net/tipc/Makefile | 2 | ||||
-rw-r--r-- | net/tipc/group.c | 404 | ||||
-rw-r--r-- | net/tipc/group.h | 64 | ||||
-rw-r--r-- | net/tipc/link.c | 3 | ||||
-rw-r--r-- | net/tipc/msg.h | 50 | ||||
-rw-r--r-- | net/tipc/name_table.c | 44 | ||||
-rw-r--r-- | net/tipc/name_table.h | 3 | ||||
-rw-r--r-- | net/tipc/node.h | 3 | ||||
-rw-r--r-- | net/tipc/socket.c | 209 |
10 files changed, 748 insertions, 48 deletions
diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h index 5351b08..5f7b2c4 100644 --- a/include/uapi/linux/tipc.h +++ b/include/uapi/linux/tipc.h @@ -231,6 +231,20 @@ struct sockaddr_tipc { #define TIPC_SOCK_RECVQ_DEPTH 132 /* Default: none (read only) */ #define TIPC_MCAST_BROADCAST 133 /* Default: TIPC selects. No arg */ #define TIPC_MCAST_REPLICAST 134 /* Default: TIPC selects. No arg */ +#define TIPC_GROUP_JOIN 135 /* Takes struct tipc_group_req* */ +#define TIPC_GROUP_LEAVE 136 /* No argument */ + +/* + * Flag values + */ +#define TIPC_GROUP_LOOPBACK 0x1 /* Receive copy of sent msg when match */ + +struct tipc_group_req { + __u32 type; /* group id */ + __u32 instance; /* member id */ + __u32 scope; /* zone/cluster/node */ + __u32 flags; +}; /* * Maximum sizes of TIPC bearer-related names (including terminating NULL) diff --git a/net/tipc/Makefile b/net/tipc/Makefile index 31b9f9c..a3af73e 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -8,7 +8,7 @@ tipc-y += addr.o bcast.o bearer.o \ core.o link.o discover.o msg.o \ name_distr.o subscr.o monitor.o name_table.o net.o \ netlink.o netlink_compat.o node.o socket.o eth_media.o \ - server.o socket.o + server.o socket.o group.o tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o diff --git a/net/tipc/group.c b/net/tipc/group.c new file mode 100644 index 0000000..3f0e1ce --- /dev/null +++ b/net/tipc/group.c @@ -0,0 +1,404 @@ +/* + * net/tipc/group.c: TIPC group messaging code + * + * Copyright (c) 2017, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "core.h" +#include "addr.h" +#include "group.h" +#include "bcast.h" +#include "server.h" +#include "msg.h" +#include "socket.h" +#include "node.h" +#include "name_table.h" +#include "subscr.h" + +#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) +#define ADV_IDLE ADV_UNIT + +enum mbr_state { + MBR_QUARANTINED, + MBR_DISCOVERED, + MBR_JOINING, + MBR_PUBLISHED, + MBR_JOINED, + MBR_LEAVING +}; + +struct tipc_member { + struct rb_node tree_node; + struct list_head list; + u32 node; + u32 port; + enum mbr_state state; + u16 bc_rcv_nxt; +}; + +struct tipc_group { + struct rb_root members; + struct tipc_nlist dests; + struct net *net; + int subid; + u32 type; + u32 instance; + u32 domain; + u32 scope; + u32 portid; + u16 member_cnt; + u16 bc_snd_nxt; + bool loopback; +}; + +static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, + int mtyp, struct sk_buff_head *xmitq); + +u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) +{ + return grp->bc_snd_nxt; +} + +static bool tipc_group_is_receiver(struct tipc_member *m) +{ + return m && m->state >= MBR_JOINED; +} + +int tipc_group_size(struct tipc_group *grp) +{ + return grp->member_cnt; +} + +struct tipc_group *tipc_group_create(struct net *net, u32 portid, + struct tipc_group_req *mreq) +{ + struct tipc_group *grp; + u32 type = mreq->type; + + grp = kzalloc(sizeof(*grp), GFP_ATOMIC); + if (!grp) + return NULL; + tipc_nlist_init(&grp->dests, tipc_own_addr(net)); + grp->members = RB_ROOT; + grp->net = net; + grp->portid = portid; + grp->domain = addr_domain(net, mreq->scope); + grp->type = type; + grp->instance = mreq->instance; + grp->scope = mreq->scope; + grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; + if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) + return grp; + kfree(grp); + return NULL; +} + +void tipc_group_delete(struct net *net, struct tipc_group *grp) +{ + struct rb_root *tree = &grp->members; + struct tipc_member *m, *tmp; + struct sk_buff_head xmitq; + + __skb_queue_head_init(&xmitq); + + rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { + tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); + list_del(&m->list); + kfree(m); + } + tipc_node_distr_xmit(net, &xmitq); + tipc_nlist_purge(&grp->dests); + tipc_topsrv_kern_unsubscr(net, grp->subid); + kfree(grp); +} + +struct tipc_member *tipc_group_find_member(struct tipc_group *grp, + u32 node, u32 port) +{ + struct rb_node *n = grp->members.rb_node; + u64 nkey, key = (u64)node << 32 | port; + struct tipc_member *m; + + while (n) { + m = container_of(n, struct tipc_member, tree_node); + nkey = (u64)m->node << 32 | m->port; + if (key < nkey) + n = n->rb_left; + else if (key > nkey) + n = n->rb_right; + else + return m; + } + return NULL; +} + +static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, + u32 node) +{ + struct tipc_member *m; + struct rb_node *n; + + for (n = rb_first(&grp->members); n; n = rb_next(n)) { + m = container_of(n, struct tipc_member, tree_node); + if (m->node == node) + return m; + } + return NULL; +} + +static void tipc_group_add_to_tree(struct tipc_group *grp, + struct tipc_member *m) +{ + u64 nkey, key = (u64)m->node << 32 | m->port; + struct rb_node **n, *parent = NULL; + struct tipc_member *tmp; + + n = &grp->members.rb_node; + while (*n) { + tmp = container_of(*n, struct tipc_member, tree_node); + parent = *n; + tmp = container_of(parent, struct tipc_member, tree_node); + nkey = (u64)tmp->node << 32 | tmp->port; + if (key < nkey) + n = &(*n)->rb_left; + else if (key > nkey) + n = &(*n)->rb_right; + else + return; + } + rb_link_node(&m->tree_node, parent, n); + rb_insert_color(&m->tree_node, &grp->members); +} + +static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, + u32 node, u32 port, + int state) +{ + struct tipc_member *m; + + m = kzalloc(sizeof(*m), GFP_ATOMIC); + if (!m) + return NULL; + INIT_LIST_HEAD(&m->list); + m->node = node; + m->port = port; + grp->member_cnt++; + tipc_group_add_to_tree(grp, m); + tipc_nlist_add(&grp->dests, m->node); + m->state = state; + return m; +} + +void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) +{ + tipc_group_create_member(grp, node, port, MBR_DISCOVERED); +} + +static void tipc_group_delete_member(struct tipc_group *grp, + struct tipc_member *m) +{ + rb_erase(&m->tree_node, &grp->members); + grp->member_cnt--; + list_del_init(&m->list); + + /* If last member on a node, remove node from dest list */ + if (!tipc_group_find_node(grp, m->node)) + tipc_nlist_del(&grp->dests, m->node); + + kfree(m); +} + +struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) +{ + return &grp->dests; +} + +void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, + int *scope) +{ + seq->type = grp->type; + seq->lower = grp->instance; + seq->upper = grp->instance; + *scope = grp->scope; +} + +void tipc_group_update_bc_members(struct tipc_group *grp) +{ + grp->bc_snd_nxt++; +} + +/* tipc_group_filter_msg() - determine if we should accept arriving message + */ +void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb = __skb_dequeue(inputq); + struct tipc_member *m; + struct tipc_msg *hdr; + u32 node, port; + int mtyp; + + if (!skb) + return; + + hdr = buf_msg(skb); + mtyp = msg_type(hdr); + node = msg_orignode(hdr); + port = msg_origport(hdr); + + if (!msg_in_group(hdr)) + goto drop; + + m = tipc_group_find_member(grp, node, port); + if (!tipc_group_is_receiver(m)) + goto drop; + + __skb_queue_tail(inputq, skb); + + m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; + return; +drop: + kfree_skb(skb); +} + +static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, + int mtyp, struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr; + struct sk_buff *skb; + + skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, + m->node, tipc_own_addr(grp->net), + m->port, grp->portid, 0); + if (!skb) + return; + + hdr = buf_msg(skb); + if (mtyp == GRP_JOIN_MSG) + msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); + __skb_queue_tail(xmitq, skb); +} + +void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, + struct sk_buff_head *xmitq) +{ + u32 node = msg_orignode(hdr); + u32 port = msg_origport(hdr); + struct tipc_member *m; + + if (!grp) + return; + + m = tipc_group_find_member(grp, node, port); + + switch (msg_type(hdr)) { + case GRP_JOIN_MSG: + if (!m) + m = tipc_group_create_member(grp, node, port, + MBR_QUARANTINED); + if (!m) + return; + m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); + + /* Wait until PUBLISH event is received */ + if (m->state == MBR_DISCOVERED) + m->state = MBR_JOINING; + else if (m->state == MBR_PUBLISHED) + m->state = MBR_JOINED; + return; + case GRP_LEAVE_MSG: + if (!m) + return; + + /* Wait until WITHDRAW event is received */ + if (m->state != MBR_LEAVING) { + m->state = MBR_LEAVING; + return; + } + /* Otherwise deliver already received WITHDRAW event */ + tipc_group_delete_member(grp, m); + return; + default: + pr_warn("Received unknown GROUP_PROTO message\n"); + } +} + +/* tipc_group_member_evt() - receive and handle a member up/down event + */ +void tipc_group_member_evt(struct tipc_group *grp, + struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb); + struct tipc_event *evt = (void *)msg_data(hdr); + u32 node = evt->port.node; + u32 port = evt->port.ref; + struct tipc_member *m; + struct net *net; + u32 self; + + if (!grp) + goto drop; + + net = grp->net; + self = tipc_own_addr(net); + if (!grp->loopback && node == self && port == grp->portid) + goto drop; + + m = tipc_group_find_member(grp, node, port); + + if (evt->event == TIPC_PUBLISHED) { + if (!m) + m = tipc_group_create_member(grp, node, port, + MBR_DISCOVERED); + if (!m) + goto drop; + + /* Wait if JOIN message not yet received */ + if (m->state == MBR_DISCOVERED) + m->state = MBR_PUBLISHED; + else + m->state = MBR_JOINED; + tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); + } else if (evt->event == TIPC_WITHDRAWN) { + if (!m) + goto drop; + + /* Keep back event if more messages might be expected */ + if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) + m->state = MBR_LEAVING; + else + tipc_group_delete_member(grp, m); + } +drop: + kfree_skb(skb); +} diff --git a/net/tipc/group.h b/net/tipc/group.h new file mode 100644 index 0000000..9bdf447 --- /dev/null +++ b/net/tipc/group.h @@ -0,0 +1,64 @@ +/* + * net/tipc/group.h: Include file for TIPC group unicast/multicast functions + * + * Copyright (c) 2017, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_GROUP_H +#define _TIPC_GROUP_H + +#include "core.h" + +struct tipc_group; +struct tipc_member; +struct tipc_msg; + +struct tipc_group *tipc_group_create(struct net *net, u32 portid, + struct tipc_group_req *mreq); +void tipc_group_delete(struct net *net, struct tipc_group *grp); +void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port); +struct tipc_nlist *tipc_group_dests(struct tipc_group *grp); +void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, + int *scope); +void tipc_group_filter_msg(struct tipc_group *grp, + struct sk_buff_head *inputq, + struct sk_buff_head *xmitq); +void tipc_group_member_evt(struct tipc_group *grp, + struct sk_buff *skb, + struct sk_buff_head *xmitq); +void tipc_group_proto_rcv(struct tipc_group *grp, + struct tipc_msg *hdr, + struct sk_buff_head *xmitq); +void tipc_group_update_bc_members(struct tipc_group *grp); +u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); +int tipc_group_size(struct tipc_group *grp); +#endif diff --git a/net/tipc/link.c b/net/tipc/link.c index ac0144f..bd25bff 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1046,11 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, case TIPC_MEDIUM_IMPORTANCE: case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: - if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) { + if (unlikely(msg_mcast(hdr))) { skb_queue_tail(l->bc_rcvlink->inputq, skb); return true; } case CONN_MANAGER: + case GROUP_PROTOCOL: skb_queue_tail(inputq, skb); return true; case NAME_DISTRIBUTOR: diff --git a/net/tipc/msg.h b/net/tipc/msg.h index be3e38a..dad4009 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1,7 +1,7 @@ /* * net/tipc/msg.h: Include file for TIPC message header routines * - * Copyright (c) 2000-2007, 2014-2015 Ericsson AB + * Copyright (c) 2000-2007, 2014-2017 Ericsson AB * Copyright (c) 2005-2008, 2010-2011, Wind River Systems * All rights reserved. * @@ -61,10 +61,11 @@ struct plist; /* * Payload message types */ -#define TIPC_CONN_MSG 0 -#define TIPC_MCAST_MSG 1 -#define TIPC_NAMED_MSG 2 -#define TIPC_DIRECT_MSG 3 +#define TIPC_CONN_MSG 0 +#define TIPC_MCAST_MSG 1 +#define TIPC_NAMED_MSG 2 +#define TIPC_DIRECT_MSG 3 +#define TIPC_GRP_BCAST_MSG 4 /* * Internal message users @@ -73,6 +74,7 @@ struct plist; #define MSG_BUNDLER 6 #define LINK_PROTOCOL 7 #define CONN_MANAGER 8 +#define GROUP_PROTOCOL 9 #define TUNNEL_PROTOCOL 10 #define NAME_DISTRIBUTOR 11 #define MSG_FRAGMENTER 12 @@ -87,6 +89,7 @@ struct plist; #define BASIC_H_SIZE 32 /* Basic payload message */ #define NAMED_H_SIZE 40 /* Named payload message */ #define MCAST_H_SIZE 44 /* Multicast payload message */ +#define GROUP_H_SIZE 44 /* Group payload message */ #define INT_H_SIZE 40 /* Internal messages */ #define MIN_H_SIZE 24 /* Smallest legal TIPC header size */ #define MAX_H_SIZE 60 /* Largest possible TIPC header size */ @@ -252,6 +255,11 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n) msg_set_bits(m, 1, 29, 0x7, n); } +static inline int msg_in_group(struct tipc_msg *m) +{ + return (msg_type(m) == TIPC_GRP_BCAST_MSG); +} + static inline u32 msg_named(struct tipc_msg *m) { return msg_type(m) == TIPC_NAMED_MSG; @@ -259,7 +267,9 @@ static inline u32 msg_named(struct tipc_msg *m) static inline u32 msg_mcast(struct tipc_msg *m) { - return msg_type(m) == TIPC_MCAST_MSG; + int mtyp = msg_type(m); + + return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG)); } static inline u32 msg_connected(struct tipc_msg *m) @@ -515,6 +525,12 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) #define DSC_RESP_MSG 1 /* + * Group protocol message types + */ +#define GRP_JOIN_MSG 0 +#define GRP_LEAVE_MSG 1 + +/* * Word 1 */ static inline u32 msg_seq_gap(struct tipc_msg *m) @@ -795,6 +811,28 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 0, 0xffff, n); } +static inline u16 msg_grp_bc_syncpt(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + +/* Word 10 + */ +static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) +{ + return msg_bits(m, 10, 16, 0xffff); +} + +static inline void msg_set_grp_bc_seqno(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 10, 16, 0xffff, n); +} + static inline bool msg_peer_link_is_up(struct tipc_msg *m) { if (likely(msg_user(m) != LINK_PROTOCOL)) diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 76bd277..114d72b 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -43,6 +43,7 @@ #include "bcast.h" #include "addr.h" #include "node.h" +#include "group.h" #include <net/genetlink.h> #define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ @@ -596,18 +597,6 @@ not_found: return ref; } -/** - * tipc_nametbl_mc_translate - find multicast destinations - * - * Creates list of all local ports that overlap the given multicast address; - * also determines if any off-node ports overlap. - * - * Note: Publications with a scope narrower than 'limit' are ignored. - * (i.e. local node-scope publications mustn't receive messages arriving - * from another node, even if the multcast link brought it here) - * - * Returns non-zero if any off-node ports overlap - */ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, u32 limit, struct list_head *dports) { @@ -679,6 +668,37 @@ exit: rcu_read_unlock(); } +/* tipc_nametbl_build_group - build list of communication group members + */ +void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, + u32 type, u32 domain) +{ + struct sub_seq *sseq, *stop; + struct name_info *info; + struct publication *p; + struct name_seq *seq; + + rcu_read_lock(); + seq = nametbl_find_seq(net, type); + if (!seq) + goto exit; + + spin_lock_bh(&seq->lock); + sseq = seq->sseqs; + stop = seq->sseqs + seq->first_free; + for (; sseq != stop; sseq++) { + info = sseq->info; + list_for_each_entry(p, &info->zone_list, zone_list) { + if (!tipc_in_scope(domain, p->node)) + continue; + tipc_group_add_member(grp, p->node, p->ref); + } + } + spin_unlock_bh(&seq->lock); +exit: + rcu_read_unlock(); +} + /* * tipc_nametbl_publish - add name publication to network name tables */ diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index d121175..97646b1 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -40,6 +40,7 @@ struct tipc_subscription; struct tipc_plist; struct tipc_nlist; +struct tipc_group; /* * TIPC name types reserved for internal TIPC use (both current and planned) @@ -101,6 +102,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, u32 limit, struct list_head *dports); +void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, + u32 type, u32 domain); void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, u32 upper, u32 domain, struct tipc_nlist *nodes); diff --git a/net/tipc/node.h b/net/tipc/node.h index df2f219..acd58d2 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -48,7 +48,8 @@ enum { TIPC_BCAST_SYNCH = (1 << 1), TIPC_BCAST_STATE_NACK = (1 << 2), TIPC_BLOCK_FLOWCTL = (1 << 3), - TIPC_BCAST_RCAST = (1 << 4) + TIPC_BCAST_RCAST = (1 << 4), + TIPC_MCAST_GROUPS = (1 << 5) }; #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ diff --git a/net/tipc/socket.c b/net/tipc/socket.c index daf7c4d..64bbf9d 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,7 +1,7 @@ /* * net/tipc/socket.c: TIPC socket API * - * Copyright (c) 2001-2007, 2012-2016, Ericsson AB + * Copyright (c) 2001-2007, 2012-2017, Ericsson AB * Copyright (c) 2004-2008, 2010-2013, Wind River Systems * All rights reserved. * @@ -45,6 +45,7 @@ #include "socket.h" #include "bcast.h" #include "netlink.h" +#include "group.h" #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ #define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */ @@ -78,7 +79,7 @@ enum { * @conn_timeout: the time we can wait for an unresponded setup request * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue * @cong_link_cnt: number of congested links - * @sent_unacked: # messages sent by socket, and not yet acked by peer + * @snt_unacked: # messages sent by socket, and not yet acked by peer * @rcv_unacked: # messages read by user, but not yet acked back to peer * @peer: 'connected' peer for dgram/rdm * @node: hash table node @@ -109,6 +110,7 @@ struct tipc_sock { struct rhash_head node; struct tipc_mc_method mc_method; struct rcu_head rcu; + struct tipc_group *group; }; static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb); @@ -123,6 +125,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); +static int tipc_sk_leave(struct tipc_sock *tsk); static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); static int tipc_sk_insert(struct tipc_sock *tsk); static void tipc_sk_remove(struct tipc_sock *tsk); @@ -559,6 +562,7 @@ static int tipc_release(struct socket *sock) __tipc_shutdown(sock, TIPC_ERR_NO_PORT); sk->sk_shutdown = SHUTDOWN_MASK; + tipc_sk_leave(tsk); tipc_sk_withdraw(tsk, 0, NULL); sk_stop_timer(sk, &sk->sk_timer); tipc_sk_remove(tsk); @@ -601,7 +605,10 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr, res = tipc_sk_withdraw(tsk, 0, NULL); goto exit; } - + if (tsk->group) { + res = -EACCES; + goto exit; + } if (uaddr_len < sizeof(struct sockaddr_tipc)) { res = -EINVAL; goto exit; @@ -698,6 +705,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; u32 mask = 0; sock_poll_wait(file, sk_sleep(sk), wait); @@ -718,8 +726,9 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, mask |= (POLLIN | POLLRDNORM); break; case TIPC_OPEN: - if (!tsk->cong_link_cnt) - mask |= POLLOUT; + if (!grp || tipc_group_size(grp)) + if (!tsk->cong_link_cnt) + mask |= POLLOUT; if (tipc_sk_type_connectionless(sk) && (!skb_queue_empty(&sk->sk_receive_queue))) mask |= (POLLIN | POLLRDNORM); @@ -757,6 +766,9 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, struct tipc_nlist dsts; int rc; + if (tsk->group) + return -EACCES; + /* Block or return if any destination link is congested */ rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); if (unlikely(rc)) @@ -794,6 +806,64 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, } /** + * tipc_send_group_bcast - send message to all members in communication group + * @sk: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + struct sock *sk = sock->sk; + struct net *net = sock_net(sk); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; + struct tipc_nlist *dsts = tipc_group_dests(grp); + struct tipc_mc_method *method = &tsk->mc_method; + struct tipc_msg *hdr = &tsk->phdr; + int mtu = tipc_bcast_get_mtu(net); + struct sk_buff_head pkts; + int rc = -EHOSTUNREACH; + + if (!dsts->local && !dsts->remote) + return -EHOSTUNREACH; + + /* Block or return if any destination link is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); + if (unlikely(rc)) + return rc; + + /* Complete message header */ + msg_set_type(hdr, TIPC_GRP_BCAST_MSG); + msg_set_hdr_sz(hdr, MCAST_H_SIZE); + msg_set_destport(hdr, 0); + msg_set_destnode(hdr, 0); + msg_set_nameinst(hdr, 0); + msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); + + /* Build message as chain of buffers */ + skb_queue_head_init(&pkts); + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) + return rc; + + /* Send message */ + rc = tipc_mcast_xmit(net, &pkts, method, dsts, + &tsk->cong_link_cnt); + if (unlikely(rc)) + return rc; + + /* Update broadcast sequence number */ + tipc_group_update_bc_members(tsk->group); + + return dlen; +} + +/** * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets * @arrvq: queue with arriving messages, to be cloned after destination lookup * @inputq: queue with cloned messages, delivered to socket after dest lookup @@ -803,13 +873,15 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, struct sk_buff_head *inputq) { - struct tipc_msg *msg; - struct list_head dports; - u32 portid; u32 scope = TIPC_CLUSTER_SCOPE; - struct sk_buff_head tmpq; - uint hsz; + u32 self = tipc_own_addr(net); struct sk_buff *skb, *_skb; + u32 lower = 0, upper = ~0; + struct sk_buff_head tmpq; + u32 portid, oport, onode; + struct list_head dports; + struct tipc_msg *msg; + int hsz; __skb_queue_head_init(&tmpq); INIT_LIST_HEAD(&dports); @@ -818,14 +890,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { msg = buf_msg(skb); hsz = skb_headroom(skb) + msg_hdr_sz(msg); - - if (in_own_node(net, msg_orignode(msg))) + oport = msg_origport(msg); + onode = msg_orignode(msg); + if (onode == self) scope = TIPC_NODE_SCOPE; /* Create destination port list and message clones: */ - tipc_nametbl_mc_translate(net, - msg_nametype(msg), msg_namelower(msg), - msg_nameupper(msg), scope, &dports); + if (!msg_in_group(msg)) { + lower = msg_namelower(msg); + upper = msg_nameupper(msg); + } + tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper, + scope, &dports); while (tipc_dest_pop(&dports, NULL, &portid)) { _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); if (_skb) { @@ -895,10 +971,6 @@ exit: kfree_skb(skb); } -static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt) -{ -} - /** * tipc_sendmsg - send message in connectionless manner * @sock: socket structure @@ -934,6 +1006,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); struct list_head *clinks = &tsk->cong_links; bool syn = !tipc_sk_type_connectionless(sk); + struct tipc_group *grp = tsk->group; struct tipc_msg *hdr = &tsk->phdr; struct tipc_name_seq *seq; struct sk_buff_head pkts; @@ -944,6 +1017,9 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; + if (unlikely(grp)) + return tipc_send_group_bcast(sock, m, dlen, timeout); + if (unlikely(!dest)) { dest = &tsk->peer; if (!syn || dest->family != AF_TIPC) @@ -1543,6 +1619,7 @@ static void tipc_sk_proto_rcv(struct sock *sk, struct sk_buff *skb = __skb_dequeue(inputq); struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = buf_msg(skb); + struct tipc_group *grp = tsk->group; switch (msg_user(hdr)) { case CONN_MANAGER: @@ -1553,8 +1630,12 @@ static void tipc_sk_proto_rcv(struct sock *sk, tsk->cong_link_cnt--; sk->sk_write_space(sk); break; + case GROUP_PROTOCOL: + tipc_group_proto_rcv(grp, hdr, xmitq); + break; case TOP_SRV: - tipc_sk_top_evt(tsk, (void *)msg_data(hdr)); + tipc_group_member_evt(tsk->group, skb, xmitq); + skb = NULL; break; default: break; @@ -1699,6 +1780,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, { bool sk_conn = !tipc_sk_type_connectionless(sk); struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; struct tipc_msg *hdr = buf_msg(skb); struct net *net = sock_net(sk); struct sk_buff_head inputq; @@ -1710,15 +1792,19 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, if (unlikely(!msg_isdata(hdr))) tipc_sk_proto_rcv(sk, &inputq, xmitq); - else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) + else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG)) return kfree_skb(skb); + if (unlikely(grp)) + tipc_group_filter_msg(grp, &inputq, xmitq); + /* Validate and add to receive buffer if there is space */ while ((skb = __skb_dequeue(&inputq))) { hdr = buf_msg(skb); limit = rcvbuf_limit(sk, skb); if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || - (!sk_conn && msg_connected(hdr))) + (!sk_conn && msg_connected(hdr)) || + (!grp && msg_in_group(hdr))) err = TIPC_ERR_NO_PORT; else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) err = TIPC_ERR_OVERLOAD; @@ -1837,7 +1923,6 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) sock_put(sk); continue; } - /* No destination socket => dequeue skb if still there */ skb = tipc_skb_dequeue(inputq, dport); if (!skb) @@ -1905,6 +1990,11 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest, lock_sock(sk); + if (tsk->group) { + res = -EINVAL; + goto exit; + } + if (dst->family == AF_UNSPEC) { memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc)); if (!tipc_sk_type_connectionless(sk)) @@ -2341,6 +2431,52 @@ void tipc_sk_rht_destroy(struct net *net) rhashtable_destroy(&tn->sk_rht); } +static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq) +{ + struct net *net = sock_net(&tsk->sk); + u32 domain = addr_domain(net, mreq->scope); + struct tipc_group *grp = tsk->group; + struct tipc_msg *hdr = &tsk->phdr; + struct tipc_name_seq seq; + int rc; + + if (mreq->type < TIPC_RESERVED_TYPES) + return -EACCES; + if (grp) + return -EACCES; + grp = tipc_group_create(net, tsk->portid, mreq); + if (!grp) + return -ENOMEM; + tsk->group = grp; + msg_set_lookup_scope(hdr, mreq->scope); + msg_set_nametype(hdr, mreq->type); + msg_set_dest_droppable(hdr, true); + seq.type = mreq->type; + seq.lower = mreq->instance; + seq.upper = seq.lower; + tipc_nametbl_build_group(net, grp, mreq->type, domain); + rc = tipc_sk_publish(tsk, mreq->scope, &seq); + if (rc) + tipc_group_delete(net, grp); + return rc; +} + +static int tipc_sk_leave(struct tipc_sock *tsk) +{ + struct net *net = sock_net(&tsk->sk); + struct tipc_group *grp = tsk->group; + struct tipc_name_seq seq; + int scope; + + if (!grp) + return -EINVAL; + tipc_group_self(grp, &seq, &scope); + tipc_group_delete(net, grp); + tsk->group = NULL; + tipc_sk_withdraw(tsk, scope, &seq); + return 0; +} + /** * tipc_setsockopt - set socket option * @sock: socket structure @@ -2359,6 +2495,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group_req mreq; u32 value = 0; int res = 0; @@ -2374,9 +2511,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, case TIPC_CONN_TIMEOUT: if (ol < sizeof(value)) return -EINVAL; - res = get_user(value, (u32 __user *)ov); - if (res) - return res; + if (get_user(value, (u32 __user *)ov)) + return -EFAULT; + break; + case TIPC_GROUP_JOIN: + if (ol < sizeof(mreq)) + return -EINVAL; + if (copy_from_user(&mreq, ov, sizeof(mreq))) + return -EFAULT; break; default: if (ov || ol) @@ -2409,6 +2551,12 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, tsk->mc_method.rcast = true; tsk->mc_method.mandatory = true; break; + case TIPC_GROUP_JOIN: + res = tipc_sk_join(tsk, &mreq); + break; + case TIPC_GROUP_LEAVE: + res = tipc_sk_leave(tsk); + break; default: res = -EINVAL; } @@ -2436,7 +2584,8 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - int len; + struct tipc_name_seq seq; + int len, scope; u32 value; int res; @@ -2470,6 +2619,12 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, case TIPC_SOCK_RECVQ_DEPTH: value = skb_queue_len(&sk->sk_receive_queue); break; + case TIPC_GROUP_JOIN: + seq.type = 0; + if (tsk->group) + tipc_group_self(tsk->group, &seq, &scope); + value = seq.type; + break; default: res = -EINVAL; } |