summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/tipc/link.c10
-rw-r--r--net/tipc/port.c12
-rw-r--r--net/tipc/port.h16
-rw-r--r--net/tipc/socket.c48
-rw-r--r--net/tipc/socket.h11
5 files changed, 43 insertions, 54 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 96a8072..a081e7d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -332,13 +332,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
{
struct tipc_port *p_ptr;
+ struct tipc_sock *tsk;
spin_lock_bh(&tipc_port_list_lock);
p_ptr = tipc_port_lock(origport);
if (p_ptr) {
if (!list_empty(&p_ptr->wait_list))
goto exit;
- p_ptr->congested = 1;
+ tsk = tipc_port_to_sock(p_ptr);
+ tsk->link_cong = 1;
p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
l_ptr->stats.link_congs++;
@@ -352,6 +354,7 @@ exit:
void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
{
struct tipc_port *p_ptr;
+ struct tipc_sock *tsk;
struct tipc_port *temp_p_ptr;
int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
@@ -367,10 +370,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
wait_list) {
if (win <= 0)
break;
+ tsk = tipc_port_to_sock(p_ptr);
list_del_init(&p_ptr->wait_list);
spin_lock_bh(p_ptr->lock);
- p_ptr->congested = 0;
- tipc_port_wakeup(p_ptr);
+ tsk->link_cong = 0;
+ tipc_sock_wakeup(tsk);
win -= p_ptr->waiting_pkts;
spin_unlock_bh(p_ptr->lock);
}
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 9f53d5a..0d09dcb 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -186,12 +186,6 @@ exit:
tipc_port_list_free(dp);
}
-
-void tipc_port_wakeup(struct tipc_port *port)
-{
- tipc_sock_wakeup(tipc_port_to_sock(port));
-}
-
/* tipc_port_init - intiate TIPC port and lock it
*
* Returns obtained reference if initialization is successful, zero otherwise
@@ -209,7 +203,6 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
}
p_ptr->max_pkt = MAX_PKT_DEFAULT;
- p_ptr->sent = 1;
p_ptr->ref = ref;
INIT_LIST_HEAD(&p_ptr->wait_list);
INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
@@ -459,10 +452,9 @@ void tipc_acknowledge(u32 ref, u32 ack)
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return;
- if (p_ptr->connected) {
- p_ptr->conn_unacked -= ack;
+ if (p_ptr->connected)
buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
- }
+
tipc_port_unlock(p_ptr);
if (!buf)
return;
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 3a4f808..0e47052 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -53,17 +53,13 @@
* @connected: non-zero if port is currently connected to a peer port
* @conn_type: TIPC type used when connection was established
* @conn_instance: TIPC instance used when connection was established
- * @conn_unacked: number of unacknowledged messages received from peer port
* @published: non-zero if port has one or more associated names
- * @congested: non-zero if cannot send because of link or port congestion
* @max_pkt: maximum packet size "hint" used when building messages sent by port
* @ref: unique reference to port in TIPC object registry
* @phdr: preformatted message header used when sending messages
* @port_list: adjacent ports in TIPC's global list of ports
* @wait_list: adjacent ports in list of ports waiting on link congestion
* @waiting_pkts:
- * @sent: # of non-empty messages sent by port
- * @acked: # of non-empty message acknowledgements from connected port's peer
* @publications: list of publications for port
* @pub_count: total # of publications port has made during its lifetime
* @probing_state:
@@ -76,17 +72,13 @@ struct tipc_port {
int connected;
u32 conn_type;
u32 conn_instance;
- u32 conn_unacked;
int published;
- u32 congested;
u32 max_pkt;
u32 ref;
struct tipc_msg phdr;
struct list_head port_list;
struct list_head wait_list;
u32 waiting_pkts;
- u32 sent;
- u32 acked;
struct list_head publications;
u32 pub_count;
u32 probing_state;
@@ -120,8 +112,6 @@ int tipc_port_disconnect(u32 portref);
int tipc_port_shutdown(u32 ref);
-void tipc_port_wakeup(struct tipc_port *port);
-
/*
* The following routines require that the port be locked on entry
*/
@@ -161,12 +151,6 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr)
spin_unlock_bh(p_ptr->lock);
}
-static inline int tipc_port_congested(struct tipc_port *p_ptr)
-{
- return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN);
-}
-
-
static inline u32 tipc_port_peernode(struct tipc_port *p_ptr)
{
return msg_destnode(&p_ptr->phdr);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1762323..ede78b1 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -207,7 +207,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
- tsk->port.sent = 0;
+ tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0);
tipc_port_unlock(port);
@@ -513,12 +513,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
switch ((int)sock->state) {
case SS_UNCONNECTED:
- if (!tsk->port.congested)
+ if (!tsk->link_cong)
mask |= POLLOUT;
break;
case SS_READY:
case SS_CONNECTED:
- if (!tsk->port.congested)
+ if (!tsk->link_cong && !tipc_sk_conn_cong(tsk))
mask |= POLLOUT;
/* fall thru' */
case SS_CONNECTING:
@@ -546,7 +546,7 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
struct tipc_port *port = &tsk->port;
- int wakeable;
+ int conn_cong;
/* Ignore if connection cannot be validated: */
if (!port->connected || !tipc_port_peer_msg(port, msg))
@@ -555,13 +555,10 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf)
port->probing_state = TIPC_CONN_OK;
if (msg_type(msg) == CONN_ACK) {
- wakeable = tipc_port_congested(port) && port->congested;
- port->acked += msg_msgcnt(msg);
- if (!tipc_port_congested(port)) {
- port->congested = 0;
- if (wakeable)
- tipc_port_wakeup(port);
- }
+ conn_cong = tipc_sk_conn_cong(tsk);
+ tsk->sent_unacked -= msg_msgcnt(msg);
+ if (conn_cong)
+ tipc_sock_wakeup(tsk);
} else if (msg_type(msg) == CONN_PROBE) {
if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
return TIPC_OK;
@@ -626,7 +623,7 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
return sock_intr_errno(*timeo_p);
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
- done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
+ done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
@@ -800,7 +797,6 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
- struct tipc_port *port = &tsk->port;
DEFINE_WAIT(wait);
int done;
@@ -819,7 +815,9 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
done = sk_wait_event(sk, timeo_p,
- (!port->congested || !port->connected));
+ (!tsk->link_cong &&
+ !tipc_sk_conn_cong(tsk)) ||
+ !tsk->port.connected);
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
@@ -856,7 +854,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
if (unlikely(dest)) {
rc = tipc_sendmsg(iocb, sock, m, dsz);
if (dsz && (dsz == rc))
- tsk->port.sent = 1;
+ tsk->sent_unacked = 1;
return rc;
}
if (dsz > (uint)INT_MAX)
@@ -875,7 +873,6 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
dnode = tipc_port_peernode(port);
- port->congested = 1;
next:
mtu = port->max_pkt;
@@ -884,11 +881,10 @@ next:
if (unlikely(rc < 0))
goto exit;
do {
- port->congested = 1;
- if (likely(!tipc_port_congested(port))) {
+ if (likely(!tipc_sk_conn_cong(tsk))) {
rc = tipc_link_xmit2(buf, dnode, ref);
if (likely(!rc)) {
- port->sent++;
+ tsk->sent_unacked++;
sent += send;
if (sent == dsz)
break;
@@ -903,8 +899,6 @@ next:
}
rc = tipc_wait_for_sndpkt(sock, &timeo);
} while (!rc);
-
- port->congested = 0;
exit:
if (iocb)
release_sock(sk);
@@ -1169,8 +1163,10 @@ restart:
/* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) {
if ((sock->state != SS_READY) &&
- (++port->conn_unacked >= TIPC_CONNACK_INTV))
- tipc_acknowledge(port->ref, port->conn_unacked);
+ (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
+ tipc_acknowledge(port->ref, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+ }
advance_rx_queue(sk);
}
exit:
@@ -1278,8 +1274,10 @@ restart:
/* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) {
- if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV))
- tipc_acknowledge(port->ref, port->conn_unacked);
+ if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
+ tipc_acknowledge(port->ref, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+ }
advance_rx_queue(sk);
}
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 69fd06b..2cdede9 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -48,6 +48,9 @@
* @peer_name: the peer of the connection, if any
* @conn_timeout: the time we can wait for an unresponded setup request
* @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
+ * @link_cong: non-zero if owner must sleep because of link congestion
+ * @sent_unacked: # messages sent by socket, and not yet acked by peer
+ * @rcv_unacked: # messages read by user, but not yet acked back to peer
*/
struct tipc_sock {
@@ -55,6 +58,9 @@ struct tipc_sock {
struct tipc_port port;
unsigned int conn_timeout;
atomic_t dupl_rcvcnt;
+ int link_cong;
+ uint sent_unacked;
+ uint rcv_unacked;
};
static inline struct tipc_sock *tipc_sk(const struct sock *sk)
@@ -72,6 +78,11 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk)
tsk->sk.sk_write_space(&tsk->sk);
}
+static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
+{
+ return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
+}
+
int tipc_sk_rcv(struct sk_buff *buf);
#endif
OpenPOWER on IntegriCloud