changes to mpmc spec and add notes to ms-queue
authorPeizhao Ou <peizhaoo@uci.edu>
Thu, 15 Jan 2015 22:44:42 +0000 (14:44 -0800)
committerPeizhao Ou <peizhaoo@uci.edu>
Thu, 15 Jan 2015 22:44:42 +0000 (14:44 -0800)
benchmark/mpmc-queue/mpmc-queue.cc
benchmark/mpmc-queue/mpmc-queue.h
benchmark/ms-queue/note.txt [new file with mode: 0644]
src/edu/uci/eecs/specCompiler/codeGenerator/CodeGenerator.java

index 3a19045a65f5f935446f077ca8337a2ea5581e8c..39c2c0a24808a3ce62b05a47b66b11f0445d198e 100644 (file)
@@ -11,7 +11,7 @@
 void threadA(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
 {
        int32_t *bin = queue->write_prepare();
-       store_32(bin, 1);
+       //store_32(bin, 1);
        *bin = 1;
        printf("write_bin %d, val %d\n", bin, 1);
        queue->write_publish(bin);
@@ -21,8 +21,8 @@ void threadB(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
 {
        int32_t *bin;
        while (bin = queue->read_fetch()) {
-               printf("Read: %d\n", load_32(bin));
-               printf("read_bin %d, val %d\n", bin, load_32(bin));
+               //printf("Read: %d\n", load_32(bin));
+               //printf("read_bin %d, val %d\n", bin, load_32(bin));
                printf("Read: %d\n", *bin);
                queue->read_consume(bin);
        }
@@ -31,12 +31,12 @@ void threadB(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
 void threadC(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
 {
        int32_t *bin = queue->write_prepare();
-       store_32(bin, 1);
+       //store_32(bin, 1);
        *bin = 1;
        queue->write_publish(bin);
 
        while (bin = queue->read_fetch()) {
-               printf("Read: %d\n", load_32(bin));
+               //printf("Read: %d\n", load_32(bin));
                printf("Read: %d\n", *bin);
                queue->read_consume(bin);
        }
@@ -119,7 +119,7 @@ int user_main(int argc, char **argv)
 #ifndef CONFIG_MPMC_NO_INITIAL_ELEMENT
        printf("Adding initial element\n");
        int32_t *bin = queue.write_prepare();
-       store_32(bin, 17);
+       //store_32(bin, 17);
        *bin, 17;
        printf("init_write_bin %d, val %d\n", bin, 17);
        queue.write_publish(bin);
index e6d54855ede7d2b5f2d7f7dda7b51dfa6498b37b..1b737bdcf6af9a94032c4025905eaf511eccd81b 100644 (file)
@@ -73,7 +73,7 @@ public:
        /**
                @Begin
                @Interface: Fetch
-               @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
+               @Commit_point_set: Fetch_RW_Load_Empty | Fetch_RW_RMW | Fetch_W_Load
                @ID: (call_id_t) __RET__
                //@Check:
                        //__RET__ == NULL || has_elem(list, __RET__)
@@ -86,7 +86,7 @@ public:
                /**
                        @Begin
                        @Potential_commit_point_define: true
-                       @Label: Fetch_Potential_Point
+                       @Label: Fetch_Potential_RW_Load
                        @End
                */
                unsigned int rd,wr;
@@ -99,8 +99,8 @@ public:
                                /**
                                        @Begin
                                        @Commit_point_define: true
-                                       @Potential_commit_point_label: Fetch_Potential_Point 
-                                       @Label: Fetch_Empty_Point
+                                       @Potential_commit_point_label: Fetch_Potential_RW_Load
+                                       @Label: Fetch_RW_Load_Empty
                                        @End
                                */
 
@@ -110,8 +110,8 @@ public:
                        bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
                        /**
                                @Begin
-                               @Commit_point_define_check: succ == true
-                               @Label: Fetch_Succ_Point
+                               @Commit_point_define_check: succ
+                               @Label: Fetch_RW_RMW
                                @End
                        */
                        if (succ)
@@ -122,9 +122,28 @@ public:
 
                // (*1)
                rl::backoff bo;
-               while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
-                       thrd_yield();
+               while (true) {
+                       int written = m_written.load(mo_acquire);
+                       /**
+                               @Begin
+                               @Potential_commit_point_define: true
+                               @Label: Fetch_Potential_W_Load
+                               @End
+                       */
+                       if ((written & 0xFFFF) != wr) {
+                               thrd_yield();
+                       } else {
+                               break;
+                       }
                }
+               
+               /**
+                       @Begin
+                       @Commit_point_define: true
+                       @Potential_commit_point_label: Fetch_Potential_W_Load
+                       @Label: Fetch_W_Load
+                       @End
+               */
 
                t_element * p = & ( m_array[ rd % t_size ] );
                
@@ -134,7 +153,7 @@ public:
        /**
                @Begin
                @Interface: Consume
-               @Commit_point_set: Consume_Point
+               @Commit_point_set: Consume_R_RMW
                @ID: (call_id_t) bin 
                //@Check:
                //      consume_check(__TID__)
@@ -148,7 +167,7 @@ public:
                /**
                        @Begin
                        @Commit_point_define_check: true
-                       @Label: Consume_Point
+                       @Label: Consume_R_RMW
                        @End
                */
        }
@@ -158,7 +177,7 @@ public:
        /**
                @Begin
                @Interface: Prepare 
-               @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
+               @Commit_point_set: Prepare_RW_Load_Full | Prepare_RW_RMW | Prepare_R_Load
                @ID: (call_id_t) __RET__
                //@Check:
                        //prepare_check(__RET__, __TID__)
@@ -173,7 +192,7 @@ public:
                /**
                        @Begin
                        @Potential_commit_point_define: true
-                       @Label: Prepare_Potential_Point
+                       @Label: Prepare_Potential_RW_Load
                        @End
                */
                unsigned int rd,wr;
@@ -186,8 +205,8 @@ public:
                                /**
                                        @Begin
                                        @Commit_point_define: true
-                                       @Potential_commit_point_label: Prepare_Potential_Point 
-                                       @Label: Prepare_Full_Point
+                                       @Potential_commit_point_label: Prepare_Potential_RW_Load
+                                       @Label: Prepare_RW_Load_Full
                                        @End
                                */
                                return NULL;
@@ -197,8 +216,8 @@ public:
                                ((wr+1)&0xFFFF),mo_acq_rel);
                        /**
                                @Begin
-                               @Commit_point_define_check: succ == true
-                               @Label: Prepare_Succ_Point
+                               @Commit_point_define_check: succ
+                               @Label: Prepare_RW_RMW
                                @End
                        */
                        if (succ)
@@ -209,10 +228,28 @@ public:
 
                // (*1)
                rl::backoff bo;
-               while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
-                       thrd_yield();
+               while (true) {
+                       int read = m_read.load(mo_acquire);
+                       /**
+                               @Begin
+                               @Potential_commit_point_define: true
+                               @Label: Prepare_Potential_R_Load
+                               @End
+                       */
+                       if ((read & 0xFFFF) != rd)
+                               thrd_yield();
+                       else
+                               break;
                }
 
+               /**
+                       @Begin
+                       @Commit_point_define: true
+                       @Potential_commit_point_label: Prepare_Potential_R_Load
+                       @Label: Prepare_R_Load
+                       @End
+               */
+
                t_element * p = & ( m_array[ wr % t_size ] );
 
                return p;
@@ -221,7 +258,7 @@ public:
        /**
                @Begin
                @Interface: Publish 
-               @Commit_point_set: Publish_Point
+               @Commit_point_set: Publish_W_RMW
                @ID: (call_id_t) bin 
                //@Check:
                        //publish_check(__TID__)
@@ -236,7 +273,7 @@ public:
                /**
                        @Begin
                        @Commit_point_define_check: true
-                       @Label: Publish_Point
+                       @Label: Publish_W_RMW
                        @End
                */
        }
diff --git a/benchmark/ms-queue/note.txt b/benchmark/ms-queue/note.txt
new file mode 100644 (file)
index 0000000..2889961
--- /dev/null
@@ -0,0 +1,25 @@
+#1: The new bug we found in this data structure is as the following:
+
+In dequeue(), the load of Tail has to be acquire. If not, the load of the next
+field won't get synchronize with the enqueuer that inserted the element right
+after this dequeue() operation. We can catch this bug using the two independent
+dequeuers and 1 (or 2) enqueuer.
+
+#2: When writing the speicification for this data structure, we have been
+considering the following invariance of the enqueue() and dequeue() operations.
+For enqueue() to succeed, it finally loads the Tail, inserts the new element by
+updating the next field right after the Tail load. The load of Tail actually
+orders the enqueuers, and the CAS of the next field will order the enqueuer and
+its dequeuer. For dequeue() to succeed, it either loads the Head and Tail, and
+then bails; or it loads the Head and then loads the next field. Similarly, the
+load of Head orders dequeuers.
+
+We have one key "additional_ordering_point" in enqueue() method for ordering the
+dequeuer and its immediately next enqueuer. For example, when we have two
+threads, 1 dequeuer and 1 enqueuer, when dequeue() returns NULL, we can order
+dequeue() before enqueue() by using the additional ordering point of enqueue()
+where the swing of Tail (or just the load of Tail when someone else swings it).
+Since dequeue() will load the Tail, and enqueue() will CAS or load the Tail
+later, deuqeue() can be ordered() before enqueue(). Notably here, additional
+ordering points are only used when methods has 'same-location' commit points and
+they can't order each other.
index 4a1c2a79db852b1e29c2aa883f3932e3e896a6d1..4acbfb623e57735f0eb8dd66cf9bde72d48c03fc 100644 (file)
@@ -327,7 +327,7 @@ public class CodeGenerator {
 //             File[][] sources = { srcLinuxRWLocks,  srcMSQueue, srcRCU,
 //                             srcDeque, srcMCSLock, srcSPSCQueue, srcMPMCQueue, srcHashtable };
 
-                File[][] sources = {srcMSQueue, srcHashtable };
+                File[][] sources = {srcMSQueue, srcMPMCQueue };
                // Compile all the benchmarks
                for (int i = 0; i < sources.length; i++) {
                        CodeGenerator gen = new CodeGenerator(sources[i]);