edits
[cdsspec-compiler.git] / benchmark / mpmc-queue / mpmc-queue.h
index 627a58b65b8325f1ed55be465167708eaa6f0b61..ff6a168f1c248cc2db457f8fa7b7da8dc818f91a 100644 (file)
@@ -2,6 +2,12 @@
 #include <unrelacy.h>
 #include <common.h>
 
+#include <spec_lib.h>
+#include <stdlib.h>
+#include <cdsannotate.h>
+#include <specannotation.h>
+#include <model_memory.h>
+
 /**
        @Begin
        @Class_begin
@@ -11,7 +17,11 @@ template <typename t_element, size_t t_size>
 struct mpmc_boundq_1_alt
 {
 private:
-
+       
+       unsigned int MASK;
+       
+       atomic<t_element*> arr;
+       
        // elements should generally be cache-line-size padded :
        t_element               m_array[t_size];
 
@@ -23,7 +33,7 @@ private:
 
 public:
 
-       mpmc_boundq_1_alt()
+       mpmc_boundq_1_alt(int _MASK = 0xFFFF)
        {
        /**
                @Begin
@@ -33,6 +43,9 @@ public:
                m_rdwr = 0;
                m_read = 0;
                m_written = 0;
+               // For this we want MASK = 1; MASK wrap around
+               MASK = _MASK; //0x11;
+               arr = m_array;
        }
        
 
@@ -42,209 +55,28 @@ public:
                        LANG = CPP;
                        CLASS = mpmc_boundq_1_alt;
                @Global_define:
-               @DeclareStruct:
-                       typedef struct elem {
-                               t_element *pos;
-                               bool written;
-                               thread_id_t tid;
-                               thread_id_t fetch_tid;
-                               call_id_t id;
-                       } elem;
-               @DeclareVar:
-                       spec_list *list;
-                       id_tag_t *tag;
-               @InitVar:
-                       list = new_spec_list();
-                       tag = new_id_tag();
-               @DefineFunc:
-                       elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) {
-                               elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
-                               e->pos = pos;
-                               e->written = false;
-                               e->id = id;
-                               e->tid = tid;
-                               e->fetch_tid = -1;
-                       }
-               @DefineFunc:
-                       elem* get_elem_by_pos(t_element *pos) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *e = (elem*) elem_at_index(list, i);
-                                       if (e->pos == pos) {
-                                               return e;
-                                       }
-                               }
-                               return NULL;
-                       }
-               @DefineFunc:
-                       void show_list() {
-                               model_print("Status:\n");
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *e = (elem*) elem_at_index(list, i);
-                                       model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id); 
-                               }
-                       }
-               @DefineFunc:
-                       elem* get_elem_by_tid(thread_id_t tid) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *e = (elem*) elem_at_index(list, i);
-                                       if (e->tid== tid) {
-                                               return e;
-                                       }
-                               }
-                               return NULL;
-                       }
-               @DefineFunc:
-                       elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *e = (elem*) elem_at_index(list, i);
-                                       if (e->fetch_tid== fetch_tid) {
-                                               return e;
-                                       }
-                               }
-                               return NULL;
-                       }
-               @DefineFunc:
-                       int elem_idx_by_pos(t_element *pos) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *existing = (elem*) elem_at_index(list, i);
-                                       if (pos == existing->pos) {
-                                               return i;
-                                       }
-                               }
-                               return -1;
-                       }
-               @DefineFunc:
-                       int elem_idx_by_tid(thread_id_t tid) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *existing = (elem*) elem_at_index(list, i);
-                                       if (tid == existing->tid) {
-                                               return i;
-                                       }
-                               }
-                               return -1;
-                       }
-               @DefineFunc:
-                       int elem_idx_by_fetch_tid(thread_id_t fetch_tid) {
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *existing = (elem*) elem_at_index(list, i);
-                                       if (fetch_tid == existing->fetch_tid) {
-                                               return i;
-                                       }
-                               }
-                               return -1;
-                       }
-               @DefineFunc:
-                       int elem_num(t_element *pos) {
-                               int cnt = 0;
-                               for (int i = 0; i < size(list); i++) {
-                                       elem *existing = (elem*) elem_at_index(list, i);
-                                       if (pos == existing->pos) {
-                                               cnt++;
-                                       }
-                               }
-                               return cnt;
-                       }
-               @DefineFunc:
-                       call_id_t prepare_id() {
-                               call_id_t res = get_and_inc(tag);
-                               model_print("prepare_id: %d\n", res);
-                               return res;
-                       }
-               @DefineFunc:
-                       bool prepare_check(t_element *pos, thread_id_t tid) {
-                               show_list();
-                               elem *e = get_elem_by_pos(pos);
-                               model_print("prepare_check: e %d\n", e);
-                               return NULL == e;
-                       }
-               @DefineFunc:
-                       void prepare(call_id_t id, t_element *pos, thread_id_t tid) {
-                               model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid);
-                               elem *e = new_elem(pos, id, tid);
-                               push_back(list, e);
-                       }
-               @DefineFunc:
-                       call_id_t publish_id(thread_id_t tid) {
-                               elem *e = get_elem_by_tid(tid);
-                               model_print("publish_id: id %d\n", e == NULL ? 0 : e->id);
-                               if (NULL == e)
-                                       return DEFAULT_CALL_ID;
-                               return e->id;
-                       }
-               @DefineFunc:
-                       bool publish_check(thread_id_t tid) {
-                               show_list();
-                               elem *e = get_elem_by_tid(tid);
-                               model_print("publish_check: tid %d\n", tid);
-                               if (NULL == e)
-                                       return false;
-                               if (elem_num(e->pos) > 1)
-                                       return false;
-                               return !e->written;
-                       }
-               @DefineFunc:
-                       void publish(thread_id_t tid) {
-                               model_print("publish: tid %d\n", tid);
-                               elem *e = get_elem_by_tid(tid);
-                               e->written = true;
-                       }
-               @DefineFunc:
-                       call_id_t fetch_id(t_element *pos) {
-                               elem *e = get_elem_by_pos(pos);
-                               model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id);
-                               if (NULL == e)
-                                       return DEFAULT_CALL_ID;
-                               return e->id;
-                       }
-               @DefineFunc:
-                       bool fetch_check(t_element *pos) {
-                               show_list();
-                               if (pos == NULL) return true;
-                               elem *e = get_elem_by_pos(pos);
-                               model_print("fetch_check: pos %d, e %d\n", pos, e);
-                               if (e == NULL) return false;
-                               if (elem_num(e->pos) > 1)
-                                       return false;
-                               return true;
-                       }
-               @DefineFunc:
-                       void fetch(t_element *pos, thread_id_t tid) {
-                               if (pos == NULL) return;
-                               elem *e = (elem*) get_elem_by_pos(pos);
-                               model_print("fetch: pos %d, tid %d\n", pos, tid);
-                               // Remember the thread that fetches the position
-                               e->fetch_tid = tid;
-                       }
-               @DefineFunc:
-                       bool consume_check(thread_id_t tid) {
-                               show_list();
-                               elem *e = get_elem_by_fetch_tid(tid);
-                               model_print("consume_check: tid %d, e %d\n", tid, e);
-                               if (NULL == e)
-                                       return false;
-                               if (elem_num(e->pos) > 1)
-                                       return false;
-                               return e->written;
-                       }
-               @DefineFunc:
-                       call_id_t consume_id(thread_id_t tid) {
-                               elem *e = get_elem_by_fetch_tid(tid);
-                               model_print("consume_id: id %d\n", e == NULL ? 0 : e->id);
-                               if (NULL == e)
-                                       return DEFAULT_CALL_ID;
-                               return e->id;
-                       }
-               @DefineFunc:
-                       void consume(thread_id_t tid) {
-                               model_print("consume: tid %d\n", tid);
-                               int idx = elem_idx_by_fetch_tid(tid);
-                               if (idx == -1)
-                                       return;
-                               remove_at_index(list, idx);
-                       }
-       @Happens_before:
-               Prepare -> Fetch
-               Publish -> Consume
+               @Happens_before:
+                       Publish -> Fetch
+                       Consume -> Prepare
+               @Commutativity: Prepare <-> Prepare: _Method1.__RET__ !=
+               _Method2.__RET__ || !_Method1.__RET__ || !_Method2.__RET__
+               @Commutativity: Prepare <-> Publish: _Method1.__RET__ != _Method2.bin ||
+               !_Method1.__RET__
+               @Commutativity: Prepare <-> Fetch: _Method1.__RET__ != _Method2.__RET__
+               || !_Method1.__RET__ || !_Method2.__RET__
+               @Commutativity: Prepare <-> Consume : _Method1.__RET__ != _Method2.bin || !_Method1.__RET__
+
+               @Commutativity: Publish <-> Publish: _Method1.bin != _Method2.bin
+               @Commutativity: Publish <-> Fetch: _Method1.bin != _Method2.__RET__ ||
+               !_Method2.__RET__
+               @Commutativity: Publish <-> Consume : _Method1.bin != _Method2.bin
+
+               @Commutativity: Fetch <-> Fetch: _Method1.__RET__ != _Method2.__RET__ ||
+               !_Method1.__RET__ || !_Method2.__RET__
+               @Commutativity: Fetch <-> Consume : _Method1.__RET__ != _Method2.bin || !_Method1.__RET__
+
+               @Commutativity: Consume <-> Consume : _Method1.bin != _Method2.bin
+
        @End
        */
 
@@ -253,59 +85,81 @@ public:
        /**
                @Begin
                @Interface: Fetch
-               @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
-               @ID: fetch_id(__RET__)
-               @Check:
-                       fetch_check(__RET__)
-               @Action:
-                       fetch(__RET__, __TID__);
+               @Commit_point_set: FetchReadRW1 | FetchReadRW2 | FetchReadPointer
+               @ID: (call_id_t) __RET__
+               //@Action: model_print("Fetch: %d\n", __RET__);
                @End
        */
        t_element * read_fetch() {
+               // Since we have a lool to CAS the value of m_rdwr, this can be relaxed
                unsigned int rdwr = m_rdwr.load(mo_acquire);
+               //unsigned int rdwr = m_rdwr.load(mo_relaxed);
                /**
                        @Begin
-                       @Potential_commit_point_define: true
-                       @Label: Fetch_Potential_Point
+                       @Commit_point_define_check: true
+                       @Label: FetchReadRW1
                        @End
                */
                unsigned int rd,wr;
                for(;;) {
-                       rd = (rdwr>>16) & 0xFFFF;
-                       wr = rdwr & 0xFFFF;
+                       rd = (rdwr>>16) & MASK;
+                       wr = rdwr & MASK;
 
                        if ( wr == rd ) { // empty
+                               return false;
+                       }
+                       /**** Inadmissibility (testcase2.cc, MASK = 1, size = 1) ****/
+                       bool succ =
+                               m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
+                       if (succ) {
+                               break;
+                       } else {
                                /**
                                        @Begin
-                                       @Commit_point_define: true
-                                       @Potential_commit_point_label: Fetch_Potential_Point 
-                                       @Label: Fetch_Empty_Point
+                                       @Commit_point_clear: true
+                                       @Label: FetchClear1
+                                       @End
+                               */
+
+                               /**
+                                       @Begin
+                                       @Commit_point_define_check: true
+                                       @Label: FetchReadRW2
                                        @End
                                */
-                               return false;
-                       }
-                       
-                       bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
-                       if (succ)
-                               break;
-                       else
                                thrd_yield();
+                       }
                }
 
                // (*1)
                rl::backoff bo;
-               while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
-                       thrd_yield();
+               while (true) {
+                       /**** Inadmissibility ****/
+                       int written = m_written.load(mo_acquire);
+                       if ((written & MASK) != wr) {
+                               thrd_yield();
+                       } else {
+                               printf("Fetch: m_written=%d\n", written);
+                               break;
+                       }
                }
+               t_element * p = & ( m_array[ rd % t_size ] );
+
+               // This is just a hack to tell the CDSChecker that we have a memory
+               // operation here
+               arr.load(memory_order_relaxed);
                /**
                        @Begin
-                       @Commit_point_define: true
-                       @Potential_commit_point_label: Fetch_Potential_Point 
-                       @Label: Fetch_Succ_Point
+                       @Commit_point_clear: true
+                       @Label: FetchClear2
+                       @End
+               */
+               /**
+                       @Begin
+                       @Commit_point_define_check: true
+                       @Label: FetchReadPointer
                        @End
                */
-
-               t_element * p = & ( m_array[ rd % t_size ] );
                
                return p;
        }
@@ -313,20 +167,18 @@ public:
        /**
                @Begin
                @Interface: Consume
-               @Commit_point_set: Consume_Point
-               @ID: consume_id(__TID__)
-               @Check:
-                       consume_check(__TID__)
-               @Action:
-                       consume(__TID__);
+               @Commit_point_set: ConsumeFetchAdd
+               @ID: (call_id_t) bin 
+               //@Action: model_print("Consume: %d\n", bin);
                @End
        */
-       void read_consume() {
+       void read_consume(t_element *bin) {
+               /**** Inadmissibility ****/
                m_read.fetch_add(1,mo_release);
                /**
                        @Begin
                        @Commit_point_define_check: true
-                       @Label: Consume_Point
+                       @Label: ConsumeFetchAdd
                        @End
                */
        }
@@ -336,58 +188,84 @@ public:
        /**
                @Begin
                @Interface: Prepare 
-               @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
-               @ID: prepare_id()
-               @Check:
-                       prepare_check(__RET__, __TID__)
-               @Action:
-                       prepare(__ID__, __RET__, __TID__);
+               @Commit_point_set: PrepareReadRW1 | PrepareReadRW2 | PrepareReadPointer
+               @ID: (call_id_t) __RET__
+               //@Action: model_print("Prepare: %d\n", __RET__);
                @End
        */
        t_element * write_prepare() {
+               // Try weaker semantics
+               // Since we have a lool to CAS the value of m_rdwr, this can be relaxed
                unsigned int rdwr = m_rdwr.load(mo_acquire);
+               //unsigned int rdwr = m_rdwr.load(mo_relaxed);
                /**
                        @Begin
-                       @Potential_commit_point_define: true
-                       @Label: Prepare_Potential_Point
+                       @Commit_point_define_check: true
+                       @Label: PrepareReadRW1
                        @End
                */
                unsigned int rd,wr;
                for(;;) {
-                       rd = (rdwr>>16) & 0xFFFF;
-                       wr = rdwr & 0xFFFF;
+                       rd = (rdwr>>16) & MASK;
+                       wr = rdwr & MASK;
+                       //printf("write_prepare: rd=%d, wr=%d\n", rd, wr);
 
-                       if ( wr == ((rd + t_size)&0xFFFF) ) { // full
+                       if ( wr == ((rd + t_size)&MASK) ) { // full
+                               return NULL;
+                       }
+                       
+                       /**** Inadmissibility (testcase3.cc, MASK = 1, size = 1) ****/
+                       bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
+                               ((wr+1)&MASK),mo_acq_rel);
+                       //printf("wr=%d\n", (wr+1)&MASK);
+                       if (succ) {
+                               break;
+                       } else {
                                /**
                                        @Begin
-                                       @Commit_point_define: true
-                                       @Potential_commit_point_label: Prepare_Potential_Point 
-                                       @Label: Prepare_Full_Point
+                                       @Commit_point_clear: true
+                                       @Label: PrepareClear1
                                        @End
                                */
-                               return NULL;
-                       }
 
-                       if ( m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel) )
-                               break;
-                       else
+                               /**
+                                       @Begin
+                                       @Commit_point_define_check: true
+                                       @Label: PrepareReadRW2
+                                       @End
+                               */
                                thrd_yield();
+                       }
                }
 
                // (*1)
                rl::backoff bo;
-               while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
-                       thrd_yield();
+               while (true) {
+                       /**** Inadmissibility ****/
+                       int read = m_read.load(mo_acquire);
+                       if ((read & MASK) != rd)
+                               thrd_yield();
+                       else
+                               break;
                }
+
+               t_element * p = & ( m_array[ wr % t_size ] );
+
+               // This is also just a hack to tell the CDSChecker that we have a memory
+               // operation here
+               arr.load(memory_order_relaxed);
                /**
                        @Begin
-                       @Commit_point_define: true
-                       @Potential_commit_point_label: Prepare_Potential_Point 
-                       @Label: Prepare_Succ_Point
+                       @Commit_point_clear: true
+                       @Label: PrepareClear2
+                       @End
+               */
+               /**
+                       @Begin
+                       @Commit_point_define_check: true
+                       @Label: PrepareReadPointer
                        @End
                */
-
-               t_element * p = & ( m_array[ wr % t_size ] );
 
                return p;
        }
@@ -395,23 +273,22 @@ public:
        /**
                @Begin
                @Interface: Publish 
-               @Commit_point_set: Publish_Point
-               @ID: publish_id(__TID__)
-               @Check:
-                       publish_check(__TID__)
-               @Action:
-                       publish(__TID__);
+               @Commit_point_set: Publish_W_RMW
+               @ID: (call_id_t) bin 
+               //@Action: model_print("Publish: %d\n", bin);
                @End
        */
-       void write_publish()
+       void write_publish(t_element *bin)
        {
-               m_written.fetch_add(1,mo_release);
+               /**** Inadmissibility ****/
+               int tmp = m_written.fetch_add(1,mo_release);
                /**
                        @Begin
                        @Commit_point_define_check: true
-                       @Label: Publish_Point
+                       @Label: Publish_W_RMW
                        @End
                */
+               printf("publish: m_written=%d\n", tmp + 1);
        }
 
        //-----------------------------------------------------