From 5356f3d7d48af72eb2a14b643d5563f068c44fe0 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 5 May 2014 08:56:09 +0800 Subject: tipc: always use tipc_node_lock() to hold node lock Although we obtain node lock with tipc_node_lock() in most time, there are still places where we directly use native spin lock interface to grab node lock. But as we will do more jobs in the future when node lock is released, we should ensure that tipc_node_lock() is always called when node lock is taken. Signed-off-by: Ying Xue Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/link.c | 12 ++++++------ net/tipc/name_distr.c | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index c723ee9..3a80145 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -297,14 +297,14 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) rcu_read_lock(); list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { - spin_lock_bh(&n_ptr->lock); + tipc_node_lock(n_ptr); l_ptr = n_ptr->links[bearer_id]; if (l_ptr) { tipc_link_reset(l_ptr); if (shutting_down || !tipc_node_is_up(n_ptr)) { tipc_node_detach_link(l_ptr->owner, l_ptr); tipc_link_reset_fragments(l_ptr); - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); /* Nobody else can access this link now: */ del_timer_sync(&l_ptr->timer); @@ -312,12 +312,12 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) } else { /* Detach/delete when failover is finished: */ l_ptr->flags |= LINK_STOPPED; - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); del_timer_sync(&l_ptr->timer); } continue; } - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); } rcu_read_unlock(); } @@ -474,11 +474,11 @@ void tipc_link_reset_list(unsigned int bearer_id) rcu_read_lock(); list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { - spin_lock_bh(&n_ptr->lock); + tipc_node_lock(n_ptr); l_ptr = n_ptr->links[bearer_id]; if (l_ptr) tipc_link_reset(l_ptr); - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); } rcu_read_unlock(); } diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 974a73f..84652632 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -135,18 +135,18 @@ void named_cluster_distribute(struct sk_buff *buf) rcu_read_lock(); list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { - spin_lock_bh(&n_ptr->lock); + tipc_node_lock(n_ptr); l_ptr = n_ptr->active_links[n_ptr->addr & 1]; if (l_ptr) { buf_copy = skb_copy(buf, GFP_ATOMIC); if (!buf_copy) { - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); break; } msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); __tipc_link_xmit(l_ptr, buf_copy); } - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); } rcu_read_unlock(); -- cgit v1.1 From 486f930ac546914550b84abbc227867cc1be1f95 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 5 May 2014 08:56:10 +0800 Subject: tipc: adjust order of variables in tipc_node structure Move more frequently used variables up to the head of tipc_node structure, hopefully improving a bit performance. Signed-off-by: Ying Xue Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.h | 63 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/net/tipc/node.h b/net/tipc/node.h index 411b191..bb7f708 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -53,56 +53,61 @@ #define WAIT_NODE_DOWN 0x0004 /* wait until peer node is declared down */ /** + * struct tipc_node_bclink - TIPC node bclink structure + * @acked: sequence # of last outbound b'cast message acknowledged by node + * @last_in: sequence # of last in-sequence b'cast message received from node + * @last_sent: sequence # of last b'cast message sent by node + * @oos_state: state tracker for handling OOS b'cast messages + * @deferred_size: number of OOS b'cast messages in deferred queue + * @deferred_head: oldest OOS b'cast message received from node + * @deferred_tail: newest OOS b'cast message received from node + * @reasm_head: broadcast reassembly queue head from node + * @reasm_tail: last broadcast fragment received from node + * @recv_permitted: true if node is allowed to receive b'cast messages + */ +struct tipc_node_bclink { + u32 acked; + u32 last_in; + u32 last_sent; + u32 oos_state; + u32 deferred_size; + struct sk_buff *deferred_head; + struct sk_buff *deferred_tail; + struct sk_buff *reasm_head; + struct sk_buff *reasm_tail; + bool recv_permitted; +}; + +/** * struct tipc_node - TIPC node structure * @addr: network address of node * @lock: spinlock governing access to structure * @hash: links to adjacent nodes in unsorted hash chain - * @list: links to adjacent nodes in sorted list of cluster's nodes - * @nsub: list of "node down" subscriptions monitoring node * @active_links: pointers to active links to node * @links: pointers to all links to node - * @working_links: number of working links to node (both active and standby) * @block_setup: bit mask of conditions preventing link establishment to node + * @bclink: broadcast-related info + * @list: links to adjacent nodes in sorted list of cluster's nodes + * @working_links: number of working links to node (both active and standby) * @link_cnt: number of links to node * @signature: node instance identifier - * @bclink: broadcast-related info + * @nsub: list of "node down" subscriptions monitoring node * @rcu: rcu struct for tipc_node - * @acked: sequence # of last outbound b'cast message acknowledged by node - * @last_in: sequence # of last in-sequence b'cast message received from node - * @last_sent: sequence # of last b'cast message sent by node - * @oos_state: state tracker for handling OOS b'cast messages - * @deferred_size: number of OOS b'cast messages in deferred queue - * @deferred_head: oldest OOS b'cast message received from node - * @deferred_tail: newest OOS b'cast message received from node - * @reasm_head: broadcast reassembly queue head from node - * @reasm_tail: last broadcast fragment received from node - * @recv_permitted: true if node is allowed to receive b'cast messages */ struct tipc_node { u32 addr; spinlock_t lock; struct hlist_node hash; - struct list_head list; - struct list_head nsub; struct tipc_link *active_links[2]; struct tipc_link *links[MAX_BEARERS]; + int block_setup; + struct tipc_node_bclink bclink; + struct list_head list; int link_cnt; int working_links; - int block_setup; u32 signature; + struct list_head nsub; struct rcu_head rcu; - struct { - u32 acked; - u32 last_in; - u32 last_sent; - u32 oos_state; - u32 deferred_size; - struct sk_buff *deferred_head; - struct sk_buff *deferred_tail; - struct sk_buff *reasm_head; - struct sk_buff *reasm_tail; - bool recv_permitted; - } bclink; }; extern struct list_head tipc_node_list; -- cgit v1.1 From 10f465c4966fbc8f50a59480d37a3451f6f3d564 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 5 May 2014 08:56:11 +0800 Subject: tipc: rename setup_blocked variable of node struct to flags Rename setup_blocked variable of node struct to a more common name called "flags", which will be used to represent kinds of node states. Signed-off-by: Ying Xue Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/link.c | 20 ++++++++++---------- net/tipc/node.c | 6 +++--- net/tipc/node.h | 24 ++++++++++++++++++------ 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 3a80145..ac074aa 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1495,14 +1495,14 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) goto unlock_discard; /* Verify that communication with node is currently allowed */ - if ((n_ptr->block_setup & WAIT_PEER_DOWN) && - msg_user(msg) == LINK_PROTOCOL && - (msg_type(msg) == RESET_MSG || - msg_type(msg) == ACTIVATE_MSG) && - !msg_redundant_link(msg)) - n_ptr->block_setup &= ~WAIT_PEER_DOWN; - - if (n_ptr->block_setup) + if ((n_ptr->flags & TIPC_NODE_DOWN) && + msg_user(msg) == LINK_PROTOCOL && + (msg_type(msg) == RESET_MSG || + msg_type(msg) == ACTIVATE_MSG) && + !msg_redundant_link(msg)) + n_ptr->flags &= ~TIPC_NODE_DOWN; + + if (tipc_node_blocked(n_ptr)) goto unlock_discard; /* Validate message sequence number info */ @@ -1744,7 +1744,7 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, return; /* Abort non-RESET send if communication with node is prohibited */ - if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG)) + if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG)) return; /* Create protocol message with "out-of-sequence" sequence number */ @@ -1859,7 +1859,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) * peer has lost contact -- don't allow peer's links * to reactivate before we recognize loss & clean up */ - l_ptr->owner->block_setup = WAIT_NODE_DOWN; + l_ptr->owner->flags = TIPC_NODE_RESET; } link_state_event(l_ptr, RESET_MSG); diff --git a/net/tipc/node.c b/net/tipc/node.c index 6d6543e..2b0a084 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -108,7 +108,7 @@ struct tipc_node *tipc_node_create(u32 addr) break; } list_add_tail_rcu(&n_ptr->list, &temp_node->list); - n_ptr->block_setup = WAIT_PEER_DOWN; + n_ptr->flags = TIPC_NODE_DOWN; n_ptr->signature = INVALID_NODE_SIG; tipc_num_nodes++; @@ -280,7 +280,7 @@ static void node_name_purge_complete(unsigned long node_addr) n_ptr = tipc_node_find(node_addr); if (n_ptr) { tipc_node_lock(n_ptr); - n_ptr->block_setup &= ~WAIT_NAMES_GONE; + n_ptr->flags &= ~TIPC_NAMES_GONE; tipc_node_unlock(n_ptr); } } @@ -324,7 +324,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) tipc_nodesub_notify(n_ptr); /* Prevent re-contact with node until cleanup is done */ - n_ptr->block_setup = WAIT_PEER_DOWN | WAIT_NAMES_GONE; + n_ptr->flags = TIPC_NODE_DOWN | TIPC_NAMES_GONE; tipc_k_signal((Handler)node_name_purge_complete, n_ptr->addr); } diff --git a/net/tipc/node.h b/net/tipc/node.h index bb7f708..242b918 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -47,10 +47,16 @@ */ #define INVALID_NODE_SIG 0x10000 -/* Flags used to block (re)establishment of contact with a neighboring node */ -#define WAIT_PEER_DOWN 0x0001 /* wait to see that peer's links are down */ -#define WAIT_NAMES_GONE 0x0002 /* wait for peer's publications to be purged */ -#define WAIT_NODE_DOWN 0x0004 /* wait until peer node is declared down */ +/* Flags used to block (re)establishment of contact with a neighboring node + * TIPC_NODE_DOWN: indicate node is down + * TIPC_NAMES_GONE: indicate the node's publications are purged + * TIPC_NODE_RESET: indicate node is reset + */ +enum { + TIPC_NODE_DOWN = (1 << 1), + TIPC_NAMES_GONE = (1 << 2), + TIPC_NODE_RESET = (1 << 3) +}; /** * struct tipc_node_bclink - TIPC node bclink structure @@ -85,7 +91,7 @@ struct tipc_node_bclink { * @hash: links to adjacent nodes in unsorted hash chain * @active_links: pointers to active links to node * @links: pointers to all links to node - * @block_setup: bit mask of conditions preventing link establishment to node + * @flags: bit mask of conditions preventing link establishment to node * @bclink: broadcast-related info * @list: links to adjacent nodes in sorted list of cluster's nodes * @working_links: number of working links to node (both active and standby) @@ -100,7 +106,7 @@ struct tipc_node { struct hlist_node hash; struct tipc_link *active_links[2]; struct tipc_link *links[MAX_BEARERS]; - int block_setup; + unsigned int flags; struct tipc_node_bclink bclink; struct list_head list; int link_cnt; @@ -135,4 +141,10 @@ static inline void tipc_node_unlock(struct tipc_node *n_ptr) spin_unlock_bh(&n_ptr->lock); } +static inline bool tipc_node_blocked(struct tipc_node *node) +{ + return (node->flags & (TIPC_NODE_DOWN | TIPC_NAMES_GONE | + TIPC_NODE_RESET)); +} + #endif -- cgit v1.1 From 9db9fdd1983eb960182d72f95d77b91b3a5173d0 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 5 May 2014 08:56:12 +0800 Subject: tipc: avoid to asynchronously notify subscriptions Postpone the actions of notifying subscriptions until after node lock is released, avoiding to asynchronously execute registered handlers when node is lost. Signed-off-by: Ying Xue Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 23 +++++++++++++++++++++-- net/tipc/node.h | 15 +++++++-------- net/tipc/node_subscr.c | 9 ++++----- net/tipc/node_subscr.h | 2 +- 4 files changed, 33 insertions(+), 16 deletions(-) diff --git a/net/tipc/node.c b/net/tipc/node.c index 2b0a084..befbcc9 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -321,10 +321,10 @@ static void node_lost_contact(struct tipc_node *n_ptr) } /* Notify subscribers */ - tipc_nodesub_notify(n_ptr); + n_ptr->flags = TIPC_NODE_LOST; /* Prevent re-contact with node until cleanup is done */ - n_ptr->flags = TIPC_NODE_DOWN | TIPC_NAMES_GONE; + n_ptr->flags |= TIPC_NODE_DOWN | TIPC_NAMES_GONE; tipc_k_signal((Handler)node_name_purge_complete, n_ptr->addr); } @@ -465,3 +465,22 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len) tipc_node_unlock(node); return -EINVAL; } + +void tipc_node_unlock(struct tipc_node *node) +{ + LIST_HEAD(nsub_list); + + if (likely(!node->flags)) { + spin_unlock_bh(&node->lock); + return; + } + + if (node->flags & TIPC_NODE_LOST) { + list_replace_init(&node->nsub, &nsub_list); + node->flags &= ~TIPC_NODE_LOST; + } + spin_unlock_bh(&node->lock); + + if (!list_empty(&nsub_list)) + tipc_nodesub_notify(&nsub_list); +} diff --git a/net/tipc/node.h b/net/tipc/node.h index 242b918..fd86726 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -51,11 +51,14 @@ * TIPC_NODE_DOWN: indicate node is down * TIPC_NAMES_GONE: indicate the node's publications are purged * TIPC_NODE_RESET: indicate node is reset + * TIPC_NODE_LOST: indicate node is lost and it's used to notify subscriptions + * when node lock is released */ enum { TIPC_NODE_DOWN = (1 << 1), TIPC_NAMES_GONE = (1 << 2), - TIPC_NODE_RESET = (1 << 3) + TIPC_NODE_RESET = (1 << 3), + TIPC_NODE_LOST = (1 << 4) }; /** @@ -130,15 +133,11 @@ int tipc_node_is_up(struct tipc_node *n_ptr); struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space); struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space); int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len); +void tipc_node_unlock(struct tipc_node *node); -static inline void tipc_node_lock(struct tipc_node *n_ptr) +static inline void tipc_node_lock(struct tipc_node *node) { - spin_lock_bh(&n_ptr->lock); -} - -static inline void tipc_node_unlock(struct tipc_node *n_ptr) -{ - spin_unlock_bh(&n_ptr->lock); + spin_lock_bh(&node->lock); } static inline bool tipc_node_blocked(struct tipc_node *node) diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c index 8a7384c..7c59ab1 100644 --- a/net/tipc/node_subscr.c +++ b/net/tipc/node_subscr.c @@ -81,14 +81,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub) * * Note: node is locked by caller */ -void tipc_nodesub_notify(struct tipc_node *node) +void tipc_nodesub_notify(struct list_head *nsub_list) { - struct tipc_node_subscr *ns; + struct tipc_node_subscr *ns, *safe; - list_for_each_entry(ns, &node->nsub, nodesub_list) { + list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) { if (ns->handle_node_down) { - tipc_k_signal((Handler)ns->handle_node_down, - (unsigned long)ns->usr_handle); + ns->handle_node_down(ns->usr_handle); ns->handle_node_down = NULL; } } diff --git a/net/tipc/node_subscr.h b/net/tipc/node_subscr.h index c95d207..d91b8cc 100644 --- a/net/tipc/node_subscr.h +++ b/net/tipc/node_subscr.h @@ -58,6 +58,6 @@ struct tipc_node_subscr { void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr, void *usr_handle, net_ev_handler handle_down); void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub); -void tipc_nodesub_notify(struct tipc_node *node); +void tipc_nodesub_notify(struct list_head *nsub_list); #endif -- cgit v1.1 From 9d561949685749be3d97239eab7d85aa78718108 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 5 May 2014 08:56:13 +0800 Subject: tipc: remove TIPC_NAMES_GONE node flag Since previously what all publications pertaining to the lost node were removed from name table was finished in tasklet context asynchronously, we need to TIPC_NAMES_GONE flag indicating whether the node cleanup work is finished or not. But now as the cleanup work has been finished when node lock is released, the flag becomes meaningless for us. Signed-off-by: Ying Xue Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/node.c | 22 ++++------------------ net/tipc/node.h | 8 +++----- 2 files changed, 7 insertions(+), 23 deletions(-) diff --git a/net/tipc/node.c b/net/tipc/node.c index befbcc9..c3a36bb 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -273,18 +273,6 @@ static void node_established_contact(struct tipc_node *n_ptr) tipc_bclink_add_node(n_ptr->addr); } -static void node_name_purge_complete(unsigned long node_addr) -{ - struct tipc_node *n_ptr; - - n_ptr = tipc_node_find(node_addr); - if (n_ptr) { - tipc_node_lock(n_ptr); - n_ptr->flags &= ~TIPC_NAMES_GONE; - tipc_node_unlock(n_ptr); - } -} - static void node_lost_contact(struct tipc_node *n_ptr) { char addr_string[16]; @@ -320,12 +308,10 @@ static void node_lost_contact(struct tipc_node *n_ptr) tipc_link_reset_fragments(l_ptr); } - /* Notify subscribers */ - n_ptr->flags = TIPC_NODE_LOST; - - /* Prevent re-contact with node until cleanup is done */ - n_ptr->flags |= TIPC_NODE_DOWN | TIPC_NAMES_GONE; - tipc_k_signal((Handler)node_name_purge_complete, n_ptr->addr); + /* Notify subscribers and prevent re-contact with node until + * cleanup is done. + */ + n_ptr->flags = TIPC_NODE_DOWN | TIPC_NODE_LOST; } struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space) diff --git a/net/tipc/node.h b/net/tipc/node.h index fd86726..4bd5eff 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -49,16 +49,14 @@ /* Flags used to block (re)establishment of contact with a neighboring node * TIPC_NODE_DOWN: indicate node is down - * TIPC_NAMES_GONE: indicate the node's publications are purged * TIPC_NODE_RESET: indicate node is reset * TIPC_NODE_LOST: indicate node is lost and it's used to notify subscriptions * when node lock is released */ enum { TIPC_NODE_DOWN = (1 << 1), - TIPC_NAMES_GONE = (1 << 2), - TIPC_NODE_RESET = (1 << 3), - TIPC_NODE_LOST = (1 << 4) + TIPC_NODE_RESET = (1 << 2), + TIPC_NODE_LOST = (1 << 3) }; /** @@ -142,7 +140,7 @@ static inline void tipc_node_lock(struct tipc_node *node) static inline bool tipc_node_blocked(struct tipc_node *node) { - return (node->flags & (TIPC_NODE_DOWN | TIPC_NAMES_GONE | + return (node->flags & (TIPC_NODE_DOWN | TIPC_NODE_LOST | TIPC_NODE_RESET)); } -- cgit v1.1 From ca0c42732c512a12fabe677594840f31861dd31a Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 5 May 2014 08:56:14 +0800 Subject: tipc: avoid to asynchronously deliver name tables to peer node Postpone the actions of delivering name tables until after node lock is released, avoiding to do it under asynchronous context. Signed-off-by: Ying Xue Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/name_distr.c | 52 ++------------------------------------------------- net/tipc/name_distr.h | 30 ++++++++++++++++++++++++++++- net/tipc/node.c | 16 +++++++++++++++- net/tipc/node.h | 8 ++++++-- 4 files changed, 52 insertions(+), 54 deletions(-) diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 84652632..8ce7309 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -38,34 +38,6 @@ #include "link.h" #include "name_distr.h" -#define ITEM_SIZE sizeof(struct distr_item) - -/** - * struct distr_item - publication info distributed to other nodes - * @type: name sequence type - * @lower: name sequence lower bound - * @upper: name sequence upper bound - * @ref: publishing port reference - * @key: publication key - * - * ===> All fields are stored in network byte order. <=== - * - * First 3 fields identify (name or) name sequence being published. - * Reference field uniquely identifies port that published name sequence. - * Key field uniquely identifies publication, in the event a port has - * multiple publications of the same name sequence. - * - * Note: There is no field that identifies the publishing node because it is - * the same for all items contained within a publication message. - */ -struct distr_item { - __be32 type; - __be32 lower; - __be32 upper; - __be32 ref; - __be32 key; -}; - /** * struct publ_list - list of publications made by this node * @list: circular list of publications @@ -239,29 +211,9 @@ static void named_distribute(struct list_head *message_list, u32 node, /** * tipc_named_node_up - tell specified node about all publications by this node */ -void tipc_named_node_up(unsigned long nodearg) +void tipc_named_node_up(u32 max_item_buf, u32 node) { - struct tipc_node *n_ptr; - struct tipc_link *l_ptr; - struct list_head message_list; - u32 node = (u32)nodearg; - u32 max_item_buf = 0; - - /* compute maximum amount of publication data to send per message */ - n_ptr = tipc_node_find(node); - if (n_ptr) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[0]; - if (l_ptr) - max_item_buf = ((l_ptr->max_pkt - INT_H_SIZE) / - ITEM_SIZE) * ITEM_SIZE; - tipc_node_unlock(n_ptr); - } - if (!max_item_buf) - return; - - /* create list of publication messages, then send them as a unit */ - INIT_LIST_HEAD(&message_list); + LIST_HEAD(message_list); read_lock_bh(&tipc_nametbl_lock); named_distribute(&message_list, node, &publ_cluster, max_item_buf); diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h index 47ff829..b2eed4e 100644 --- a/net/tipc/name_distr.h +++ b/net/tipc/name_distr.h @@ -39,10 +39,38 @@ #include "name_table.h" +#define ITEM_SIZE sizeof(struct distr_item) + +/** + * struct distr_item - publication info distributed to other nodes + * @type: name sequence type + * @lower: name sequence lower bound + * @upper: name sequence upper bound + * @ref: publishing port reference + * @key: publication key + * + * ===> All fields are stored in network byte order. <=== + * + * First 3 fields identify (name or) name sequence being published. + * Reference field uniquely identifies port that published name sequence. + * Key field uniquely identifies publication, in the event a port has + * multiple publications of the same name sequence. + * + * Note: There is no field that identifies the publishing node because it is + * the same for all items contained within a publication message. + */ +struct distr_item { + __be32 type; + __be32 lower; + __be32 upper; + __be32 ref; + __be32 key; +}; + struct sk_buff *tipc_named_publish(struct publication *publ); struct sk_buff *tipc_named_withdraw(struct publication *publ); void named_cluster_distribute(struct sk_buff *buf); -void tipc_named_node_up(unsigned long node); +void tipc_named_node_up(u32 max_item_buf, u32 node); void tipc_named_rcv(struct sk_buff *buf); void tipc_named_reinit(void); diff --git a/net/tipc/node.c b/net/tipc/node.c index c3a36bb..74efebc 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -267,7 +267,7 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) static void node_established_contact(struct tipc_node *n_ptr) { - tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr); + n_ptr->flags |= TIPC_NODE_UP; n_ptr->bclink.oos_state = 0; n_ptr->bclink.acked = tipc_bclink_get_last_sent(); tipc_bclink_add_node(n_ptr->addr); @@ -455,6 +455,9 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len) void tipc_node_unlock(struct tipc_node *node) { LIST_HEAD(nsub_list); + struct tipc_link *link; + int pkt_sz = 0; + u32 addr = 0; if (likely(!node->flags)) { spin_unlock_bh(&node->lock); @@ -465,8 +468,19 @@ void tipc_node_unlock(struct tipc_node *node) list_replace_init(&node->nsub, &nsub_list); node->flags &= ~TIPC_NODE_LOST; } + if (node->flags & TIPC_NODE_UP) { + link = node->active_links[0]; + node->flags &= ~TIPC_NODE_UP; + if (link) { + pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) * + ITEM_SIZE; + addr = node->addr; + } + } spin_unlock_bh(&node->lock); if (!list_empty(&nsub_list)) tipc_nodesub_notify(&nsub_list); + if (pkt_sz) + tipc_named_node_up(pkt_sz, addr); } diff --git a/net/tipc/node.h b/net/tipc/node.h index 4bd5eff..38f710f 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -48,15 +48,19 @@ #define INVALID_NODE_SIG 0x10000 /* Flags used to block (re)establishment of contact with a neighboring node - * TIPC_NODE_DOWN: indicate node is down + * TIPC_NODE_DOWN: indicate node is down and it's used to block the node's + * links until RESET or ACTIVE message arrives * TIPC_NODE_RESET: indicate node is reset * TIPC_NODE_LOST: indicate node is lost and it's used to notify subscriptions * when node lock is released + * TIPC_NODE_UP: indicate node is up and it's used to deliver local name table + * when node lock is released */ enum { TIPC_NODE_DOWN = (1 << 1), TIPC_NODE_RESET = (1 << 2), - TIPC_NODE_LOST = (1 << 3) + TIPC_NODE_LOST = (1 << 3), + TIPC_NODE_UP = (1 << 4) }; /** -- cgit v1.1 From d69afc90b8d47e471d2870f090f662e569b08407 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 5 May 2014 08:56:15 +0800 Subject: tipc: define new functions to operate bc_lock As we are going to do more jobs when bc_lock is released, the two operations of holding/releasing the lock should be encapsulated with functions. In addition, we move bc_lock spin lock into tipc_bclink structure avoiding to define the global variable. Signed-off-by: Ying Xue Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/bcast.c | 96 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 53 insertions(+), 43 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 119a59b..9eceaa7 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -71,7 +71,7 @@ struct tipc_bcbearer_pair { * Note: The fields labelled "temporary" are incorporated into the bearer * to avoid consuming potentially limited stack space through the use of * large local variables within multicast routines. Concurrent access is - * prevented through use of the spinlock "bc_lock". + * prevented through use of the spinlock "bclink_lock". */ struct tipc_bcbearer { struct tipc_bearer bearer; @@ -84,6 +84,7 @@ struct tipc_bcbearer { /** * struct tipc_bclink - link used for broadcast messages + * @lock: spinlock governing access to structure * @link: (non-standard) broadcast link structure * @node: (non-standard) node structure representing b'cast link's peer node * @bcast_nodes: map of broadcast-capable nodes @@ -92,6 +93,7 @@ struct tipc_bcbearer { * Handles sequence numbering, fragmentation, bundling, etc. */ struct tipc_bclink { + spinlock_t lock; struct tipc_link link; struct tipc_node node; struct tipc_node_map bcast_nodes; @@ -105,8 +107,6 @@ static struct tipc_bcbearer *bcbearer = &bcast_bearer; static struct tipc_bclink *bclink = &bcast_link; static struct tipc_link *bcl = &bcast_link.link; -static DEFINE_SPINLOCK(bc_lock); - const char tipc_bclink_name[] = "broadcast-link"; static void tipc_nmap_diff(struct tipc_node_map *nm_a, @@ -115,6 +115,16 @@ static void tipc_nmap_diff(struct tipc_node_map *nm_a, static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); +static void tipc_bclink_lock(void) +{ + spin_lock_bh(&bclink->lock); +} + +static void tipc_bclink_unlock(void) +{ + spin_unlock_bh(&bclink->lock); +} + static u32 bcbuf_acks(struct sk_buff *buf) { return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; @@ -132,16 +142,16 @@ static void bcbuf_decr_acks(struct sk_buff *buf) void tipc_bclink_add_node(u32 addr) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_nmap_add(&bclink->bcast_nodes, addr); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } void tipc_bclink_remove_node(u32 addr) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_nmap_remove(&bclink->bcast_nodes, addr); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } static void bclink_set_last_sent(void) @@ -167,7 +177,7 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) /** * tipc_bclink_retransmit_to - get most recent node to request retransmission * - * Called with bc_lock locked + * Called with bclink_lock locked */ struct tipc_node *tipc_bclink_retransmit_to(void) { @@ -179,7 +189,7 @@ struct tipc_node *tipc_bclink_retransmit_to(void) * @after: sequence number of last packet to *not* retransmit * @to: sequence number of last packet to retransmit * - * Called with bc_lock locked + * Called with bclink_lock locked */ static void bclink_retransmit_pkt(u32 after, u32 to) { @@ -196,7 +206,7 @@ static void bclink_retransmit_pkt(u32 after, u32 to) * @n_ptr: node that sent acknowledgement info * @acked: broadcast sequence # that has been acknowledged * - * Node is locked, bc_lock unlocked. + * Node is locked, bclink_lock unlocked. */ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) { @@ -204,8 +214,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) struct sk_buff *next; unsigned int released = 0; - spin_lock_bh(&bc_lock); - + tipc_bclink_lock(); /* Bail out if tx queue is empty (no clean up is required) */ crs = bcl->first_out; if (!crs) @@ -269,7 +278,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) if (unlikely(released && !list_empty(&bcl->waiting_ports))) tipc_link_wakeup_ports(bcl, 0); exit: - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } /** @@ -322,10 +331,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) ? buf_seqno(n_ptr->bclink.deferred_head) - 1 : n_ptr->bclink.last_sent); - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_bearer_send(MAX_BEARERS, buf, NULL); bcl->stats.sent_nacks++; - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); kfree_skb(buf); n_ptr->bclink.oos_state++; @@ -362,7 +371,7 @@ int tipc_bclink_xmit(struct sk_buff *buf) { int res; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); if (!bclink->bcast_nodes.count) { res = msg_data_sz(buf_msg(buf)); @@ -377,14 +386,14 @@ int tipc_bclink_xmit(struct sk_buff *buf) bcl->stats.accu_queue_sz += bcl->out_queue_size; } exit: - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return res; } /** * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet * - * Called with both sending node's lock and bc_lock taken. + * Called with both sending node's lock and bclink_lock taken. */ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) { @@ -439,12 +448,12 @@ void tipc_bclink_rcv(struct sk_buff *buf) if (msg_destnode(msg) == tipc_own_addr) { tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); tipc_node_unlock(node); - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bcl->stats.recv_nacks++; bclink->retransmit_to = node; bclink_retransmit_pkt(msg_bcgap_after(msg), msg_bcgap_to(msg)); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } else { tipc_node_unlock(node); bclink_peek_nack(msg); @@ -462,20 +471,20 @@ receive: /* Deliver message to destination */ if (likely(msg_isdata(msg))) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); if (likely(msg_mcast(msg))) tipc_port_mcast_rcv(buf, NULL); else kfree_skb(buf); } else if (msg_user(msg) == MSG_BUNDLER) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); bcl->stats.recv_bundles++; bcl->stats.recv_bundled += msg_msgcnt(msg); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); tipc_link_bundle_rcv(buf); } else if (msg_user(msg) == MSG_FRAGMENTER) { @@ -485,28 +494,28 @@ receive: &buf); if (ret == LINK_REASM_ERROR) goto unlock; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); bcl->stats.recv_fragments++; if (ret == LINK_REASM_COMPLETE) { bcl->stats.recv_fragmented++; /* Point msg to inner header */ msg = buf_msg(buf); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); goto receive; } - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); } else if (msg_user(msg) == NAME_DISTRIBUTOR) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); tipc_named_rcv(buf); } else { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); kfree_skb(buf); } @@ -552,14 +561,14 @@ receive: } else deferred = 0; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); if (deferred) bcl->stats.deferred_recv++; else bcl->stats.duplicates++; - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); unlock: tipc_node_unlock(node); @@ -663,7 +672,7 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action) int b_index; int pri; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); if (action) tipc_nmap_add(nm_ptr, node); @@ -710,7 +719,7 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action) bp_curr++; } - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } @@ -722,7 +731,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size) if (!bcl) return 0; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); s = &bcl->stats; @@ -751,7 +760,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size) s->queue_sz_counts ? (s->accu_queue_sz / s->queue_sz_counts) : 0); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return ret; } @@ -760,9 +769,9 @@ int tipc_bclink_reset_stats(void) if (!bcl) return -ENOPROTOOPT; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); memset(&bcl->stats, 0, sizeof(bcl->stats)); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return 0; } @@ -773,9 +782,9 @@ int tipc_bclink_set_queue_limits(u32 limit) if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) return -EINVAL; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_link_set_queue_limits(bcl, limit); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return 0; } @@ -785,6 +794,7 @@ void tipc_bclink_init(void) bcbearer->media.send_msg = tipc_bcbearer_send; sprintf(bcbearer->media.name, "tipc-broadcast"); + spin_lock_init(&bclink->lock); INIT_LIST_HEAD(&bcl->waiting_ports); bcl->next_out_no = 1; spin_lock_init(&bclink->node.lock); @@ -799,9 +809,9 @@ void tipc_bclink_init(void) void tipc_bclink_stop(void) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_link_purge_queues(bcl); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); RCU_INIT_POINTER(bearer_list[BCBEARER], NULL); memset(bclink, 0, sizeof(*bclink)); -- cgit v1.1 From eb8b00f5f248c50603bca383792ac3a618297be0 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 5 May 2014 08:56:16 +0800 Subject: tipc: convert allocations of global variables associated with bclink Convert allocations of global variables associated with bclink from static way to dynamical way for the convenience of bclink instance initialisation. Meanwhile, this also helps TIPC support name space in the future easily. Signed-off-by: Ying Xue Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/bcast.c | 28 +++++++++++++++++++--------- net/tipc/bcast.h | 2 +- net/tipc/config.c | 6 ++++-- net/tipc/net.c | 9 +++++++-- net/tipc/net.h | 2 +- 5 files changed, 32 insertions(+), 15 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 9eceaa7..ef8cff4 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -100,12 +100,9 @@ struct tipc_bclink { struct tipc_node *retransmit_to; }; -static struct tipc_bcbearer bcast_bearer; -static struct tipc_bclink bcast_link; - -static struct tipc_bcbearer *bcbearer = &bcast_bearer; -static struct tipc_bclink *bclink = &bcast_link; -static struct tipc_link *bcl = &bcast_link.link; +static struct tipc_bcbearer *bcbearer; +static struct tipc_bclink *bclink; +static struct tipc_link *bcl; const char tipc_bclink_name[] = "broadcast-link"; @@ -788,8 +785,19 @@ int tipc_bclink_set_queue_limits(u32 limit) return 0; } -void tipc_bclink_init(void) +int tipc_bclink_init(void) { + bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); + if (!bcbearer) + return -ENOMEM; + + bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); + if (!bclink) { + kfree(bcbearer); + return -ENOMEM; + } + + bcl = &bclink->link; bcbearer->bearer.media = &bcbearer->media; bcbearer->media.send_msg = tipc_bcbearer_send; sprintf(bcbearer->media.name, "tipc-broadcast"); @@ -805,6 +813,7 @@ void tipc_bclink_init(void) rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer); bcl->state = WORKING_WORKING; strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); + return 0; } void tipc_bclink_stop(void) @@ -814,8 +823,9 @@ void tipc_bclink_stop(void) tipc_bclink_unlock(); RCU_INIT_POINTER(bearer_list[BCBEARER], NULL); - memset(bclink, 0, sizeof(*bclink)); - memset(bcbearer, 0, sizeof(*bcbearer)); + synchronize_net(); + kfree(bcbearer); + kfree(bclink); } /** diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 7c1ef1b..ea162c7 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -81,7 +81,7 @@ static inline int tipc_nmap_equal(struct tipc_node_map *nm_a, void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port); void tipc_port_list_free(struct tipc_port_list *pl_ptr); -void tipc_bclink_init(void); +int tipc_bclink_init(void); void tipc_bclink_stop(void); void tipc_bclink_add_node(u32 addr); void tipc_bclink_remove_node(u32 addr); diff --git a/net/tipc/config.c b/net/tipc/config.c index 251f5a2..2b42403 100644 --- a/net/tipc/config.c +++ b/net/tipc/config.c @@ -177,8 +177,10 @@ static struct sk_buff *cfg_set_own_addr(void) if (tipc_own_addr) return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED " (cannot change node address once assigned)"); - tipc_net_start(addr); - return tipc_cfg_reply_none(); + if (!tipc_net_start(addr)) + return tipc_cfg_reply_none(); + + return tipc_cfg_reply_error_string("cannot change to network mode"); } static struct sk_buff *cfg_set_max_ports(void) diff --git a/net/tipc/net.c b/net/tipc/net.c index 75bb390..f8fc95d 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -164,20 +164,25 @@ void tipc_net_route_msg(struct sk_buff *buf) tipc_link_xmit(buf, dnode, msg_link_selector(msg)); } -void tipc_net_start(u32 addr) +int tipc_net_start(u32 addr) { char addr_string[16]; + int res; tipc_own_addr = addr; tipc_named_reinit(); tipc_port_reinit(); - tipc_bclink_init(); + res = tipc_bclink_init(); + if (res) + return res; + tipc_nametbl_publish(TIPC_CFG_SRV, tipc_own_addr, tipc_own_addr, TIPC_ZONE_SCOPE, 0, tipc_own_addr); pr_info("Started in network mode\n"); pr_info("Own node address %s, network identity %u\n", tipc_addr_string_fill(addr_string, tipc_own_addr), tipc_net_id); + return 0; } void tipc_net_stop(void) diff --git a/net/tipc/net.h b/net/tipc/net.h index f781cae..c6c2b46 100644 --- a/net/tipc/net.h +++ b/net/tipc/net.h @@ -39,7 +39,7 @@ void tipc_net_route_msg(struct sk_buff *buf); -void tipc_net_start(u32 addr); +int tipc_net_start(u32 addr); void tipc_net_stop(void); #endif -- cgit v1.1 From 3f5a12bd9f9a61d8a12f9adf778b14e4bb8ca050 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 5 May 2014 08:56:17 +0800 Subject: tipc: avoid to asynchronously reset all links Postpone the actions of resetting all links until after bclink lock is released, avoiding to asynchronously reset all links. Signed-off-by: Ying Xue Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/bcast.c | 21 +++++++++++++++++++++ net/tipc/bcast.h | 2 ++ net/tipc/link.c | 22 ++++++++-------------- net/tipc/link.h | 1 + 4 files changed, 32 insertions(+), 14 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index ef8cff4..a0978d0 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -87,6 +87,7 @@ struct tipc_bcbearer { * @lock: spinlock governing access to structure * @link: (non-standard) broadcast link structure * @node: (non-standard) node structure representing b'cast link's peer node + * @flags: represent bclink states * @bcast_nodes: map of broadcast-capable nodes * @retransmit_to: node that most recently requested a retransmit * @@ -96,6 +97,7 @@ struct tipc_bclink { spinlock_t lock; struct tipc_link link; struct tipc_node node; + unsigned int flags; struct tipc_node_map bcast_nodes; struct tipc_node *retransmit_to; }; @@ -119,7 +121,26 @@ static void tipc_bclink_lock(void) static void tipc_bclink_unlock(void) { + struct tipc_node *node = NULL; + + if (likely(!bclink->flags)) { + spin_unlock_bh(&bclink->lock); + return; + } + + if (bclink->flags & TIPC_BCLINK_RESET) { + bclink->flags &= ~TIPC_BCLINK_RESET; + node = tipc_bclink_retransmit_to(); + } spin_unlock_bh(&bclink->lock); + + if (node) + tipc_link_reset_all(node); +} + +void tipc_bclink_set_flags(unsigned int flags) +{ + bclink->flags |= flags; } static u32 bcbuf_acks(struct sk_buff *buf) diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index ea162c7..00330c4 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -39,6 +39,7 @@ #define MAX_NODES 4096 #define WSIZE 32 +#define TIPC_BCLINK_RESET 1 /** * struct tipc_node_map - set of node identifiers @@ -83,6 +84,7 @@ void tipc_port_list_free(struct tipc_port_list *pl_ptr); int tipc_bclink_init(void); void tipc_bclink_stop(void); +void tipc_bclink_set_flags(unsigned int flags); void tipc_bclink_add_node(u32 addr); void tipc_bclink_remove_node(u32 addr); struct tipc_node *tipc_bclink_retransmit_to(void); diff --git a/net/tipc/link.c b/net/tipc/link.c index ac074aa..dce2bef 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1259,29 +1259,24 @@ void tipc_link_push_queue(struct tipc_link *l_ptr) } while (!res); } -static void link_reset_all(unsigned long addr) +void tipc_link_reset_all(struct tipc_node *node) { - struct tipc_node *n_ptr; char addr_string[16]; u32 i; - n_ptr = tipc_node_find((u32)addr); - if (!n_ptr) - return; /* node no longer exists */ - - tipc_node_lock(n_ptr); + tipc_node_lock(node); pr_warn("Resetting all links to %s\n", - tipc_addr_string_fill(addr_string, n_ptr->addr)); + tipc_addr_string_fill(addr_string, node->addr)); for (i = 0; i < MAX_BEARERS; i++) { - if (n_ptr->links[i]) { - link_print(n_ptr->links[i], "Resetting link\n"); - tipc_link_reset(n_ptr->links[i]); + if (node->links[i]) { + link_print(node->links[i], "Resetting link\n"); + tipc_link_reset(node->links[i]); } } - tipc_node_unlock(n_ptr); + tipc_node_unlock(node); } static void link_retransmit_failure(struct tipc_link *l_ptr, @@ -1318,10 +1313,9 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, n_ptr->bclink.oos_state, n_ptr->bclink.last_sent); - tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr); - tipc_node_unlock(n_ptr); + tipc_bclink_set_flags(TIPC_BCLINK_RESET); l_ptr->stale_count = 0; } } diff --git a/net/tipc/link.h b/net/tipc/link.h index 4b556c1..7ba73fa 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -230,6 +230,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space); struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space); +void tipc_link_reset_all(struct tipc_node *node); void tipc_link_reset(struct tipc_link *l_ptr); void tipc_link_reset_list(unsigned int bearer_id); int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); -- cgit v1.1 From 52ff872055e06af10f94b8853c946f07ed8a0672 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 5 May 2014 08:56:18 +0800 Subject: tipc: purge signal handler infrastructure In the previous commits of this series, we removed all asynchronous actions which were based on the tasklet handler - "tipc_k_signal()". So the moment has now come when we can completely remove the tasklet handler infrastructure. That is done with this commit. Signed-off-by: Ying Xue Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/Makefile | 2 +- net/tipc/core.c | 7 --- net/tipc/core.h | 6 +-- net/tipc/handler.c | 134 ----------------------------------------------------- 4 files changed, 2 insertions(+), 147 deletions(-) delete mode 100644 net/tipc/handler.c diff --git a/net/tipc/Makefile b/net/tipc/Makefile index b282f71..a080c66 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -5,7 +5,7 @@ obj-$(CONFIG_TIPC) := tipc.o tipc-y += addr.o bcast.o bearer.o config.o \ - core.o handler.o link.o discover.o msg.o \ + core.o link.o discover.o msg.o \ name_distr.o subscr.o name_table.o net.o \ netlink.o node.o node_subscr.o port.o ref.o \ socket.o log.o eth_media.o server.o diff --git a/net/tipc/core.c b/net/tipc/core.c index 50d5742..57f8ae9 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -80,7 +80,6 @@ struct sk_buff *tipc_buf_acquire(u32 size) */ static void tipc_core_stop(void) { - tipc_handler_stop(); tipc_net_stop(); tipc_bearer_cleanup(); tipc_netlink_stop(); @@ -100,10 +99,6 @@ static int tipc_core_start(void) get_random_bytes(&tipc_random, sizeof(tipc_random)); - err = tipc_handler_start(); - if (err) - goto out_handler; - err = tipc_ref_table_init(tipc_max_ports, tipc_random); if (err) goto out_reftbl; @@ -146,8 +141,6 @@ out_netlink: out_nametbl: tipc_ref_table_stop(); out_reftbl: - tipc_handler_stop(); -out_handler: return err; } diff --git a/net/tipc/core.h b/net/tipc/core.h index 36cbf15..ae55d37 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -89,8 +89,6 @@ extern int tipc_random __read_mostly; /* * Routines available to privileged subsystems */ -int tipc_handler_start(void); -void tipc_handler_stop(void); int tipc_netlink_start(void); void tipc_netlink_stop(void); int tipc_socket_init(void); @@ -109,12 +107,10 @@ void tipc_unregister_sysctl(void); #endif /* - * TIPC timer and signal code + * TIPC timer code */ typedef void (*Handler) (unsigned long); -u32 tipc_k_signal(Handler routine, unsigned long argument); - /** * k_init_timer - initialize a timer * @timer: pointer to timer structure diff --git a/net/tipc/handler.c b/net/tipc/handler.c deleted file mode 100644 index 1fabf16..0000000 --- a/net/tipc/handler.c +++ /dev/null @@ -1,134 +0,0 @@ -/* - * net/tipc/handler.c: TIPC signal handling - * - * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2005, Wind River Systems - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "core.h" - -struct queue_item { - struct list_head next_signal; - void (*handler) (unsigned long); - unsigned long data; -}; - -static struct kmem_cache *tipc_queue_item_cache; -static struct list_head signal_queue_head; -static DEFINE_SPINLOCK(qitem_lock); -static int handler_enabled __read_mostly; - -static void process_signal_queue(unsigned long dummy); - -static DECLARE_TASKLET_DISABLED(tipc_tasklet, process_signal_queue, 0); - - -unsigned int tipc_k_signal(Handler routine, unsigned long argument) -{ - struct queue_item *item; - - spin_lock_bh(&qitem_lock); - if (!handler_enabled) { - spin_unlock_bh(&qitem_lock); - return -ENOPROTOOPT; - } - - item = kmem_cache_alloc(tipc_queue_item_cache, GFP_ATOMIC); - if (!item) { - pr_err("Signal queue out of memory\n"); - spin_unlock_bh(&qitem_lock); - return -ENOMEM; - } - item->handler = routine; - item->data = argument; - list_add_tail(&item->next_signal, &signal_queue_head); - spin_unlock_bh(&qitem_lock); - tasklet_schedule(&tipc_tasklet); - return 0; -} - -static void process_signal_queue(unsigned long dummy) -{ - struct queue_item *__volatile__ item; - struct list_head *l, *n; - - spin_lock_bh(&qitem_lock); - list_for_each_safe(l, n, &signal_queue_head) { - item = list_entry(l, struct queue_item, next_signal); - list_del(&item->next_signal); - spin_unlock_bh(&qitem_lock); - item->handler(item->data); - spin_lock_bh(&qitem_lock); - kmem_cache_free(tipc_queue_item_cache, item); - } - spin_unlock_bh(&qitem_lock); -} - -int tipc_handler_start(void) -{ - tipc_queue_item_cache = - kmem_cache_create("tipc_queue_items", sizeof(struct queue_item), - 0, SLAB_HWCACHE_ALIGN, NULL); - if (!tipc_queue_item_cache) - return -ENOMEM; - - INIT_LIST_HEAD(&signal_queue_head); - tasklet_enable(&tipc_tasklet); - handler_enabled = 1; - return 0; -} - -void tipc_handler_stop(void) -{ - struct list_head *l, *n; - struct queue_item *item; - - spin_lock_bh(&qitem_lock); - if (!handler_enabled) { - spin_unlock_bh(&qitem_lock); - return; - } - handler_enabled = 0; - spin_unlock_bh(&qitem_lock); - - tasklet_kill(&tipc_tasklet); - - spin_lock_bh(&qitem_lock); - list_for_each_safe(l, n, &signal_queue_head) { - item = list_entry(l, struct queue_item, next_signal); - list_del(&item->next_signal); - kmem_cache_free(tipc_queue_item_cache, item); - } - spin_unlock_bh(&qitem_lock); - - kmem_cache_destroy(tipc_queue_item_cache); -} -- cgit v1.1