injection result for ms-queue
authorPeizhao Ou <peizhaoo@uci.edu>
Fri, 21 Mar 2014 21:06:33 +0000 (14:06 -0700)
committerPeizhao Ou <peizhaoo@uci.edu>
Fri, 21 Mar 2014 21:06:33 +0000 (14:06 -0700)
benchmark/mpmc-queue/mpmc-queue.h
benchmark/ms-queue/my_queue.c

index 0c94b6ef5133cf7226a2269dedacd53c9333b086..ee3950d8219d14dcc7a2ed2b7924e1d07c7bbdb7 100644 (file)
@@ -2,13 +2,6 @@
 #include <unrelacy.h>
 #include <common.h>
 
-#include <spec_lib.h>
-#include <stdlib.h>
-#include <cdsannotate.h>
-#include <specannotation.h>
-#include <model_memory.h>
-#include "common.h" 
-
 /**
        @Begin
        @Class_begin
@@ -49,12 +42,209 @@ public:
                        LANG = CPP;
                        CLASS = mpmc_boundq_1_alt;
                @Global_define:
+               @DeclareStruct:
+                       typedef struct elem {
+                               t_element *pos;
+                               bool written;
+                               thread_id_t tid;
+                               thread_id_t fetch_tid;
+                               call_id_t id;
+                       } elem;
                @DeclareVar:
+                       spec_list *list;
                        id_tag_t *tag;
                @InitVar:
-                       tag = NULL;
+                       list = new_spec_list();
+                       tag = new_id_tag();
+               @DefineFunc:
+                       elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) {
+                               elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
+                               e->pos = pos;
+                               e->written = false;
+                               e->id = id;
+                               e->tid = tid;
+                               e->fetch_tid = -1;
+                       }
+               @DefineFunc:
+                       elem* get_elem_by_pos(t_element *pos) {
+                               for (int i = 0; i < size(list); i++) {
+                                       elem *e = (elem*) elem_at_index(list, i);
+                                       if (e->pos == pos) {
+                                               return e;
+                                       }
+                               }
+                               return NULL;
+                       }
+               @DefineFunc:
+                       void show_list() {
+                               //model_print("Status:\n");
+                               for (int i = 0; i < size(list); i++) {
+                                       elem *e = (elem*) elem_at_index(list, i);
+                                       //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id); 
+                               }
+                       }
+               @DefineFunc:
+                       elem* get_elem_by_tid(thread_id_t tid) {
+                               for (int i = 0; i < size(list); i++) {
+                                       elem *e = (elem*) elem_at_index(list, i);
+                                       if (e->tid== tid) {
+                                               return e;
+                                       }
+                               }
+                               return NULL;
+                       }
+               @DefineFunc:
+                       elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) {
+                               for (int i = 0; i < size(list); i++) {
+                                       elem *e = (elem*) elem_at_index(list, i);
+                                       if (e->fetch_tid== fetch_tid) {
+                                               return e;
+                                       }
+                               }
+                               return NULL;
+                       }
+               @DefineFunc:
+                       int elem_idx_by_pos(t_element *pos) {
+                               for (int i = 0; i < size(list); i++) {
+                                       elem *existing = (elem*) elem_at_index(list, i);
+                                       if (pos == existing->pos) {
+                                               return i;
+                                       }
+                               }
+                               return -1;
+                       }
+               @DefineFunc:
+                       int elem_idx_by_tid(thread_id_t tid) {
+                               for (int i = 0; i < size(list); i++) {
+                                       elem *existing = (elem*) elem_at_index(list, i);
+                                       if (tid == existing->tid) {
+                                               return i;
+                                       }
+                               }
+                               return -1;
+                       }
+               @DefineFunc:
+                       int elem_idx_by_fetch_tid(thread_id_t fetch_tid) {
+                               for (int i = 0; i < size(list); i++) {
+                                       elem *existing = (elem*) elem_at_index(list, i);
+                                       if (fetch_tid == existing->fetch_tid) {
+                                               return i;
+                                       }
+                               }
+                               return -1;
+                       }
+               @DefineFunc:
+                       int elem_num(t_element *pos) {
+                               int cnt = 0;
+                               for (int i = 0; i < size(list); i++) {
+                                       elem *existing = (elem*) elem_at_index(list, i);
+                                       if (pos == existing->pos) {
+                                               cnt++;
+                                       }
+                               }
+                               return cnt;
+                       }
+               @DefineFunc:
+                       call_id_t prepare_id() {
+                               call_id_t res = get_and_inc(tag);
+                               //model_print("prepare_id: %d\n", res);
+                               return res;
+                       }
+               @DefineFunc:
+                       bool prepare_check(t_element *pos, thread_id_t tid) {
+                               show_list();
+                               elem *e = get_elem_by_pos(pos);
+                               //model_print("prepare_check: e %d\n", e);
+                               return NULL == e;
+                       }
+               @DefineFunc:
+                       void prepare(call_id_t id, t_element *pos, thread_id_t tid) {
+                               //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid);
+                               elem *e = new_elem(pos, id, tid);
+                               push_back(list, e);
+                       }
+               @DefineFunc:
+                       call_id_t publish_id(thread_id_t tid) {
+                               elem *e = get_elem_by_tid(tid);
+                               //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id);
+                               if (NULL == e)
+                                       return DEFAULT_CALL_ID;
+                               return e->id;
+                       }
+               @DefineFunc:
+                       bool publish_check(thread_id_t tid) {
+                               show_list();
+                               elem *e = get_elem_by_tid(tid);
+                               //model_print("publish_check: tid %d\n", tid);
+                               if (NULL == e)
+                                       return false;
+                               if (elem_num(e->pos) > 1)
+                                       return false;
+                               return !e->written;
+                       }
+               @DefineFunc:
+                       void publish(thread_id_t tid) {
+                               //model_print("publish: tid %d\n", tid);
+                               elem *e = get_elem_by_tid(tid);
+                               e->written = true;
+                       }
+               @DefineFunc:
+                       call_id_t fetch_id(t_element *pos) {
+                               elem *e = get_elem_by_pos(pos);
+                               //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id);
+                               if (NULL == e)
+                                       return DEFAULT_CALL_ID;
+                               return e->id;
+                       }
+               @DefineFunc:
+                       bool fetch_check(t_element *pos) {
+                               show_list();
+                               if (pos == NULL) return true;
+                               elem *e = get_elem_by_pos(pos);
+                               //model_print("fetch_check: pos %d, e %d\n", pos, e);
+                               if (e == NULL) return false;
+                               if (elem_num(e->pos) > 1)
+                                       return false;
+                               return true;
+                       }
+               @DefineFunc:
+                       void fetch(t_element *pos, thread_id_t tid) {
+                               if (pos == NULL) return;
+                               elem *e = (elem*) get_elem_by_pos(pos);
+                               //model_print("fetch: pos %d, tid %d\n", pos, tid);
+                               // Remember the thread that fetches the position
+                               e->fetch_tid = tid;
+                       }
+               @DefineFunc:
+                       bool consume_check(thread_id_t tid) {
+                               show_list();
+                               elem *e = get_elem_by_fetch_tid(tid);
+                               //model_print("consume_check: tid %d, e %d\n", tid, e);
+                               if (NULL == e)
+                                       return false;
+                               if (elem_num(e->pos) > 1)
+                                       return false;
+                               return e->written;
+                       }
+               @DefineFunc:
+                       call_id_t consume_id(thread_id_t tid) {
+                               elem *e = get_elem_by_fetch_tid(tid);
+                               //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id);
+                               if (NULL == e)
+                                       return DEFAULT_CALL_ID;
+                               return e->id;
+                       }
+               @DefineFunc:
+                       void consume(thread_id_t tid) {
+                               //model_print("consume: tid %d\n", tid);
+                               int idx = elem_idx_by_fetch_tid(tid);
+                               if (idx == -1)
+                                       return;
+                               remove_at_index(list, idx);
+                       }
        @Happens_before:
-               Publish -> Fetch
+               Prepare -> Fetch
+               Publish -> Consume
        @End
        */
 
@@ -63,16 +253,20 @@ public:
        /**
                @Begin
                @Interface: Fetch
-               @Commit_point_set: Fetch_Succ_Point | Fetch_Fail_Point
-               @ID: (call_id_t) __RET__
+               @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
+               @ID: fetch_id(__RET__)
+               @Check:
+                       fetch_check(__RET__)
+               @Action:
+                       fetch(__RET__, __TID__);
                @End
        */
        t_element * read_fetch() {
                unsigned int rdwr = m_rdwr.load(mo_acquire);
                /**
                        @Begin
-                       @Commit_point_define_check: (unsigned int) ((rdwr>>16) & 0xFFFF) == (unsigned int) (rdwr & 0xFFFF)
-                       @Label: Fetch_Fail_Point
+                       @Potential_commit_point_define: true
+                       @Label: Fetch_Potential_Point
                        @End
                */
                unsigned int rd,wr;
@@ -80,23 +274,24 @@ public:
                        rd = (rdwr>>16) & 0xFFFF;
                        wr = rdwr & 0xFFFF;
 
-                       //model_print("cond: %d\n", (unsigned int) ((rdwr>>16) & 0xFFFF) ==
-                       //(unsigned int) (rdwr & 0xFFFF));
-                       //model_print("cond: %d\n", wr == rd);
                        if ( wr == rd ) { // empty
                                /**
-                                       //@Begin
+                                       @Begin
                                        @Commit_point_define: true
                                        @Potential_commit_point_label: Fetch_Potential_Point 
-                                       @Label: Fetch_Fail_Point
+                                       @Label: Fetch_Empty_Point
                                        @End
                                */
-                               model_print("in cond\n");
                                return false;
                        }
                        
                        bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
-                       
+                       /**
+                               @Begin
+                               @Commit_point_define_check: succ == true
+                               @Label: Fetch_Succ_Point
+                               @End
+                       */
                        if (succ)
                                break;
                        else
@@ -105,17 +300,7 @@ public:
 
                // (*1)
                rl::backoff bo;
-               
-               while ( true ) {
-                       if ((m_written.load(mo_acquire) & 0xFFFF) == wr) {
-                               /**
-                                       @Begin
-                                       @Commit_point_define_check: true 
-                                       @Label: Fetch_Succ_Point
-                                       @End
-                               */
-                               break;
-                       }
+               while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
                        thrd_yield();
                }
 
@@ -124,25 +309,72 @@ public:
                return p;
        }
 
+       /**
+               @Begin
+               @Interface: Consume
+               @Commit_point_set: Consume_Point
+               @ID: consume_id(__TID__)
+               @Check:
+                       consume_check(__TID__)
+               @Action:
+                       consume(__TID__);
+               @End
+       */
        void read_consume() {
                m_read.fetch_add(1,mo_release);
+               /**
+                       @Begin
+                       @Commit_point_define_check: true
+                       @Label: Consume_Point
+                       @End
+               */
        }
 
        //-----------------------------------------------------
 
+       /**
+               @Begin
+               @Interface: Prepare 
+               @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
+               @ID: prepare_id()
+               @Check:
+                       prepare_check(__RET__, __TID__)
+               @Action:
+                       prepare(__ID__, __RET__, __TID__);
+               @End
+       */
        t_element * write_prepare() {
                unsigned int rdwr = m_rdwr.load(mo_acquire);
+               /**
+                       @Begin
+                       @Potential_commit_point_define: true
+                       @Label: Prepare_Potential_Point
+                       @End
+               */
                unsigned int rd,wr;
                for(;;) {
                        rd = (rdwr>>16) & 0xFFFF;
                        wr = rdwr & 0xFFFF;
 
                        if ( wr == ((rd + t_size)&0xFFFF) ) { // full
+                               /**
+                                       @Begin
+                                       @Commit_point_define: true
+                                       @Potential_commit_point_label: Prepare_Potential_Point 
+                                       @Label: Prepare_Full_Point
+                                       @End
+                               */
                                return NULL;
                        }
                        
                        bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
                                ((wr+1)&0xFFFF),mo_acq_rel);
+                       /**
+                               @Begin
+                               @Commit_point_define_check: succ == true
+                               @Label: Prepare_Succ_Point
+                               @End
+                       */
                        if (succ)
                                break;
                        else
@@ -164,10 +396,14 @@ public:
                @Begin
                @Interface: Publish 
                @Commit_point_set: Publish_Point
-               @ID: (uint64_t) elem
+               @ID: publish_id(__TID__)
+               @Check:
+                       publish_check(__TID__)
+               @Action:
+                       publish(__TID__);
                @End
        */
-       void write_publish(t_element *elem)
+       void write_publish()
        {
                m_written.fetch_add(1,mo_release);
                /**
index ae8d2b73d2fc221c5f9f8bc7e29c73ad1841f77e..017670b5703a7c678fea23c9a7b0fb69867f8b71 100644 (file)
@@ -101,7 +101,9 @@ void enqueue(queue_t *q, unsigned int val)
        atomic_store_explicit(&q->nodes[node].next, tmp, relaxed);
 
        while (!success) {
+               /****FIXME: detected UL ****/
                tail = atomic_load_explicit(&q->tail, acquire);
+               /****FIXME: miss ****/
                next = atomic_load_explicit(&q->nodes[get_ptr(tail)].next, acquire);
                if (tail == atomic_load_explicit(&q->tail, relaxed)) {
 
@@ -110,6 +112,7 @@ void enqueue(queue_t *q, unsigned int val)
 
                        if (get_ptr(next) == 0) { // == NULL
                                pointer value = MAKE_POINTER(node, get_count(next) + 1);
+                               /****FIXME: first release UL, second release miss ****/
                                success = atomic_compare_exchange_strong_explicit(&q->nodes[get_ptr(tail)].next,
                                                &next, value, release, release);
                                /**
@@ -120,15 +123,18 @@ void enqueue(queue_t *q, unsigned int val)
                                */
                        }
                        if (!success) {
+                               /****FIXME: detected UL ****/
                                unsigned int ptr = get_ptr(atomic_load_explicit(&q->nodes[get_ptr(tail)].next, acquire));
                                pointer value = MAKE_POINTER(ptr,
                                                get_count(tail) + 1);
+                               /****FIXME: both miss ****/
                                atomic_compare_exchange_strong_explicit(&q->tail,
                                                &tail, value, release, release);
                                thrd_yield();
                        }
                }
        }
+       /****FIXME: first UL, second miss ****/
        atomic_compare_exchange_strong_explicit(&q->tail,
                        &tail,
                        MAKE_POINTER(node, get_count(tail) + 1),
@@ -149,8 +155,10 @@ unsigned int dequeue(queue_t *q)
        pointer next;
 
        while (!success) {
+               /****FIXME: detected correctness error ****/
                head = atomic_load_explicit(&q->head, acquire);
                tail = atomic_load_explicit(&q->tail, relaxed);
+               /****FIXME: miss ****/
                next = atomic_load_explicit(&q->nodes[get_ptr(head)].next, acquire);
                if (atomic_load_explicit(&q->head, relaxed) == head) {
                        if (get_ptr(head) == get_ptr(tail)) {
@@ -167,6 +175,7 @@ unsigned int dequeue(queue_t *q)
                                        */
                                        return 0; // NULL
                                }
+                               /****FIXME: both miss ****/
                                atomic_compare_exchange_strong_explicit(&q->tail,
                                                &tail,
                                                MAKE_POINTER(get_ptr(next), get_count(tail) + 1),
@@ -175,6 +184,7 @@ unsigned int dequeue(queue_t *q)
                        } else {
                                //value = load_32(&q->nodes[get_ptr(next)].value);
                                value = q->nodes[get_ptr(next)].value;
+                               /****FIXME: first correctness error, second miss ****/
                                success = atomic_compare_exchange_strong_explicit(&q->head,
                                                &head,
                                                MAKE_POINTER(get_ptr(next), get_count(head) + 1),