save
authorPeizhao Ou <peizhaoo@uci.edu>
Thu, 20 Mar 2014 06:01:13 +0000 (23:01 -0700)
committerPeizhao Ou <peizhaoo@uci.edu>
Thu, 20 Mar 2014 06:01:13 +0000 (23:01 -0700)
benchmark/chase-lev-deque-bugfix/deque.c
benchmark/chase-lev-deque-bugfix/deque.h
benchmark/chase-lev-deque-bugfix/main.c
benchmark/mpmc-queue/mpmc-queue.cc
benchmark/mpmc-queue/mpmc-queue.h

index 2cfbb76..7c32656 100644 (file)
@@ -35,7 +35,8 @@ int take(Deque *q) {
        int x;
        if (t <= b) {
                /* Non-empty queue. */
-               x = atomic_load_explicit(&a->buffer[b % atomic_load_explicit(&a->size,memory_order_relaxed)], memory_order_relaxed);
+               int size = atomic_load_explicit(&a->size,memory_order_relaxed);
+               x = atomic_load_explicit(&a->buffer[b % size], memory_order_relaxed);
                /**
                        @Begin
                        @Commit_point_define_check: t != b
@@ -48,17 +49,10 @@ int take(Deque *q) {
                                1, memory_order_seq_cst, memory_order_relaxed);
                        /**
                                @Begin
-                               @Commit_point_define_check: succ == true
+                               @Commit_point_define_check: succ
                                @Label: Take_Point3
                                @End
                        */
-                       
-                       /**
-                               @Begin
-                               @Commit_point_define_check: succ == false
-                               @Label: Take_Point4
-                               @End
-                       */
                        if (!succ) {
                                /* Failed race. */
                                x = EMPTY;
@@ -102,15 +96,18 @@ void push(Deque *q, int x) {
                //Bug in paper...should have next line...
                a = (Array *) atomic_load_explicit(&q->array, memory_order_relaxed);
        }
-       atomic_store_explicit(&a->buffer[b % atomic_load_explicit(&a->size, memory_order_relaxed)], x, memory_order_relaxed);
-       atomic_thread_fence(memory_order_release);
+       int size = atomic_load_explicit(&a->size, memory_order_relaxed);
+       atomic_store_explicit(&a->buffer[b % size], x, memory_order_relaxed);
        /**
                @Begin
                @Commit_point_define_check: true
                @Label: Push_Point
                @End
        */
+       atomic_thread_fence(memory_order_release);
+       
        atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
+       
 }
 
 /**
@@ -132,19 +129,28 @@ int steal(Deque *q) {
        if (t < b) {
                /* Non-empty queue. */
                Array *a = (Array *) atomic_load_explicit(&q->array, memory_order_acquire);
-               x = atomic_load_explicit(&a->buffer[t % atomic_load_explicit(&a->size, memory_order_relaxed)], memory_order_relaxed);
+               int size = atomic_load_explicit(&a->size, memory_order_relaxed);
+               x = atomic_load_explicit(&a->buffer[t % size], memory_order_relaxed);
+               /**
+                       @Begin
+                       @Potential_commit_point_define: true
+                       @Label: Potential_Steal
+                       @End
+               */
+               
                bool succ = atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
                        memory_order_seq_cst, memory_order_relaxed);
                /**
                        @Begin
-                       @Commit_point_define_check: succ == true
-                       @Label: Steal_Point2
+                       @Commit_point_define_check: !succ
+                       @Label: Steal_Point4
                        @End
                */
-               
+
                /**
                        @Begin
-                       @Commit_point_define_check: succ == false
+                       @Commit_point_define: succ
+                       @Potential_commit_point_label: Potential_Steal
                        @Label: Steal_Point3
                        @End
                */
index fa76d8d..d8114d5 100644 (file)
@@ -1,6 +1,13 @@
 #ifndef DEQUE_H
 #define DEQUE_H
 
+#include <spec_lib.h>
+#include <stdlib.h>
+#include <cdsannotate.h>
+#include <specannotation.h>
+#include <model_memory.h>
+#include "common.h"
+
 typedef struct {
        atomic_size_t size;
        atomic_int buffer[];
@@ -59,7 +66,7 @@ void resize(Deque *q);
 /**
     @Begin
     @Interface: Take 
-    @Commit_point_set: Take_Point1 | Take_Point2 | Take_Point3 | Take_Point4
+    @Commit_point_set: Take_Point1 | Take_Point2 | Take_Point3
     @ID: size(__deque) == 0 ? DEFAULT_CALL_ID : get_id(back(__deque))
     @Action:
         int _Old_Val = EMPTY;
@@ -88,7 +95,7 @@ void push(Deque *q, int x);
 /**
     @Begin
     @Interface: Steal 
-    @Commit_point_set: Steal_Point1 | Steal_Point2 | Steal_Point3
+    @Commit_point_set: Steal_Point1 | Steal_Point2
     @ID: size(__deque) == 0 ? DEFAULT_CALL_ID : get_id(front(__deque))
     @Action:
         int _Old_Val = EMPTY;
index 7fb831f..782539e 100644 (file)
@@ -14,9 +14,6 @@ int b;
 int c;
 
 static void task(void * param) {
-       // FIXME: Add the following take() will expose an Uninitialzied Load bug...
-       //a=take(q);
-
        a=steal(q);
 }
 
@@ -48,7 +45,7 @@ int user_main(int argc, char **argv)
                correct=false;
        if (!correct)
                printf("a=%d b=%d c=%d\n",a,b,c);
-       MODEL_ASSERT(correct);
+       //MODEL_ASSERT(correct);
 
        return 0;
 }
index cfa82f2..0785711 100644 (file)
@@ -13,7 +13,7 @@ void threadA(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
        int32_t *bin = queue->write_prepare();
        store_32(bin, 1);
        printf("write_bin %d, val %d\n", bin, 1);
-       queue->write_publish();
+       queue->write_publish(bin);
 }
 
 void threadB(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
@@ -30,7 +30,7 @@ void threadC(struct mpmc_boundq_1_alt<int32_t, sizeof(int32_t)> *queue)
 {
        int32_t *bin = queue->write_prepare();
        store_32(bin, 1);
-       queue->write_publish();
+       queue->write_publish(bin);
 
        while (bin = queue->read_fetch()) {
                printf("Read: %d\n", load_32(bin));
@@ -117,7 +117,7 @@ int user_main(int argc, char **argv)
        int32_t *bin = queue.write_prepare();
        store_32(bin, 17);
        printf("init_write_bin %d, val %d\n", bin, 17);
-       queue.write_publish();
+       queue.write_publish(bin);
 #endif
 
        printf("Start threads\n");
index cd9d430..a0797bc 100644 (file)
@@ -49,209 +49,12 @@ public:
                        LANG = CPP;
                        CLASS = mpmc_boundq_1_alt;
                @Global_define:
-               @DeclareStruct:
-                       typedef struct elem {
-                               t_element *pos;
-                               bool written;
-                               thread_id_t tid;
-                               thread_id_t fetch_tid;
-                               call_id_t id;
-                       } elem;
                @DeclareVar:
-                       spec_list *list;
                        id_tag_t *tag;
                @InitVar:
-                       list = new_spec_list();
-                       tag = new_id_tag();
-               @DefineFunc:
-                       elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) {
-                               elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
-                               e->pos = pos;
-                               e->written = false;
-                               e->id = id;
-                               e->tid = tid;
-                               e->fetch_tid = -1;
-                       }
-               @DefineFunc:
-                       elem* get_elem_by_pos(t_element *pos) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *e = (elem*) elem_at_index(list, i);
-                                       if (e->pos == pos) {
-                                               return e;
-                                       }
-                               }
-                               return NULL;
-                       }
-               @DefineFunc:
-                       void show_list() {
-                               //model_print("Status:\n");
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *e = (elem*) elem_at_index(list, i);
-                                       //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id); 
-                               }
-                       }
-               @DefineFunc:
-                       elem* get_elem_by_tid(thread_id_t tid) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *e = (elem*) elem_at_index(list, i);
-                                       if (e->tid== tid) {
-                                               return e;
-                                       }
-                               }
-                               return NULL;
-                       }
-               @DefineFunc:
-                       elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *e = (elem*) elem_at_index(list, i);
-                                       if (e->fetch_tid== fetch_tid) {
-                                               return e;
-                                       }
-                               }
-                               return NULL;
-                       }
-               @DefineFunc:
-                       int elem_idx_by_pos(t_element *pos) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *existing = (elem*) elem_at_index(list, i);
-                                       if (pos == existing->pos) {
-                                               return i;
-                                       }
-                               }
-                               return -1;
-                       }
-               @DefineFunc:
-                       int elem_idx_by_tid(thread_id_t tid) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *existing = (elem*) elem_at_index(list, i);
-                                       if (tid == existing->tid) {
-                                               return i;
-                                       }
-                               }
-                               return -1;
-                       }
-               @DefineFunc:
-                       int elem_idx_by_fetch_tid(thread_id_t fetch_tid) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *existing = (elem*) elem_at_index(list, i);
-                                       if (fetch_tid == existing->fetch_tid) {
-                                               return i;
-                                       }
-                               }
-                               return -1;
-                       }
-               @DefineFunc:
-                       int elem_num(t_element *pos) {
-                               int cnt = 0;
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *existing = (elem*) elem_at_index(list, i);
-                                       if (pos == existing->pos) {
-                                               cnt++;
-                                       }
-                               }
-                               return cnt;
-                       }
-               @DefineFunc:
-                       call_id_t prepare_id() {
-                               call_id_t res = get_and_inc(tag);
-                               //model_print("prepare_id: %d\n", res);
-                               return res;
-                       }
-               @DefineFunc:
-                       bool prepare_check(t_element *pos, thread_id_t tid) {
-                               show_list();
-                               elem *e = get_elem_by_pos(pos);
-                               //model_print("prepare_check: e %d\n", e);
-                               return NULL == e;
-                       }
-               @DefineFunc:
-                       void prepare(call_id_t id, t_element *pos, thread_id_t tid) {
-                               //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid);
-                               elem *e = new_elem(pos, id, tid);
-                               push_back(list, e);
-                       }
-               @DefineFunc:
-                       call_id_t publish_id(thread_id_t tid) {
-                               elem *e = get_elem_by_tid(tid);
-                               //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id);
-                               if (NULL == e)
-                                       return DEFAULT_CALL_ID;
-                               return e->id;
-                       }
-               @DefineFunc:
-                       bool publish_check(thread_id_t tid) {
-                               show_list();
-                               elem *e = get_elem_by_tid(tid);
-                               //model_print("publish_check: tid %d\n", tid);
-                               if (NULL == e)
-                                       return false;
-                               if (elem_num(e->pos) > 1)
-                                       return false;
-                               return !e->written;
-                       }
-               @DefineFunc:
-                       void publish(thread_id_t tid) {
-                               //model_print("publish: tid %d\n", tid);
-                               elem *e = get_elem_by_tid(tid);
-                               e->written = true;
-                       }
-               @DefineFunc:
-                       call_id_t fetch_id(t_element *pos) {
-                               elem *e = get_elem_by_pos(pos);
-                               //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id);
-                               if (NULL == e)
-                                       return DEFAULT_CALL_ID;
-                               return e->id;
-                       }
-               @DefineFunc:
-                       bool fetch_check(t_element *pos) {
-                               show_list();
-                               if (pos == NULL) return true;
-                               elem *e = get_elem_by_pos(pos);
-                               //model_print("fetch_check: pos %d, e %d\n", pos, e);
-                               if (e == NULL) return false;
-                               if (elem_num(e->pos) > 1)
-                                       return false;
-                               return true;
-                       }
-               @DefineFunc:
-                       void fetch(t_element *pos, thread_id_t tid) {
-                               if (pos == NULL) return;
-                               elem *e = (elem*) get_elem_by_pos(pos);
-                               //model_print("fetch: pos %d, tid %d\n", pos, tid);
-                               // Remember the thread that fetches the position
-                               e->fetch_tid = tid;
-                       }
-               @DefineFunc:
-                       bool consume_check(thread_id_t tid) {
-                               show_list();
-                               elem *e = get_elem_by_fetch_tid(tid);
-                               //model_print("consume_check: tid %d, e %d\n", tid, e);
-                               if (NULL == e)
-                                       return false;
-                               if (elem_num(e->pos) > 1)
-                                       return false;
-                               return e->written;
-                       }
-               @DefineFunc:
-                       call_id_t consume_id(thread_id_t tid) {
-                               elem *e = get_elem_by_fetch_tid(tid);
-                               //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id);
-                               if (NULL == e)
-                                       return DEFAULT_CALL_ID;
-                               return e->id;
-                       }
-               @DefineFunc:
-                       void consume(thread_id_t tid) {
-                               //model_print("consume: tid %d\n", tid);
-                               int idx = elem_idx_by_fetch_tid(tid);
-                               if (idx == -1)
-                                       return;
-                               remove_at_index(list, idx);
-                       }
+                       tag = NULL;
        @Happens_before:
-               Prepare -> Fetch
-               Publish -> Consume
+               Publish -> Fetch
        @End
        */
 
@@ -260,20 +63,16 @@ public:
        /**
                @Begin
                @Interface: Fetch
-               @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
-               @ID: fetch_id(__RET__)
-               @Check:
-                       fetch_check(__RET__)
-               @Action:
-                       fetch(__RET__, __TID__);
+               @Commit_point_set: Fetch_Succ_Point
+               @ID: (call_id_t) __RET__
                @End
        */
        t_element * read_fetch() {
                unsigned int rdwr = m_rdwr.load(mo_acquire);
                /**
                        @Begin
-                       @Potential_commit_point_define: true
-                       @Label: Fetch_Potential_Point
+                       @Commit_point_define_check: (rdwr>>16) & 0xFFFF == rdwr & 0xFFFF
+                       @Label: Fetch_Succ_Point1
                        @End
                */
                unsigned int rd,wr;
@@ -282,23 +81,11 @@ public:
                        wr = rdwr & 0xFFFF;
 
                        if ( wr == rd ) { // empty
-                               /**
-                                       @Begin
-                                       @Commit_point_define: true
-                                       @Potential_commit_point_label: Fetch_Potential_Point 
-                                       @Label: Fetch_Empty_Point
-                                       @End
-                               */
                                return false;
                        }
                        
                        bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
-                       /**
-                               @Begin
-                               @Commit_point_define_check: succ == true
-                               @Label: Fetch_Succ_Point
-                               @End
-                       */
+                       
                        if (succ)
                                break;
                        else
@@ -307,7 +94,16 @@ public:
 
                // (*1)
                rl::backoff bo;
-               while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
+               while ( true ) {
+                       unsigned int tmp = m_written.load(mo_acquire);
+                       /**
+                               @Begin
+                               @Commit_point_define_check: (tmp & 0xFFFF) == wr
+                               @Label: Fetch_Succ_Point2
+                               @End
+                       */
+                       if ((tmp & 0xFFFF) == wr)
+                               break;
                        thrd_yield();
                }
 
@@ -316,72 +112,25 @@ public:
                return p;
        }
 
-       /**
-               @Begin
-               @Interface: Consume
-               @Commit_point_set: Consume_Point
-               @ID: consume_id(__TID__)
-               @Check:
-                       consume_check(__TID__)
-               @Action:
-                       consume(__TID__);
-               @End
-       */
        void read_consume() {
                m_read.fetch_add(1,mo_release);
-               /**
-                       @Begin
-                       @Commit_point_define_check: true
-                       @Label: Consume_Point
-                       @End
-               */
        }
 
        //-----------------------------------------------------
 
-       /**
-               @Begin
-               @Interface: Prepare 
-               @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
-               @ID: prepare_id()
-               @Check:
-                       prepare_check(__RET__, __TID__)
-               @Action:
-                       prepare(__ID__, __RET__, __TID__);
-               @End
-       */
        t_element * write_prepare() {
                unsigned int rdwr = m_rdwr.load(mo_acquire);
-               /**
-                       @Begin
-                       @Potential_commit_point_define: true
-                       @Label: Prepare_Potential_Point
-                       @End
-               */
                unsigned int rd,wr;
                for(;;) {
                        rd = (rdwr>>16) & 0xFFFF;
                        wr = rdwr & 0xFFFF;
 
                        if ( wr == ((rd + t_size)&0xFFFF) ) { // full
-                               /**
-                                       @Begin
-                                       @Commit_point_define: true
-                                       @Potential_commit_point_label: Prepare_Potential_Point 
-                                       @Label: Prepare_Full_Point
-                                       @End
-                               */
                                return NULL;
                        }
                        
                        bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
                                ((wr+1)&0xFFFF),mo_acq_rel);
-                       /**
-                               @Begin
-                               @Commit_point_define_check: succ == true
-                               @Label: Prepare_Succ_Point
-                               @End
-                       */
                        if (succ)
                                break;
                        else
@@ -403,14 +152,10 @@ public:
                @Begin
                @Interface: Publish 
                @Commit_point_set: Publish_Point
-               @ID: publish_id(__TID__)
-               @Check:
-                       publish_check(__TID__)
-               @Action:
-                       publish(__TID__);
+               @ID: (uint64_t) elem
                @End
        */
-       void write_publish()
+       void write_publish(t_element *elem)
        {
                m_written.fetch_add(1,mo_release);
                /**