diff options
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r-- | net/tipc/socket.c | 411 |
1 files changed, 246 insertions, 165 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c index fd5f042..9b4e483 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,8 +1,8 @@ /* * net/tipc/socket.c: TIPC socket API * - * Copyright (c) 2001-2007, Ericsson AB - * Copyright (c) 2004-2008, 2010-2011, Wind River Systems + * Copyright (c) 2001-2007, 2012 Ericsson AB + * Copyright (c) 2004-2008, 2010-2012, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -43,7 +43,7 @@ #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ -#define OVERLOAD_LIMIT_BASE 5000 +#define OVERLOAD_LIMIT_BASE 10000 #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ struct tipc_sock { @@ -62,6 +62,8 @@ struct tipc_sock { static int backlog_rcv(struct sock *sk, struct sk_buff *skb); static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf); static void wakeupdispatch(struct tipc_port *tport); +static void tipc_data_ready(struct sock *sk, int len); +static void tipc_write_space(struct sock *sk); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -71,8 +73,6 @@ static struct proto tipc_proto; static int sockets_enabled; -static atomic_t tipc_queue_size = ATOMIC_INIT(0); - /* * Revised TIPC socket locking policy: * @@ -126,7 +126,6 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0); static void advance_rx_queue(struct sock *sk) { kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); - atomic_dec(&tipc_queue_size); } /** @@ -138,10 +137,8 @@ static void discard_rx_queue(struct sock *sk) { struct sk_buff *buf; - while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { - atomic_dec(&tipc_queue_size); + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) kfree_skb(buf); - } } /** @@ -153,10 +150,8 @@ static void reject_rx_queue(struct sock *sk) { struct sk_buff *buf; - while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) tipc_reject_msg(buf, TIPC_ERR_NO_PORT); - atomic_dec(&tipc_queue_size); - } } /** @@ -221,6 +216,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, sock_init_data(sock, sk); sk->sk_backlog_rcv = backlog_rcv; sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2; + sk->sk_data_ready = tipc_data_ready; + sk->sk_write_space = tipc_write_space; tipc_sk(sk)->p = tp_ptr; tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; @@ -276,7 +273,6 @@ static int release(struct socket *sock) buf = __skb_dequeue(&sk->sk_receive_queue); if (buf == NULL) break; - atomic_dec(&tipc_queue_size); if (TIPC_SKB_CB(buf)->handle != 0) kfree_skb(buf); else { @@ -408,7 +404,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr, * socket state flags set * ------------ --------- * unconnected no read flags - * no write flags + * POLLOUT if port is not congested * * connecting POLLIN/POLLRDNORM if ACK/NACK in rx queue * no write flags @@ -435,9 +431,13 @@ static unsigned int poll(struct file *file, struct socket *sock, struct sock *sk = sock->sk; u32 mask = 0; - poll_wait(file, sk_sleep(sk), wait); + sock_poll_wait(file, sk_sleep(sk), wait); switch ((int)sock->state) { + case SS_UNCONNECTED: + if (!tipc_sk_port(sk)->congested) + mask |= POLLOUT; + break; case SS_READY: case SS_CONNECTED: if (!tipc_sk_port(sk)->congested) @@ -775,16 +775,19 @@ exit: static int auto_connect(struct socket *sock, struct tipc_msg *msg) { struct tipc_sock *tsock = tipc_sk(sock->sk); - - if (msg_errcode(msg)) { - sock->state = SS_DISCONNECTING; - return -ECONNREFUSED; - } + struct tipc_port *p_ptr; tsock->peer_name.ref = msg_origport(msg); tsock->peer_name.node = msg_orignode(msg); - tipc_connect2port(tsock->p->ref, &tsock->peer_name); - tipc_set_portimportance(tsock->p->ref, msg_importance(msg)); + p_ptr = tipc_port_deref(tsock->p->ref); + if (!p_ptr) + return -EINVAL; + + __tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name); + + if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE) + return -EINVAL; + msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg)); sock->state = SS_CONNECTED; return 0; } @@ -943,13 +946,6 @@ restart: sz = msg_data_sz(msg); err = msg_errcode(msg); - /* Complete connection setup for an implied connect */ - if (unlikely(sock->state == SS_CONNECTING)) { - res = auto_connect(sock, msg); - if (res) - goto exit; - } - /* Discard an empty non-errored message & try again */ if ((!sz) && (!err)) { advance_rx_queue(sk); @@ -1126,6 +1122,39 @@ exit: } /** + * tipc_write_space - wake up thread if port congestion is released + * @sk: socket + */ +static void tipc_write_space(struct sock *sk) +{ + struct socket_wq *wq; + + rcu_read_lock(); + wq = rcu_dereference(sk->sk_wq); + if (wq_has_sleeper(wq)) + wake_up_interruptible_sync_poll(&wq->wait, POLLOUT | + POLLWRNORM | POLLWRBAND); + rcu_read_unlock(); +} + +/** + * tipc_data_ready - wake up threads to indicate messages have been received + * @sk: socket + * @len: the length of messages + */ +static void tipc_data_ready(struct sock *sk, int len) +{ + struct socket_wq *wq; + + rcu_read_lock(); + wq = rcu_dereference(sk->sk_wq); + if (wq_has_sleeper(wq)) + wake_up_interruptible_sync_poll(&wq->wait, POLLIN | + POLLRDNORM | POLLRDBAND); + rcu_read_unlock(); +} + +/** * rx_queue_full - determine if receive queue can accept another message * @msg: message to be added to queue * @queue_size: current size of queue @@ -1154,6 +1183,83 @@ static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base) } /** + * filter_connect - Handle all incoming messages for a connection-based socket + * @tsock: TIPC socket + * @msg: message + * + * Returns TIPC error status code and socket error status code + * once it encounters some errors + */ +static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) +{ + struct socket *sock = tsock->sk.sk_socket; + struct tipc_msg *msg = buf_msg(*buf); + struct sock *sk = &tsock->sk; + u32 retval = TIPC_ERR_NO_PORT; + int res; + + if (msg_mcast(msg)) + return retval; + + switch ((int)sock->state) { + case SS_CONNECTED: + /* Accept only connection-based messages sent by peer */ + if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) { + if (unlikely(msg_errcode(msg))) { + sock->state = SS_DISCONNECTING; + __tipc_disconnect(tsock->p); + } + retval = TIPC_OK; + } + break; + case SS_CONNECTING: + /* Accept only ACK or NACK message */ + if (unlikely(msg_errcode(msg))) { + sock->state = SS_DISCONNECTING; + sk->sk_err = -ECONNREFUSED; + retval = TIPC_OK; + break; + } + + if (unlikely(!msg_connected(msg))) + break; + + res = auto_connect(sock, msg); + if (res) { + sock->state = SS_DISCONNECTING; + sk->sk_err = res; + retval = TIPC_OK; + break; + } + + /* If an incoming message is an 'ACK-', it should be + * discarded here because it doesn't contain useful + * data. In addition, we should try to wake up + * connect() routine if sleeping. + */ + if (msg_data_sz(msg) == 0) { + kfree_skb(*buf); + *buf = NULL; + if (waitqueue_active(sk_sleep(sk))) + wake_up_interruptible(sk_sleep(sk)); + } + retval = TIPC_OK; + break; + case SS_LISTENING: + case SS_UNCONNECTED: + /* Accept only SYN message */ + if (!msg_connected(msg) && !(msg_errcode(msg))) + retval = TIPC_OK; + break; + case SS_DISCONNECTING: + break; + default: + pr_err("Unknown socket state %u\n", sock->state); + } + return retval; +} + +/** * filter_rcv - validate incoming message * @sk: socket * @buf: message @@ -1170,6 +1276,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) struct socket *sock = sk->sk_socket; struct tipc_msg *msg = buf_msg(buf); u32 recv_q_len; + u32 res = TIPC_OK; /* Reject message if it is wrong sort of message for socket */ if (msg_type(msg) > TIPC_DIRECT_MSG) @@ -1179,32 +1286,12 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) if (msg_connected(msg)) return TIPC_ERR_NO_PORT; } else { - if (msg_mcast(msg)) - return TIPC_ERR_NO_PORT; - if (sock->state == SS_CONNECTED) { - if (!msg_connected(msg) || - !tipc_port_peer_msg(tipc_sk_port(sk), msg)) - return TIPC_ERR_NO_PORT; - } else if (sock->state == SS_CONNECTING) { - if (!msg_connected(msg) && (msg_errcode(msg) == 0)) - return TIPC_ERR_NO_PORT; - } else if (sock->state == SS_LISTENING) { - if (msg_connected(msg) || msg_errcode(msg)) - return TIPC_ERR_NO_PORT; - } else if (sock->state == SS_DISCONNECTING) { - return TIPC_ERR_NO_PORT; - } else /* (sock->state == SS_UNCONNECTED) */ { - if (msg_connected(msg) || msg_errcode(msg)) - return TIPC_ERR_NO_PORT; - } + res = filter_connect(tipc_sk(sk), &buf); + if (res != TIPC_OK || buf == NULL) + return res; } /* Reject message if there isn't room to queue it */ - recv_q_len = (u32)atomic_read(&tipc_queue_size); - if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) { - if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE)) - return TIPC_ERR_OVERLOAD; - } recv_q_len = skb_queue_len(&sk->sk_receive_queue); if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) { if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2)) @@ -1213,17 +1300,9 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) /* Enqueue message (finally!) */ TIPC_SKB_CB(buf)->handle = 0; - atomic_inc(&tipc_queue_size); __skb_queue_tail(&sk->sk_receive_queue, buf); - /* Initiate connection termination for an incoming 'FIN' */ - if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) { - sock->state = SS_DISCONNECTING; - tipc_disconnect_port(tipc_sk_port(sk)); - } - - if (waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + sk->sk_data_ready(sk, 0); return TIPC_OK; } @@ -1290,8 +1369,7 @@ static void wakeupdispatch(struct tipc_port *tport) { struct sock *sk = (struct sock *)tport->usr_handle; - if (waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + sk->sk_write_space(sk); } /** @@ -1309,8 +1387,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, struct sock *sk = sock->sk; struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest; struct msghdr m = {NULL,}; - struct sk_buff *buf; - struct tipc_msg *msg; unsigned int timeout; int res; @@ -1322,26 +1398,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, goto exit; } - /* For now, TIPC does not support the non-blocking form of connect() */ - if (flags & O_NONBLOCK) { - res = -EOPNOTSUPP; - goto exit; - } - - /* Issue Posix-compliant error code if socket is in the wrong state */ - if (sock->state == SS_LISTENING) { - res = -EOPNOTSUPP; - goto exit; - } - if (sock->state == SS_CONNECTING) { - res = -EALREADY; - goto exit; - } - if (sock->state != SS_UNCONNECTED) { - res = -EISCONN; - goto exit; - } - /* * Reject connection attempt using multicast address * @@ -1353,49 +1409,66 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, goto exit; } - /* Reject any messages already in receive queue (very unlikely) */ - reject_rx_queue(sk); + timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout; - /* Send a 'SYN-' to destination */ - m.msg_name = dest; - m.msg_namelen = destlen; - res = send_msg(NULL, sock, &m, 0); - if (res < 0) + switch (sock->state) { + case SS_UNCONNECTED: + /* Send a 'SYN-' to destination */ + m.msg_name = dest; + m.msg_namelen = destlen; + + /* If connect is in non-blocking case, set MSG_DONTWAIT to + * indicate send_msg() is never blocked. + */ + if (!timeout) + m.msg_flags = MSG_DONTWAIT; + + res = send_msg(NULL, sock, &m, 0); + if ((res < 0) && (res != -EWOULDBLOCK)) + goto exit; + + /* Just entered SS_CONNECTING state; the only + * difference is that return value in non-blocking + * case is EINPROGRESS, rather than EALREADY. + */ + res = -EINPROGRESS; + break; + case SS_CONNECTING: + res = -EALREADY; + break; + case SS_CONNECTED: + res = -EISCONN; + break; + default: + res = -EINVAL; goto exit; + } - /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ - timeout = tipc_sk(sk)->conn_timeout; - release_sock(sk); - res = wait_event_interruptible_timeout(*sk_sleep(sk), - (!skb_queue_empty(&sk->sk_receive_queue) || - (sock->state != SS_CONNECTING)), - timeout ? (long)msecs_to_jiffies(timeout) - : MAX_SCHEDULE_TIMEOUT); - lock_sock(sk); + if (sock->state == SS_CONNECTING) { + if (!timeout) + goto exit; - if (res > 0) { - buf = skb_peek(&sk->sk_receive_queue); - if (buf != NULL) { - msg = buf_msg(buf); - res = auto_connect(sock, msg); - if (!res) { - if (!msg_data_sz(msg)) - advance_rx_queue(sk); - } - } else { - if (sock->state == SS_CONNECTED) - res = -EISCONN; + /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ + release_sock(sk); + res = wait_event_interruptible_timeout(*sk_sleep(sk), + sock->state != SS_CONNECTING, + timeout ? (long)msecs_to_jiffies(timeout) + : MAX_SCHEDULE_TIMEOUT); + lock_sock(sk); + if (res <= 0) { + if (res == 0) + res = -ETIMEDOUT; else - res = -ECONNREFUSED; + ; /* leave "res" unchanged */ + goto exit; } - } else { - if (res == 0) - res = -ETIMEDOUT; - else - ; /* leave "res" unchanged */ - sock->state = SS_DISCONNECTING; } + if (unlikely(sock->state == SS_DISCONNECTING)) + res = sock_error(sk); + else + res = 0; + exit: release_sock(sk); return res; @@ -1436,8 +1509,13 @@ static int listen(struct socket *sock, int len) */ static int accept(struct socket *sock, struct socket *new_sock, int flags) { - struct sock *sk = sock->sk; + struct sock *new_sk, *sk = sock->sk; struct sk_buff *buf; + struct tipc_sock *new_tsock; + struct tipc_port *new_tport; + struct tipc_msg *msg; + u32 new_ref; + int res; lock_sock(sk); @@ -1463,48 +1541,51 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) buf = skb_peek(&sk->sk_receive_queue); res = tipc_create(sock_net(sock->sk), new_sock, 0, 0); - if (!res) { - struct sock *new_sk = new_sock->sk; - struct tipc_sock *new_tsock = tipc_sk(new_sk); - struct tipc_port *new_tport = new_tsock->p; - u32 new_ref = new_tport->ref; - struct tipc_msg *msg = buf_msg(buf); - - lock_sock(new_sk); - - /* - * Reject any stray messages received by new socket - * before the socket lock was taken (very, very unlikely) - */ - reject_rx_queue(new_sk); - - /* Connect new socket to it's peer */ - new_tsock->peer_name.ref = msg_origport(msg); - new_tsock->peer_name.node = msg_orignode(msg); - tipc_connect2port(new_ref, &new_tsock->peer_name); - new_sock->state = SS_CONNECTED; - - tipc_set_portimportance(new_ref, msg_importance(msg)); - if (msg_named(msg)) { - new_tport->conn_type = msg_nametype(msg); - new_tport->conn_instance = msg_nameinst(msg); - } + if (res) + goto exit; - /* - * Respond to 'SYN-' by discarding it & returning 'ACK'-. - * Respond to 'SYN+' by queuing it on new socket. - */ - if (!msg_data_sz(msg)) { - struct msghdr m = {NULL,}; + new_sk = new_sock->sk; + new_tsock = tipc_sk(new_sk); + new_tport = new_tsock->p; + new_ref = new_tport->ref; + msg = buf_msg(buf); - advance_rx_queue(sk); - send_packet(NULL, new_sock, &m, 0); - } else { - __skb_dequeue(&sk->sk_receive_queue); - __skb_queue_head(&new_sk->sk_receive_queue, buf); - } - release_sock(new_sk); + /* we lock on new_sk; but lockdep sees the lock on sk */ + lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING); + + /* + * Reject any stray messages received by new socket + * before the socket lock was taken (very, very unlikely) + */ + reject_rx_queue(new_sk); + + /* Connect new socket to it's peer */ + new_tsock->peer_name.ref = msg_origport(msg); + new_tsock->peer_name.node = msg_orignode(msg); + tipc_connect(new_ref, &new_tsock->peer_name); + new_sock->state = SS_CONNECTED; + + tipc_set_portimportance(new_ref, msg_importance(msg)); + if (msg_named(msg)) { + new_tport->conn_type = msg_nametype(msg); + new_tport->conn_instance = msg_nameinst(msg); } + + /* + * Respond to 'SYN-' by discarding it & returning 'ACK'-. + * Respond to 'SYN+' by queuing it on new socket. + */ + if (!msg_data_sz(msg)) { + struct msghdr m = {NULL,}; + + advance_rx_queue(sk); + send_packet(NULL, new_sock, &m, 0); + } else { + __skb_dequeue(&sk->sk_receive_queue); + __skb_queue_head(&new_sk->sk_receive_queue, buf); + } + release_sock(new_sk); + exit: release_sock(sk); return res; @@ -1539,7 +1620,6 @@ restart: /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ buf = __skb_dequeue(&sk->sk_receive_queue); if (buf) { - atomic_dec(&tipc_queue_size); if (TIPC_SKB_CB(buf)->handle != 0) { kfree_skb(buf); goto restart; @@ -1556,10 +1636,11 @@ restart: case SS_DISCONNECTING: - /* Discard any unreceived messages; wake up sleeping tasks */ + /* Discard any unreceived messages */ discard_rx_queue(sk); - if (waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + + /* Wake up anyone sleeping in poll */ + sk->sk_state_change(sk); res = 0; break; @@ -1677,7 +1758,7 @@ static int getsockopt(struct socket *sock, /* no need to set "res", since already 0 at this point */ break; case TIPC_NODE_RECVQ_DEPTH: - value = (u32)atomic_read(&tipc_queue_size); + value = 0; /* was tipc_queue_size, now obsolete */ break; case TIPC_SOCK_RECVQ_DEPTH: value = skb_queue_len(&sk->sk_receive_queue); |