tipc: make media xmit call outside node spinlock context
net/tipc/link.c
index ea32679b673797f6fa8ee61521445272b861187b..c052437a7cfad4b2592d696d6433c56c5e53d89a 100644
@@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
        /* This really cannot happen...  */
        if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
                pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-               tipc_link_reset(link);
                return -ENOBUFS;
        }
        /* Non-blocking sender: */
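
With the tipc_link_reset() call removed, a full send queue at system
importance now surfaces to the sender only as -ENOBUFS, while ordinary
backlog congestion still yields -ELINKCONG once the sender has been queued
for wakeup. A minimal caller-side sketch of the resulting retry semantics
for the new tipc_link_xmit() added below; link_wait_for_wakeup() is an
assumed stand-in for the socket-level wait, not something this patch
provides:

static int send_with_backoff(struct tipc_link *l, struct sk_buff_head *list,
                             struct sk_buff_head *xmitq)
{
        int rc;

        do {
                /* The chain is consumed except on -ELINKCONG, so
                 * retrying with the same list is only valid then.
                 */
                rc = tipc_link_xmit(l, list, xmitq);
                if (rc != -ELINKCONG)
                        break;                  /* 0, -EMSGSIZE or -ENOBUFS */
                rc = link_wait_for_wakeup(l);   /* assumed helper */
        } while (!rc);

        return rc;
}
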
@@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
        return 0;
 }
 
+/**
+ * tipc_link_xmit(): enqueue buffer list according to queue situation
+ * @l: link to use
+ * @list: chain of buffers containing message
+ * @xmitq: returned list of packets to be sent by caller
+ *
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 on success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS.
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted.
+ */
+int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
+                  struct sk_buff_head *xmitq)
+{
+       struct tipc_msg *hdr = buf_msg(skb_peek(list));
+       unsigned int maxwin = l->window;
+       unsigned int i, imp = msg_importance(hdr);
+       unsigned int mtu = l->mtu;
+       u16 ack = l->rcv_nxt - 1;
+       u16 seqno = l->snd_nxt;
+       u16 bc_last_in = l->owner->bclink.last_in;
+       struct sk_buff_head *transmq = &l->transmq;
+       struct sk_buff_head *backlogq = &l->backlogq;
+       struct sk_buff *skb, *_skb, *bskb;
+
+       /* Match msg importance against this and all higher backlog limits: */
+       for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
+               if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
+                       return link_schedule_user(l, list);
+       }
+       if (unlikely(msg_size(hdr) > mtu))
+               return -EMSGSIZE;
+
+       /* Prepare each packet for sending, and add to relevant queue: */
+       while (skb_queue_len(list)) {
+               skb = skb_peek(list);
+               hdr = buf_msg(skb);
+               msg_set_seqno(hdr, seqno);
+               msg_set_ack(hdr, ack);
+               msg_set_bcast_ack(hdr, bc_last_in);
+
+               if (likely(skb_queue_len(transmq) < maxwin)) {
+                       _skb = skb_clone(skb, GFP_ATOMIC);
+                       if (!_skb)
+                               return -ENOBUFS;
+                       __skb_dequeue(list);
+                       __skb_queue_tail(transmq, skb);
+                       __skb_queue_tail(xmitq, _skb);
+                       l->rcv_unacked = 0;
+                       seqno++;
+                       continue;
+               }
+               if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+                       kfree_skb(__skb_dequeue(list));
+                       l->stats.sent_bundled++;
+                       continue;
+               }
+               if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+                       kfree_skb(__skb_dequeue(list));
+                       __skb_queue_tail(backlogq, bskb);
+                       l->backlog[msg_importance(buf_msg(bskb))].len++;
+                       l->stats.sent_bundled++;
+                       l->stats.sent_bundles++;
+                       continue;
+               }
+               l->backlog[imp].len += skb_queue_len(list);
+               skb_queue_splice_tail_init(list, backlogq);
+       }
+       l->snd_nxt = seqno;
+       return 0;
+}
+
 static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
 {
        skb_queue_head_init(list);
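
This new signature is what moves the media xmit call outside the node
spinlock, per the commit title: packet clones are collected into xmitq
while the lock is held, and only handed to the bearer after it is dropped.
A sketch of that calling pattern using the tipc_bearer_xmit() helper this
series introduces; the wrapper name and the fixed selector are
illustrative, not taken from this patch:

static int node_xmit_sketch(struct net *net, struct tipc_node *n,
                            struct sk_buff_head *list, int bearer_id)
{
        struct sk_buff_head xmitq;
        struct tipc_link *l;
        int rc = -EHOSTUNREACH;

        __skb_queue_head_init(&xmitq);
        tipc_node_lock(n);
        l = node_active_link(n, 0);
        if (l)
                rc = tipc_link_xmit(l, list, &xmitq);   /* fills xmitq */
        tipc_node_unlock(n);

        /* Media xmit now happens outside the node spinlock: */
        if (l && !skb_queue_empty(&xmitq))
                tipc_bearer_xmit(net, bearer_id, &xmitq, &l->media_addr);
        return rc;
}

In a real caller the media address would be captured while the lock is
still held; the sketch elides that for brevity.
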
@@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
        return __tipc_link_xmit(link->owner->net, link, &head);
 }
 
-/* tipc_link_xmit_skb(): send single buffer to destination
- * Buffers sent via this function are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not cause link congestion.
- * The only exception is datagram messages rerouted after secondary
- * lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
- */
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
-                      u32 selector)
-{
-       struct sk_buff_head head;
-       int rc;
-
-       skb2list(skb, &head);
-       rc = tipc_link_xmit(net, &head, dnode, selector);
-       if (rc)
-               kfree_skb(skb);
-       return 0;
-}
-
-/**
- * tipc_link_xmit() is the general link level function for message sending
- * @net: the applicable net namespace
- * @list: chain of buffers containing message
- * @dsz: amount of user data to be sent
- * @dnode: address of destination node
- * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning error
- * Returns 0 on success, otherwise errno: -ELINKCONG, -EHOSTUNREACH, -EMSGSIZE
- */
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
-                  u32 selector)
-{
-       struct tipc_link *link = NULL;
-       struct tipc_node *node;
-       int rc = -EHOSTUNREACH;
-
-       node = tipc_node_find(net, dnode);
-       if (node) {
-               tipc_node_lock(node);
-               link = node_active_link(node, selector & 1);
-               if (link)
-                       rc = __tipc_link_xmit(net, link, list);
-               tipc_node_unlock(node);
-               tipc_node_put(node);
-       }
-       if (link)
-               return rc;
-
-       if (likely(in_own_node(net, dnode))) {
-               tipc_sk_rcv(net, list);
-               return 0;
-       }
-
-       __skb_queue_purge(list);
-       return rc;
-}
-
 /*
  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
  *
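
For reference, the convenience path lost with tipc_link_xmit_skb() reduces,
under the new scheme, to wrapping the single buffer in a list head and
calling the node-level send path. A hedged sketch; tipc_node_xmit() is the
wrapper this series moves toward and should be treated as an assumption
here:

static int xmit_one_skb(struct net *net, struct sk_buff *skb,
                        u32 dnode, u32 selector)
{
        struct sk_buff_head head;

        skb_queue_head_init(&head);
        __skb_queue_tail(&head, skb);
        /* Assumed wrapper: looks up the node, sends under its lock,
         * then forwards the collected packets to the bearer.
         */
        return tipc_node_xmit(net, &head, dnode, selector);
}
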