tipc: let broadcast packet reception use new link receive function
[firefly-linux-kernel-4.4.55.git] / net / tipc / bcast.c
index 7fdf895e797385cbf8e96cd139042e4454500fd8..ea28c2919b384ce50c50b9d18b461e4c1a669b18 100644 (file)
@@ -112,11 +112,6 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
        return tipc_net(net)->bcbase;
 }
 
-static struct tipc_link *tipc_bc_sndlink(struct net *net)
-{
-       return tipc_net(net)->bcl;
-}
-
 /**
  * tipc_nmap_equal - test for equality of node maps
  */
@@ -169,31 +164,6 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
        bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
 }
 
-void tipc_bclink_add_node(struct net *net, u32 addr)
-{
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-       struct tipc_link *l = tipc_bc_sndlink(net);
-       tipc_bclink_lock(net);
-       tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
-       tipc_link_add_bc_peer(l);
-       tipc_bclink_unlock(net);
-}
-
-void tipc_bclink_remove_node(struct net *net, u32 addr)
-{
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-       tipc_bclink_lock(net);
-       tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
-       tn->bcl->ackers--;
-
-       /* Last node? => reset backlog queue */
-       if (!tn->bcbase->bcast_nodes.count)
-               tipc_link_purge_backlog(tn->bcbase->link);
-
-       tipc_bclink_unlock(net);
-}
-
 static void bclink_set_last_sent(struct net *net)
 {
        struct tipc_net *tn = net_generic(net, tipc_net_id);
@@ -501,12 +471,141 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
                __skb_queue_purge(&rcvq);
                return rc;
        }
+
        /* Broadcast to all nodes, inluding local node */
        tipc_bcbearer_xmit(net, &xmitq);
        tipc_sk_mcast_rcv(net, &rcvq, &inputq);
        __skb_queue_purge(list);
        return 0;
 }
+
+/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
+ *
+ * RCU is locked, no other locks set
+ */
+int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
+{
+       struct tipc_msg *hdr = buf_msg(skb);
+       struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+       struct sk_buff_head xmitq;
+       int rc;
+
+       __skb_queue_head_init(&xmitq);
+
+       if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
+               kfree_skb(skb);
+               return 0;
+       }
+
+       tipc_bcast_lock(net);
+       if (msg_user(hdr) == BCAST_PROTOCOL)
+               rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
+       else
+               rc = tipc_link_rcv(l, skb, NULL);
+       tipc_bcast_unlock(net);
+
+       if (!skb_queue_empty(&xmitq))
+               tipc_bcbearer_xmit(net, &xmitq);
+
+       /* Any socket wakeup messages ? */
+       if (!skb_queue_empty(inputq))
+               tipc_sk_rcv(net, inputq);
+
+       return rc;
+}
+
+/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
+ *
+ * RCU is locked, no other locks set
+ */
+void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
+{
+       struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+       struct sk_buff_head xmitq;
+
+       __skb_queue_head_init(&xmitq);
+
+       tipc_bcast_lock(net);
+       tipc_link_bc_ack_rcv(l, acked, &xmitq);
+       tipc_bcast_unlock(net);
+
+       tipc_bcbearer_xmit(net, &xmitq);
+
+       /* Any socket wakeup messages ? */
+       if (!skb_queue_empty(inputq))
+               tipc_sk_rcv(net, inputq);
+}
+
+/* tipc_bcast_synch_rcv -  check and update rcv link with peer's send state
+ *
+ * RCU is locked, no other locks set
+ */
+void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
+                        struct tipc_msg *hdr)
+{
+       struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+       struct sk_buff_head xmitq;
+
+       __skb_queue_head_init(&xmitq);
+
+       tipc_bcast_lock(net);
+       if (msg_type(hdr) == STATE_MSG) {
+               tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
+               tipc_link_bc_sync_rcv(l, hdr, &xmitq);
+       } else {
+               tipc_link_bc_init_rcv(l, hdr);
+       }
+       tipc_bcast_unlock(net);
+
+       tipc_bcbearer_xmit(net, &xmitq);
+
+       /* Any socket wakeup messages ? */
+       if (!skb_queue_empty(inputq))
+               tipc_sk_rcv(net, inputq);
+}
+
+/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
+ *
+ * RCU is locked, node lock is set
+ */
+void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l,
+                        struct sk_buff_head *xmitq)
+{
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
+       struct tipc_link *snd_l = tipc_bc_sndlink(net);
+
+       tipc_bclink_lock(net);
+       tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
+       tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
+       tipc_bclink_unlock(net);
+}
+
+/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
+ *
+ * RCU is locked, node lock is set
+ */
+void tipc_bcast_remove_peer(struct net *net, u32 addr,
+                           struct tipc_link *rcv_l)
+{
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
+       struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+       struct tipc_link *snd_l = tipc_bc_sndlink(net);
+       struct sk_buff_head xmitq;
+
+       __skb_queue_head_init(&xmitq);
+
+       tipc_bclink_lock(net);
+       tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
+       tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
+       tipc_bclink_unlock(net);
+
+       tipc_bcbearer_xmit(net, &xmitq);
+
+       /* Any socket wakeup messages ? */
+       if (!skb_queue_empty(inputq))
+               tipc_sk_rcv(net, inputq);
+}
+
 /**
  * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
  *
@@ -728,6 +827,7 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
                        return 0;
                }
        }
+       msg_set_mc_netid(msg, tn->net_id);
 
        /* Send buffer over bearers until all targets reached */
        bcbearer->remains = bclink->bcast_nodes;
@@ -1042,12 +1142,13 @@ int tipc_bcast_init(struct net *net)
        spin_lock_init(&tipc_net(net)->bclock);
        bb->node.net = net;
 
-       if (!tipc_link_bc_create(&bb->node,
+       if (!tipc_link_bc_create(&bb->node, 0, 0,
                                 MAX_PKT_DEFAULT_MCAST,
                                 BCLINK_WIN_DEFAULT,
                                 0,
                                 &bb->inputq,
                                 &bb->namedq,
+                                NULL,
                                 &l))
                goto enomem;
        bb->link = l;