diff options
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r-- | net/tipc/link.c | 190 |
1 files changed, 86 insertions, 104 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c index ddee498..9e94bf9 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -171,14 +171,17 @@ int tipc_link_is_active(struct tipc_link *l_ptr) */ static void link_timeout(struct tipc_link *l_ptr) { + struct sk_buff *skb; + tipc_node_lock(l_ptr->owner); /* update counters used in statistical profiling of send traffic */ - l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size; + l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue); l_ptr->stats.queue_sz_counts++; - if (l_ptr->first_out) { - struct tipc_msg *msg = buf_msg(l_ptr->first_out); + skb = skb_peek(&l_ptr->outqueue); + if (skb) { + struct tipc_msg *msg = buf_msg(skb); u32 length = msg_size(msg); if ((msg_user(msg) == MSG_FRAGMENTER) && @@ -206,7 +209,6 @@ static void link_timeout(struct tipc_link *l_ptr) } /* do all other link processing performed on a periodic basis */ - link_state_event(l_ptr, TIMEOUT_EVT); if (l_ptr->next_out) @@ -289,6 +291,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, link_init_max_pkt(l_ptr); l_ptr->next_out_no = 1; + __skb_queue_head_init(&l_ptr->outqueue); __skb_queue_head_init(&l_ptr->waiting_sks); link_reset_statistics(l_ptr); @@ -367,7 +370,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport, */ static void link_prepare_wakeup(struct tipc_link *link) { - uint pend_qsz = link->out_queue_size; + uint pend_qsz = skb_queue_len(&link->outqueue); struct sk_buff *skb, *tmp; skb_queue_walk_safe(&link->waiting_sks, skb, tmp) { @@ -380,17 +383,6 @@ static void link_prepare_wakeup(struct tipc_link *link) } /** - * link_release_outqueue - purge link's outbound message queue - * @l_ptr: pointer to link - */ -static void link_release_outqueue(struct tipc_link *l_ptr) -{ - kfree_skb_list(l_ptr->first_out); - l_ptr->first_out = NULL; - l_ptr->out_queue_size = 0; -} - -/** * tipc_link_reset_fragments - purge link's inbound message fragments queue * @l_ptr: pointer to link */ @@ -407,7 +399,7 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr) void tipc_link_purge_queues(struct tipc_link *l_ptr) { kfree_skb_list(l_ptr->oldest_deferred_in); - kfree_skb_list(l_ptr->first_out); + __skb_queue_purge(&l_ptr->outqueue); tipc_link_reset_fragments(l_ptr); } @@ -440,14 +432,12 @@ void tipc_link_reset(struct tipc_link *l_ptr) } /* Clean up all queues: */ - link_release_outqueue(l_ptr); + __skb_queue_purge(&l_ptr->outqueue); kfree_skb_list(l_ptr->oldest_deferred_in); if (!skb_queue_empty(&l_ptr->waiting_sks)) { skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks); owner->action_flags |= TIPC_WAKEUP_USERS; } - l_ptr->last_out = NULL; - l_ptr->first_out = NULL; l_ptr->next_out = NULL; l_ptr->unacked_window = 0; l_ptr->checkpoint = 1; @@ -703,18 +693,17 @@ drop: /** * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked * @link: link to use - * @buf: chain of buffers containing message + * @skb: chain of buffers containing message * Consumes the buffer chain, except when returning -ELINKCONG * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket * user data messages) or -EHOSTUNREACH (all other messages/senders) * Only the socket functions tipc_send_stream() and tipc_send_packet() need * to act on the return value, since they may need to do more send attempts. */ -int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf) +int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb) { - struct tipc_msg *msg = buf_msg(buf); + struct tipc_msg *msg = buf_msg(skb); uint psz = msg_size(msg); - uint qsz = link->out_queue_size; uint sndlim = link->queue_limit[0]; uint imp = tipc_msg_tot_importance(msg); uint mtu = link->max_pkt; @@ -722,58 +711,50 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf) uint seqno = link->next_out_no; uint bc_last_in = link->owner->bclink.last_in; struct tipc_media_addr *addr = &link->media_addr; - struct sk_buff *next = buf->next; + struct sk_buff_head *outqueue = &link->outqueue; + struct sk_buff *next; /* Match queue limits against msg importance: */ - if (unlikely(qsz >= link->queue_limit[imp])) - return tipc_link_cong(link, buf); + if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp])) + return tipc_link_cong(link, skb); /* Has valid packet limit been used ? */ if (unlikely(psz > mtu)) { - kfree_skb_list(buf); + kfree_skb_list(skb); return -EMSGSIZE; } /* Prepare each packet for sending, and add to outqueue: */ - while (buf) { - next = buf->next; - msg = buf_msg(buf); + while (skb) { + next = skb->next; + msg = buf_msg(skb); msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); msg_set_bcast_ack(msg, bc_last_in); - if (!link->first_out) { - link->first_out = buf; - } else if (qsz < sndlim) { - link->last_out->next = buf; - } else if (tipc_msg_bundle(link->last_out, buf, mtu)) { + if (skb_queue_len(outqueue) < sndlim) { + __skb_queue_tail(outqueue, skb); + tipc_bearer_send(link->bearer_id, skb, addr); + link->next_out = NULL; + link->unacked_window = 0; + } else if (tipc_msg_bundle(outqueue, skb, mtu)) { link->stats.sent_bundled++; - buf = next; - next = buf->next; + skb = next; continue; - } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) { + } else if (tipc_msg_make_bundle(outqueue, skb, mtu, + link->addr)) { link->stats.sent_bundled++; link->stats.sent_bundles++; - link->last_out->next = buf; if (!link->next_out) - link->next_out = buf; + link->next_out = skb_peek_tail(outqueue); } else { - link->last_out->next = buf; + __skb_queue_tail(outqueue, skb); if (!link->next_out) - link->next_out = buf; - } - - /* Send packet if possible: */ - if (likely(++qsz <= sndlim)) { - tipc_bearer_send(link->bearer_id, buf, addr); - link->next_out = next; - link->unacked_window = 0; + link->next_out = skb; } seqno++; - link->last_out = buf; - buf = next; + skb = next; } link->next_out_no = seqno; - link->out_queue_size = qsz; return 0; } @@ -851,6 +832,14 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) kfree_skb(buf); } +struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list, + const struct sk_buff *skb) +{ + if (skb_queue_is_last(list, skb)) + return NULL; + return skb->next; +} + /* * tipc_link_push_packets - push unsent packets to bearer * @@ -861,15 +850,15 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) */ void tipc_link_push_packets(struct tipc_link *l_ptr) { - struct sk_buff *skb; + struct sk_buff_head *outqueue = &l_ptr->outqueue; + struct sk_buff *skb = l_ptr->next_out; struct tipc_msg *msg; u32 next, first; - while (l_ptr->next_out) { - skb = l_ptr->next_out; + skb_queue_walk_from(outqueue, skb) { msg = buf_msg(skb); next = msg_seqno(msg); - first = buf_seqno(l_ptr->first_out); + first = buf_seqno(skb_peek(outqueue)); if (mod(next - first) < l_ptr->queue_limit[0]) { msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); @@ -878,7 +867,7 @@ void tipc_link_push_packets(struct tipc_link *l_ptr) TIPC_SKB_CB(skb)->bundling = false; tipc_bearer_send(l_ptr->bearer_id, skb, &l_ptr->media_addr); - l_ptr->next_out = skb->next; + l_ptr->next_out = tipc_skb_queue_next(outqueue, skb); } else { break; } @@ -946,20 +935,20 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, } } -void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, +void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, u32 retransmits) { struct tipc_msg *msg; - if (!buf) + if (!skb) return; - msg = buf_msg(buf); + msg = buf_msg(skb); /* Detect repeated retransmit failures */ if (l_ptr->last_retransmitted == msg_seqno(msg)) { if (++l_ptr->stale_count > 100) { - link_retransmit_failure(l_ptr, buf); + link_retransmit_failure(l_ptr, skb); return; } } else { @@ -967,12 +956,13 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, l_ptr->stale_count = 1; } - while (retransmits && (buf != l_ptr->next_out) && buf) { - msg = buf_msg(buf); + skb_queue_walk_from(&l_ptr->outqueue, skb) { + if (!retransmits || skb == l_ptr->next_out) + break; + msg = buf_msg(skb); msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); - buf = buf->next; + tipc_bearer_send(l_ptr->bearer_id, skb, &l_ptr->media_addr); retransmits--; l_ptr->stats.retransmitted++; } @@ -1067,12 +1057,12 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) while (head) { struct tipc_node *n_ptr; struct tipc_link *l_ptr; - struct sk_buff *crs; struct sk_buff *buf = head; + struct sk_buff *skb1, *tmp; struct tipc_msg *msg; u32 seq_no; u32 ackd; - u32 released = 0; + u32 released; head = head->next; buf->next = NULL; @@ -1131,17 +1121,14 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (n_ptr->bclink.recv_permitted) tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); - crs = l_ptr->first_out; - while ((crs != l_ptr->next_out) && - less_eq(buf_seqno(crs), ackd)) { - struct sk_buff *next = crs->next; - kfree_skb(crs); - crs = next; - released++; - } - if (released) { - l_ptr->first_out = crs; - l_ptr->out_queue_size -= released; + released = 0; + skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) { + if (skb1 == l_ptr->next_out || + more(buf_seqno(skb1), ackd)) + break; + __skb_unlink(skb1, &l_ptr->outqueue); + kfree_skb(skb1); + released = 1; } /* Try sending any messages link endpoint has pending */ @@ -1590,7 +1577,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) } if (msg_seq_gap(msg)) { l_ptr->stats.recv_nacks++; - tipc_link_retransmit(l_ptr, l_ptr->first_out, + tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue), msg_seq_gap(msg)); } break; @@ -1637,10 +1624,10 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, */ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) { - u32 msgcount = l_ptr->out_queue_size; - struct sk_buff *crs = l_ptr->first_out; + u32 msgcount = skb_queue_len(&l_ptr->outqueue); struct tipc_link *tunnel = l_ptr->owner->active_links[0]; struct tipc_msg tunnel_hdr; + struct sk_buff *skb; int split_bundles; if (!tunnel) @@ -1651,14 +1638,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); msg_set_msgcnt(&tunnel_hdr, msgcount); - if (!l_ptr->first_out) { - struct sk_buff *buf; - - buf = tipc_buf_acquire(INT_H_SIZE); - if (buf) { - skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE); + if (skb_queue_empty(&l_ptr->outqueue)) { + skb = tipc_buf_acquire(INT_H_SIZE); + if (skb) { + skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); msg_set_size(&tunnel_hdr, INT_H_SIZE); - __tipc_link_xmit(tunnel, buf); + __tipc_link_xmit(tunnel, skb); } else { pr_warn("%sunable to send changeover msg\n", link_co_err); @@ -1669,8 +1654,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) split_bundles = (l_ptr->owner->active_links[0] != l_ptr->owner->active_links[1]); - while (crs) { - struct tipc_msg *msg = buf_msg(crs); + skb_queue_walk(&l_ptr->outqueue, skb) { + struct tipc_msg *msg = buf_msg(skb); if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { struct tipc_msg *m = msg_get_wrapped(msg); @@ -1688,7 +1673,6 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, msg_link_selector(msg)); } - crs = crs->next; } } @@ -1704,17 +1688,16 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *tunnel) { - struct sk_buff *iter; + struct sk_buff *skb; struct tipc_msg tunnel_hdr; tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr); - msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size); + msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue)); msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); - iter = l_ptr->first_out; - while (iter) { - struct sk_buff *outbuf; - struct tipc_msg *msg = buf_msg(iter); + skb_queue_walk(&l_ptr->outqueue, skb) { + struct sk_buff *outskb; + struct tipc_msg *msg = buf_msg(skb); u32 length = msg_size(msg); if (msg_user(msg) == MSG_BUNDLER) @@ -1722,19 +1705,18 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */ msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); msg_set_size(&tunnel_hdr, length + INT_H_SIZE); - outbuf = tipc_buf_acquire(length + INT_H_SIZE); - if (outbuf == NULL) { + outskb = tipc_buf_acquire(length + INT_H_SIZE); + if (outskb == NULL) { pr_warn("%sunable to send duplicate msg\n", link_co_err); return; } - skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, + skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data, length); - __tipc_link_xmit(tunnel, outbuf); + __tipc_link_xmit(tunnel, outskb); if (!tipc_link_is_up(l_ptr)) return; - iter = iter->next; } } |