tipc: move the delivery of named messages out of nametbl lock
authorYing Xue <ying.xue@windriver.com>
Mon, 28 Apr 2014 10:00:10 +0000 (18:00 +0800)
committerDavid S. Miller <davem@davemloft.net>
Mon, 28 Apr 2014 18:49:54 +0000 (14:49 -0400)
Commit a89778d8baf19cd7e728d81121a294a06cedaad1 ("tipc: add support
for link state subscriptions") introduced below possible deadlock
scenario:

       CPU0                          CPU1
T0:   tipc_publish()                 link_timeout()
T1:   tipc_nametbl_publish()         [grab node lock]*
T2:   [grab nametbl write lock]*     link_state_event()
T3:   named_cluster_distribute()     link_activate()
T4:   [grab node lock]*              tipc_node_link_up()
T5:                                  tipc_nametbl_publish()
T6:                                  [grab nametble write lock]*

The opposite order of holding nametbl write lock and node lock on
above two different paths may result in a deadlock. If we move the
the delivery of named messages via link out of name nametbl lock,
the reverse order of holding locks will be eliminated, as a result,
the deadlock will be killed as well.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/name_distr.c
net/tipc/name_distr.h
net/tipc/name_table.c

index 36a72822601c91f2c2d64aadc92fb6ad79976d3a..974a73f3d87622f867ad07ffd563a0856dc7d985 100644 (file)
@@ -127,7 +127,7 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
        return buf;
 }
 
-static void named_cluster_distribute(struct sk_buff *buf)
+void named_cluster_distribute(struct sk_buff *buf)
 {
        struct sk_buff *buf_copy;
        struct tipc_node *n_ptr;
@@ -156,7 +156,7 @@ static void named_cluster_distribute(struct sk_buff *buf)
 /**
  * tipc_named_publish - tell other nodes about a new publication by this node
  */
-void tipc_named_publish(struct publication *publ)
+struct sk_buff *tipc_named_publish(struct publication *publ)
 {
        struct sk_buff *buf;
        struct distr_item *item;
@@ -165,23 +165,23 @@ void tipc_named_publish(struct publication *publ)
        publ_lists[publ->scope]->size++;
 
        if (publ->scope == TIPC_NODE_SCOPE)
-               return;
+               return NULL;
 
        buf = named_prepare_buf(PUBLICATION, ITEM_SIZE, 0);
        if (!buf) {
                pr_warn("Publication distribution failure\n");
-               return;
+               return NULL;
        }
 
        item = (struct distr_item *)msg_data(buf_msg(buf));
        publ_to_item(item, publ);
-       named_cluster_distribute(buf);
+       return buf;
 }
 
 /**
  * tipc_named_withdraw - tell other nodes about a withdrawn publication by this node
  */
-void tipc_named_withdraw(struct publication *publ)
+struct sk_buff *tipc_named_withdraw(struct publication *publ)
 {
        struct sk_buff *buf;
        struct distr_item *item;
@@ -190,17 +190,17 @@ void tipc_named_withdraw(struct publication *publ)
        publ_lists[publ->scope]->size--;
 
        if (publ->scope == TIPC_NODE_SCOPE)
-               return;
+               return NULL;
 
        buf = named_prepare_buf(WITHDRAWAL, ITEM_SIZE, 0);
        if (!buf) {
                pr_warn("Withdrawal distribution failure\n");
-               return;
+               return NULL;
        }
 
        item = (struct distr_item *)msg_data(buf_msg(buf));
        publ_to_item(item, publ);
-       named_cluster_distribute(buf);
+       return buf;
 }
 
 /*
index 9b312ccfd43e7da41bcab4ca33d5f0f4d5be86cf..47ff829f936176cc438c30e0e6336400dc6a7c06 100644 (file)
@@ -39,8 +39,9 @@
 
 #include "name_table.h"
 
-void tipc_named_publish(struct publication *publ);
-void tipc_named_withdraw(struct publication *publ);
+struct sk_buff *tipc_named_publish(struct publication *publ);
+struct sk_buff *tipc_named_withdraw(struct publication *publ);
+void named_cluster_distribute(struct sk_buff *buf);
 void tipc_named_node_up(unsigned long node);
 void tipc_named_rcv(struct sk_buff *buf);
 void tipc_named_reinit(void);
index 042e8e3cabc09f84aa5dce626c57a30faf3ca32d..9bcf4b58853f68145d2a52a1731be1878d4e2070 100644 (file)
@@ -664,6 +664,7 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
                                         u32 scope, u32 port_ref, u32 key)
 {
        struct publication *publ;
+       struct sk_buff *buf = NULL;
 
        if (table.local_publ_count >= TIPC_MAX_PUBLICATIONS) {
                pr_warn("Publication failed, local publication limit reached (%u)\n",
@@ -676,9 +677,12 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
                                   tipc_own_addr, port_ref, key);
        if (likely(publ)) {
                table.local_publ_count++;
-               tipc_named_publish(publ);
+               buf = tipc_named_publish(publ);
        }
        write_unlock_bh(&tipc_nametbl_lock);
+
+       if (buf)
+               named_cluster_distribute(buf);
        return publ;
 }
 
@@ -688,15 +692,19 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
 int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
 {
        struct publication *publ;
+       struct sk_buff *buf;
 
        write_lock_bh(&tipc_nametbl_lock);
        publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
        if (likely(publ)) {
                table.local_publ_count--;
-               tipc_named_withdraw(publ);
+               buf = tipc_named_withdraw(publ);
                write_unlock_bh(&tipc_nametbl_lock);
                list_del_init(&publ->pport_list);
                kfree(publ);
+
+               if (buf)
+                       named_cluster_distribute(buf);
                return 1;
        }
        write_unlock_bh(&tipc_nametbl_lock);