diff options
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r-- | net/tipc/bcast.c | 226 |
1 files changed, 78 insertions, 148 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index facc216..1f3b160 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void) return bcl->fsm_msg_cnt; } -/** - * bclink_set_gap - set gap according to contents of current deferred pkt queue - * - * Called with 'node' locked, bc_lock unlocked - */ - -static void bclink_set_gap(struct tipc_node *n_ptr) -{ - struct sk_buff *buf = n_ptr->bclink.deferred_head; - - n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = - mod(n_ptr->bclink.last_in); - if (unlikely(buf != NULL)) - n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1); -} - -/** - * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment - * - * This mechanism endeavours to prevent all nodes in network from trying - * to ACK or NACK at the same time. - * - * Note: TIPC uses a different trigger to distribute ACKs than it does to - * distribute NACKs, but tries to use the same spacing (divide by 16). - */ - -static int bclink_ack_allowed(u32 n) +static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) { - return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag; + node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ? + seqno : node->bclink.last_sent; } -/** +/* * tipc_bclink_retransmit_to - get most recent node to request retransmission * * Called with bc_lock locked @@ -300,44 +275,56 @@ exit: spin_unlock_bh(&bc_lock); } -/** - * bclink_send_ack - unicast an ACK msg +/* + * tipc_bclink_update_link_state - update broadcast link state * * tipc_net_lock and node lock set */ -static void bclink_send_ack(struct tipc_node *n_ptr) +void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) { - struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1]; + struct sk_buff *buf; - if (l_ptr != NULL) - tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); -} + /* Ignore "stale" link state info */ -/** - * bclink_send_nack- broadcast a NACK msg - * - * tipc_net_lock and node lock set - */ + if (less_eq(last_sent, n_ptr->bclink.last_in)) + return; -static void bclink_send_nack(struct tipc_node *n_ptr) -{ - struct sk_buff *buf; - struct tipc_msg *msg; + /* Update link synchronization state; quit if in sync */ + + bclink_update_last_sent(n_ptr, last_sent); + + if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) + return; + + /* Update out-of-sync state; quit if loss is still unconfirmed */ - if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to)) + if ((++n_ptr->bclink.oos_state) == 1) { + if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) + return; + n_ptr->bclink.oos_state++; + } + + /* Don't NACK if one has been recently sent (or seen) */ + + if (n_ptr->bclink.oos_state & 0x1) return; + /* Send NACK */ + buf = tipc_buf_acquire(INT_H_SIZE); if (buf) { - msg = buf_msg(buf); + struct tipc_msg *msg = buf_msg(buf); + tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, - INT_H_SIZE, n_ptr->addr); + INT_H_SIZE, n_ptr->addr); msg_set_non_seq(msg, 1); msg_set_mc_netid(msg, tipc_net_id); - msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in)); - msg_set_bcgap_after(msg, n_ptr->bclink.gap_after); - msg_set_bcgap_to(msg, n_ptr->bclink.gap_to); + msg_set_bcast_ack(msg, n_ptr->bclink.last_in); + msg_set_bcgap_after(msg, n_ptr->bclink.last_in); + msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head + ? buf_seqno(n_ptr->bclink.deferred_head) - 1 + : n_ptr->bclink.last_sent); msg_set_bcast_tag(msg, tipc_own_tag); spin_lock_bh(&bc_lock); @@ -346,96 +333,37 @@ static void bclink_send_nack(struct tipc_node *n_ptr) spin_unlock_bh(&bc_lock); buf_discard(buf); - /* - * Ensure we doesn't send another NACK msg to the node - * until 16 more deferred messages arrive from it - * (i.e. helps prevent all nodes from NACK'ing at same time) - */ - - n_ptr->bclink.nack_sync = tipc_own_tag; + n_ptr->bclink.oos_state++; } } -/** - * tipc_bclink_check_gap - send a NACK if a sequence gap exists +/* + * bclink_peek_nack - monitor retransmission requests sent by other nodes * - * tipc_net_lock and node lock set - */ - -void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent) -{ - if (!n_ptr->bclink.supported || - less_eq(last_sent, mod(n_ptr->bclink.last_in))) - return; - - bclink_set_gap(n_ptr); - if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to) - n_ptr->bclink.gap_to = last_sent; - bclink_send_nack(n_ptr); -} - -/** - * tipc_bclink_peek_nack - process a NACK msg meant for another node + * Delay any upcoming NACK by this node if another node has already + * requested the first message this node is going to ask for. * * Only tipc_net_lock set. */ -static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to) +static void bclink_peek_nack(struct tipc_msg *msg) { - struct tipc_node *n_ptr = tipc_node_find(dest); - u32 my_after, my_to; + struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg)); - if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr))) + if (unlikely(!n_ptr)) return; + tipc_node_lock(n_ptr); - /* - * Modify gap to suppress unnecessary NACKs from this node - */ - my_after = n_ptr->bclink.gap_after; - my_to = n_ptr->bclink.gap_to; - - if (less_eq(gap_after, my_after)) { - if (less(my_after, gap_to) && less(gap_to, my_to)) - n_ptr->bclink.gap_after = gap_to; - else if (less_eq(my_to, gap_to)) - n_ptr->bclink.gap_to = n_ptr->bclink.gap_after; - } else if (less_eq(gap_after, my_to)) { - if (less_eq(my_to, gap_to)) - n_ptr->bclink.gap_to = gap_after; - } else { - /* - * Expand gap if missing bufs not in deferred queue: - */ - struct sk_buff *buf = n_ptr->bclink.deferred_head; - u32 prev = n_ptr->bclink.gap_to; - for (; buf; buf = buf->next) { - u32 seqno = buf_seqno(buf); + if (n_ptr->bclink.supported && + (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && + (n_ptr->bclink.last_in == msg_bcgap_after(msg))) + n_ptr->bclink.oos_state = 2; - if (mod(seqno - prev) != 1) { - buf = NULL; - break; - } - if (seqno == gap_after) - break; - prev = seqno; - } - if (buf == NULL) - n_ptr->bclink.gap_to = gap_after; - } - /* - * Some nodes may send a complementary NACK now: - */ - if (bclink_ack_allowed(sender_tag + 1)) { - if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) { - bclink_send_nack(n_ptr); - bclink_set_gap(n_ptr); - } - } tipc_node_unlock(n_ptr); } -/** +/* * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster */ @@ -505,10 +433,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) spin_unlock_bh(&bc_lock); } else { tipc_node_unlock(node); - tipc_bclink_peek_nack(msg_destnode(msg), - msg_bcast_tag(msg), - msg_bcgap_after(msg), - msg_bcgap_to(msg)); + bclink_peek_nack(msg); } goto exit; } @@ -519,16 +444,28 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) next_in = mod(node->bclink.last_in + 1); if (likely(seqno == next_in)) { + bclink_update_last_sent(node, seqno); receive: + node->bclink.last_in = seqno; + node->bclink.oos_state = 0; + spin_lock_bh(&bc_lock); bcl->stats.recv_info++; - node->bclink.last_in++; - bclink_set_gap(node); - if (unlikely(bclink_ack_allowed(seqno))) { - bclink_send_ack(node); + + /* + * Unicast an ACK periodically, ensuring that + * all nodes in the cluster don't ACK at the same time + */ + + if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { + tipc_link_send_proto_msg( + node->active_links[node->addr & 1], + STATE_MSG, 0, 0, 0, 0, 0); bcl->stats.sent_acks++; } + /* Deliver message to destination */ + if (likely(msg_isdata(msg))) { spin_unlock_bh(&bc_lock); tipc_node_unlock(node); @@ -567,9 +504,14 @@ receive: if (unlikely(!tipc_node_is_up(node))) goto unlock; - if (!node->bclink.deferred_head) + if (node->bclink.last_in == node->bclink.last_sent) goto unlock; + if (!node->bclink.deferred_head) { + node->bclink.oos_state = 1; + goto unlock; + } + msg = buf_msg(node->bclink.deferred_head); seqno = msg_seqno(msg); next_in = mod(next_in + 1); @@ -580,31 +522,19 @@ receive: buf = node->bclink.deferred_head; node->bclink.deferred_head = buf->next; + node->bclink.deferred_size--; goto receive; } /* Handle out-of-sequence broadcast message */ if (less(next_in, seqno)) { - u32 gap_after = node->bclink.gap_after; - u32 gap_to = node->bclink.gap_to; - deferred = tipc_link_defer_pkt(&node->bclink.deferred_head, &node->bclink.deferred_tail, buf); - if (deferred) { - node->bclink.nack_sync++; - if (seqno == mod(gap_after + 1)) - node->bclink.gap_after = seqno; - else if (less(gap_after, seqno) && less(seqno, gap_to)) - node->bclink.gap_to = seqno; - } + node->bclink.deferred_size += deferred; + bclink_update_last_sent(node, seqno); buf = NULL; - if (bclink_ack_allowed(node->bclink.nack_sync)) { - if (gap_to != gap_after) - bclink_send_nack(node); - bclink_set_gap(node); - } } else deferred = 0; |