summaryrefslogtreecommitdiffstats
path: root/net/tipc/group.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/group.c')
-rw-r--r--net/tipc/group.c371
1 files changed, 197 insertions, 174 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 5f4ffae..122162a 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -49,8 +49,6 @@
#define ADV_ACTIVE (ADV_UNIT * 12)
enum mbr_state {
- MBR_QUARANTINED,
- MBR_DISCOVERED,
MBR_JOINING,
MBR_PUBLISHED,
MBR_JOINED,
@@ -64,8 +62,7 @@ enum mbr_state {
struct tipc_member {
struct rb_node tree_node;
struct list_head list;
- struct list_head congested;
- struct sk_buff *event_msg;
+ struct list_head small_win;
struct sk_buff_head deferredq;
struct tipc_group *group;
u32 node;
@@ -77,21 +74,18 @@ struct tipc_member {
u16 bc_rcv_nxt;
u16 bc_syncpt;
u16 bc_acked;
- bool usr_pending;
};
struct tipc_group {
struct rb_root members;
- struct list_head congested;
+ struct list_head small_win;
struct list_head pending;
struct list_head active;
- struct list_head reclaiming;
struct tipc_nlist dests;
struct net *net;
int subid;
u32 type;
u32 instance;
- u32 domain;
u32 scope;
u32 portid;
u16 member_cnt;
@@ -99,6 +93,7 @@ struct tipc_group {
u16 max_active;
u16 bc_snd_nxt;
u16 bc_ackers;
+ bool *open;
bool loopback;
bool events;
};
@@ -106,6 +101,16 @@ struct tipc_group {
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
int mtyp, struct sk_buff_head *xmitq);
+static void tipc_group_open(struct tipc_member *m, bool *wakeup)
+{
+ *wakeup = false;
+ if (list_empty(&m->small_win))
+ return;
+ list_del_init(&m->small_win);
+ *m->group->open = true;
+ *wakeup = true;
+}
+
static void tipc_group_decr_active(struct tipc_group *grp,
struct tipc_member *m)
{
@@ -137,14 +142,14 @@ u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
return grp->bc_snd_nxt;
}
-static bool tipc_group_is_enabled(struct tipc_member *m)
+static bool tipc_group_is_receiver(struct tipc_member *m)
{
- return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
+ return m && m->state != MBR_JOINING && m->state != MBR_LEAVING;
}
-static bool tipc_group_is_receiver(struct tipc_member *m)
+static bool tipc_group_is_sender(struct tipc_member *m)
{
- return m && m->state >= MBR_JOINED;
+ return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED;
}
u32 tipc_group_exclude(struct tipc_group *grp)
@@ -160,8 +165,11 @@ int tipc_group_size(struct tipc_group *grp)
}
struct tipc_group *tipc_group_create(struct net *net, u32 portid,
- struct tipc_group_req *mreq)
+ struct tipc_group_req *mreq,
+ bool *group_is_open)
{
+ u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS;
+ bool global = mreq->scope != TIPC_NODE_SCOPE;
struct tipc_group *grp;
u32 type = mreq->type;
@@ -169,25 +177,41 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
if (!grp)
return NULL;
tipc_nlist_init(&grp->dests, tipc_own_addr(net));
- INIT_LIST_HEAD(&grp->congested);
+ INIT_LIST_HEAD(&grp->small_win);
INIT_LIST_HEAD(&grp->active);
INIT_LIST_HEAD(&grp->pending);
- INIT_LIST_HEAD(&grp->reclaiming);
grp->members = RB_ROOT;
grp->net = net;
grp->portid = portid;
- grp->domain = addr_domain(net, mreq->scope);
grp->type = type;
grp->instance = mreq->instance;
grp->scope = mreq->scope;
grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
- if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
+ grp->open = group_is_open;
+ filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE;
+ if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0,
+ filter, &grp->subid))
return grp;
kfree(grp);
return NULL;
}
+void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
+{
+ struct rb_root *tree = &grp->members;
+ struct tipc_member *m, *tmp;
+ struct sk_buff_head xmitq;
+
+ skb_queue_head_init(&xmitq);
+ rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
+ tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
+ tipc_group_update_member(m, 0);
+ }
+ tipc_node_distr_xmit(net, &xmitq);
+ *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
+}
+
void tipc_group_delete(struct net *net, struct tipc_group *grp)
{
struct rb_root *tree = &grp->members;
@@ -233,7 +257,7 @@ static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
struct tipc_member *m;
m = tipc_group_find_member(grp, node, port);
- if (m && tipc_group_is_enabled(m))
+ if (m && tipc_group_is_receiver(m))
return m;
return NULL;
}
@@ -278,7 +302,7 @@ static void tipc_group_add_to_tree(struct tipc_group *grp,
static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
u32 node, u32 port,
- int state)
+ u32 instance, int state)
{
struct tipc_member *m;
@@ -286,11 +310,12 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
if (!m)
return NULL;
INIT_LIST_HEAD(&m->list);
- INIT_LIST_HEAD(&m->congested);
+ INIT_LIST_HEAD(&m->small_win);
__skb_queue_head_init(&m->deferredq);
m->group = grp;
m->node = node;
m->port = port;
+ m->instance = instance;
m->bc_acked = grp->bc_snd_nxt - 1;
grp->member_cnt++;
tipc_group_add_to_tree(grp, m);
@@ -299,9 +324,10 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
return m;
}
-void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
+void tipc_group_add_member(struct tipc_group *grp, u32 node,
+ u32 port, u32 instance)
{
- tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
+ tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED);
}
static void tipc_group_delete_member(struct tipc_group *grp,
@@ -315,7 +341,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,
grp->bc_ackers--;
list_del_init(&m->list);
- list_del_init(&m->congested);
+ list_del_init(&m->small_win);
tipc_group_decr_active(grp, m);
/* If last member on a node, remove node from dest list */
@@ -344,7 +370,7 @@ void tipc_group_update_member(struct tipc_member *m, int len)
struct tipc_group *grp = m->group;
struct tipc_member *_m, *tmp;
- if (!tipc_group_is_enabled(m))
+ if (!tipc_group_is_receiver(m))
return;
m->window -= len;
@@ -352,16 +378,14 @@ void tipc_group_update_member(struct tipc_member *m, int len)
if (m->window >= ADV_IDLE)
return;
- list_del_init(&m->congested);
+ list_del_init(&m->small_win);
- /* Sort member into congested members' list */
- list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
- if (m->window > _m->window)
- continue;
- list_add_tail(&m->congested, &_m->congested);
- return;
+ /* Sort member into small_window members' list */
+ list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) {
+ if (_m->window > m->window)
+ break;
}
- list_add_tail(&m->congested, &grp->congested);
+ list_add_tail(&m->small_win, &_m->small_win);
}
void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
@@ -373,7 +397,7 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
for (n = rb_first(&grp->members); n; n = rb_next(n)) {
m = container_of(n, struct tipc_member, tree_node);
- if (tipc_group_is_enabled(m)) {
+ if (tipc_group_is_receiver(m)) {
tipc_group_update_member(m, len);
m->bc_acked = prev;
ackers++;
@@ -394,20 +418,20 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
int adv, state;
m = tipc_group_find_dest(grp, dnode, dport);
- *mbr = m;
- if (!m)
+ if (!tipc_group_is_receiver(m)) {
+ *mbr = NULL;
return false;
- if (m->usr_pending)
- return true;
+ }
+ *mbr = m;
+
if (m->window >= len)
return false;
- m->usr_pending = true;
+
+ *grp->open = false;
/* If not fully advertised, do it now to prevent mutual blocking */
adv = m->advertised;
state = m->state;
- if (state < MBR_JOINED)
- return true;
if (state == MBR_JOINED && adv == ADV_IDLE)
return true;
if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
@@ -425,13 +449,14 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
struct tipc_member *m = NULL;
/* If prev bcast was replicast, reject until all receivers have acked */
- if (grp->bc_ackers)
+ if (grp->bc_ackers) {
+ *grp->open = false;
return true;
-
- if (list_empty(&grp->congested))
+ }
+ if (list_empty(&grp->small_win))
return false;
- m = list_first_entry(&grp->congested, struct tipc_member, congested);
+ m = list_first_entry(&grp->small_win, struct tipc_member, small_win);
if (m->window >= len)
return false;
@@ -486,7 +511,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
goto drop;
m = tipc_group_find_member(grp, node, port);
- if (!tipc_group_is_receiver(m))
+ if (!tipc_group_is_sender(m))
goto drop;
if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
@@ -573,24 +598,34 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
switch (m->state) {
case MBR_JOINED:
- /* Reclaim advertised space from least active member */
- if (!list_empty(active) && active_cnt >= reclaim_limit) {
+ /* First, decide if member can go active */
+ if (active_cnt <= max_active) {
+ m->state = MBR_ACTIVE;
+ list_add_tail(&m->list, active);
+ grp->active_cnt++;
+ tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+ } else {
+ m->state = MBR_PENDING;
+ list_add_tail(&m->list, &grp->pending);
+ }
+
+ if (active_cnt < reclaim_limit)
+ break;
+
+ /* Reclaim from oldest active member, if possible */
+ if (!list_empty(active)) {
rm = list_first_entry(active, struct tipc_member, list);
rm->state = MBR_RECLAIMING;
- list_move_tail(&rm->list, &grp->reclaiming);
+ list_del_init(&rm->list);
tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
- }
- /* If max active, become pending and wait for reclaimed space */
- if (active_cnt >= max_active) {
- m->state = MBR_PENDING;
- list_add_tail(&m->list, &grp->pending);
break;
}
- /* Otherwise become active */
- m->state = MBR_ACTIVE;
- list_add_tail(&m->list, &grp->active);
- grp->active_cnt++;
- /* Fall through */
+ /* Nobody to reclaim from; - revert oldest pending to JOINED */
+ pm = list_first_entry(&grp->pending, struct tipc_member, list);
+ list_del_init(&pm->list);
+ pm->state = MBR_JOINED;
+ tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
+ break;
case MBR_ACTIVE:
if (!list_is_last(&m->list, &grp->active))
list_move_tail(&m->list, &grp->active);
@@ -602,12 +637,12 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
if (m->advertised > ADV_IDLE)
break;
m->state = MBR_JOINED;
+ grp->active_cnt--;
if (m->advertised < ADV_IDLE) {
pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
}
- grp->active_cnt--;
- list_del_init(&m->list);
+
if (list_empty(&grp->pending))
return;
@@ -619,7 +654,6 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
break;
case MBR_RECLAIMING:
- case MBR_DISCOVERED:
case MBR_JOINING:
case MBR_LEAVING:
default:
@@ -627,6 +661,40 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
}
}
+static void tipc_group_create_event(struct tipc_group *grp,
+ struct tipc_member *m,
+ u32 event, u16 seqno,
+ struct sk_buff_head *inputq)
+{ u32 dnode = tipc_own_addr(grp->net);
+ struct tipc_event evt;
+ struct sk_buff *skb;
+ struct tipc_msg *hdr;
+
+ evt.event = event;
+ evt.found_lower = m->instance;
+ evt.found_upper = m->instance;
+ evt.port.ref = m->port;
+ evt.port.node = m->node;
+ evt.s.seq.type = grp->type;
+ evt.s.seq.lower = m->instance;
+ evt.s.seq.upper = m->instance;
+
+ skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT,
+ GROUP_H_SIZE, sizeof(evt), dnode, m->node,
+ grp->portid, m->port, 0);
+ if (!skb)
+ return;
+
+ hdr = buf_msg(skb);
+ msg_set_nametype(hdr, grp->type);
+ msg_set_grp_evt(hdr, event);
+ msg_set_dest_droppable(hdr, true);
+ msg_set_grp_bc_seqno(hdr, seqno);
+ memcpy(msg_data(hdr), &evt, sizeof(evt));
+ TIPC_SKB_CB(skb)->orig_member = m->instance;
+ __skb_queue_tail(inputq, skb);
+}
+
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
int mtyp, struct sk_buff_head *xmitq)
{
@@ -672,83 +740,73 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
u32 node = msg_orignode(hdr);
u32 port = msg_origport(hdr);
struct tipc_member *m, *pm;
- struct tipc_msg *ehdr;
u16 remitted, in_flight;
if (!grp)
return;
+ if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net))
+ return;
+
m = tipc_group_find_member(grp, node, port);
switch (msg_type(hdr)) {
case GRP_JOIN_MSG:
if (!m)
m = tipc_group_create_member(grp, node, port,
- MBR_QUARANTINED);
+ 0, MBR_JOINING);
if (!m)
return;
m->bc_syncpt = msg_grp_bc_syncpt(hdr);
m->bc_rcv_nxt = m->bc_syncpt;
m->window += msg_adv_win(hdr);
- /* Wait until PUBLISH event is received */
- if (m->state == MBR_DISCOVERED) {
- m->state = MBR_JOINING;
- } else if (m->state == MBR_PUBLISHED) {
- m->state = MBR_JOINED;
- *usr_wakeup = true;
- m->usr_pending = false;
- tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
- ehdr = buf_msg(m->event_msg);
- msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
- __skb_queue_tail(inputq, m->event_msg);
- }
- list_del_init(&m->congested);
+ /* Wait until PUBLISH event is received if necessary */
+ if (m->state != MBR_PUBLISHED)
+ return;
+
+ /* Member can be taken into service */
+ m->state = MBR_JOINED;
+ tipc_group_open(m, usr_wakeup);
tipc_group_update_member(m, 0);
+ tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+ tipc_group_create_event(grp, m, TIPC_PUBLISHED,
+ m->bc_syncpt, inputq);
return;
case GRP_LEAVE_MSG:
if (!m)
return;
m->bc_syncpt = msg_grp_bc_syncpt(hdr);
list_del_init(&m->list);
- list_del_init(&m->congested);
- *usr_wakeup = true;
-
- /* Wait until WITHDRAW event is received */
- if (m->state != MBR_LEAVING) {
- tipc_group_decr_active(grp, m);
- m->state = MBR_LEAVING;
- return;
- }
- /* Otherwise deliver already received WITHDRAW event */
- ehdr = buf_msg(m->event_msg);
- msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
- __skb_queue_tail(inputq, m->event_msg);
+ tipc_group_open(m, usr_wakeup);
+ tipc_group_decr_active(grp, m);
+ m->state = MBR_LEAVING;
+ tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
+ m->bc_syncpt, inputq);
return;
case GRP_ADV_MSG:
if (!m)
return;
m->window += msg_adv_win(hdr);
- *usr_wakeup = m->usr_pending;
- m->usr_pending = false;
- list_del_init(&m->congested);
+ tipc_group_open(m, usr_wakeup);
return;
case GRP_ACK_MSG:
if (!m)
return;
m->bc_acked = msg_grp_bc_acked(hdr);
if (--grp->bc_ackers)
- break;
+ return;
+ list_del_init(&m->small_win);
+ *m->group->open = true;
*usr_wakeup = true;
- m->usr_pending = false;
+ tipc_group_update_member(m, 0);
return;
case GRP_RECLAIM_MSG:
if (!m)
return;
- *usr_wakeup = m->usr_pending;
- m->usr_pending = false;
tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
m->window = ADV_IDLE;
+ tipc_group_open(m, usr_wakeup);
return;
case GRP_REMIT_MSG:
if (!m || m->state != MBR_RECLAIMING)
@@ -763,18 +821,14 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
m->advertised = ADV_IDLE + in_flight;
return;
}
- /* All messages preceding the REMIT have been read */
- if (m->advertised <= remitted) {
- m->state = MBR_JOINED;
- in_flight = 0;
- }
- /* ..and the REMIT overtaken by more messages => re-advertise */
+ /* This should never happen */
if (m->advertised < remitted)
- tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+ pr_warn_ratelimited("Unexpected REMIT msg\n");
- m->advertised = ADV_IDLE + in_flight;
+ /* All messages preceding the REMIT have been read */
+ m->state = MBR_JOINED;
grp->active_cnt--;
- list_del_init(&m->list);
+ m->advertised = ADV_IDLE;
/* Set oldest pending member to active and advertise */
if (list_empty(&grp->pending))
@@ -796,11 +850,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
void tipc_group_member_evt(struct tipc_group *grp,
bool *usr_wakeup,
int *sk_rcvbuf,
- struct sk_buff *skb,
+ struct tipc_msg *hdr,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
- struct tipc_msg *hdr = buf_msg(skb);
struct tipc_event *evt = (void *)msg_data(hdr);
u32 instance = evt->found_lower;
u32 node = evt->port.node;
@@ -808,89 +861,59 @@ void tipc_group_member_evt(struct tipc_group *grp,
int event = evt->event;
struct tipc_member *m;
struct net *net;
- bool node_up;
u32 self;
if (!grp)
- goto drop;
+ return;
net = grp->net;
self = tipc_own_addr(net);
if (!grp->loopback && node == self && port == grp->portid)
- goto drop;
-
- /* Convert message before delivery to user */
- msg_set_hdr_sz(hdr, GROUP_H_SIZE);
- msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
- msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
- msg_set_origport(hdr, port);
- msg_set_orignode(hdr, node);
- msg_set_nametype(hdr, grp->type);
- msg_set_grp_evt(hdr, event);
+ return;
m = tipc_group_find_member(grp, node, port);
- if (event == TIPC_PUBLISHED) {
- if (!m)
- m = tipc_group_create_member(grp, node, port,
- MBR_DISCOVERED);
- if (!m)
- goto drop;
-
- /* Hold back event if JOIN message not yet received */
- if (m->state == MBR_DISCOVERED) {
- m->event_msg = skb;
- m->state = MBR_PUBLISHED;
- } else {
- msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
- __skb_queue_tail(inputq, skb);
- m->state = MBR_JOINED;
- *usr_wakeup = true;
- m->usr_pending = false;
+ switch (event) {
+ case TIPC_PUBLISHED:
+ /* Send and wait for arrival of JOIN message if necessary */
+ if (!m) {
+ m = tipc_group_create_member(grp, node, port, instance,
+ MBR_PUBLISHED);
+ if (!m)
+ break;
+ tipc_group_update_member(m, 0);
+ tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
+ break;
}
+
+ if (m->state != MBR_JOINING)
+ break;
+
+ /* Member can be taken into service */
m->instance = instance;
- TIPC_SKB_CB(skb)->orig_member = m->instance;
+ m->state = MBR_JOINED;
+ tipc_group_open(m, usr_wakeup);
+ tipc_group_update_member(m, 0);
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
- if (m->window < ADV_IDLE)
- tipc_group_update_member(m, 0);
- else
- list_del_init(&m->congested);
- } else if (event == TIPC_WITHDRAWN) {
+ tipc_group_create_event(grp, m, TIPC_PUBLISHED,
+ m->bc_syncpt, inputq);
+ break;
+ case TIPC_WITHDRAWN:
if (!m)
- goto drop;
-
- TIPC_SKB_CB(skb)->orig_member = m->instance;
+ break;
- *usr_wakeup = true;
- m->usr_pending = false;
- node_up = tipc_node_is_up(net, node);
- m->event_msg = NULL;
-
- if (node_up) {
- /* Hold back event if a LEAVE msg should be expected */
- if (m->state != MBR_LEAVING) {
- m->event_msg = skb;
- tipc_group_decr_active(grp, m);
- m->state = MBR_LEAVING;
- } else {
- msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
- __skb_queue_tail(inputq, skb);
- }
- } else {
- if (m->state != MBR_LEAVING) {
- tipc_group_decr_active(grp, m);
- m->state = MBR_LEAVING;
- msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
- } else {
- msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
- }
- __skb_queue_tail(inputq, skb);
- }
+ tipc_group_decr_active(grp, m);
+ m->state = MBR_LEAVING;
list_del_init(&m->list);
- list_del_init(&m->congested);
+ tipc_group_open(m, usr_wakeup);
+
+ /* Only send event if no LEAVE message can be expected */
+ if (!tipc_node_is_up(net, node))
+ tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
+ m->bc_rcv_nxt, inputq);
+ break;
+ default:
+ break;
}
*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
- return;
-drop:
- kfree_skb(skb);
}
OpenPOWER on IntegriCloud