X-Git-Url: http://plrg.eecs.uci.edu/git/?p=cdsspec-compiler.git;a=blobdiff_plain;f=benchmark%2Fmpmc-queue%2Fmpmc-queue.h;h=a1f5b1c2f40e17954217c4256ba4e04186f7fb96;hp=aba325a05635692217c814cb482610e4aeeb5d04;hb=0332e0e5575b7d717bccad0b53f7a4b87c1ca969;hpb=b5590a7174ba510c646b9033c1df4951f9a1efae diff --git a/benchmark/mpmc-queue/mpmc-queue.h b/benchmark/mpmc-queue/mpmc-queue.h index aba325a..a1f5b1c 100644 --- a/benchmark/mpmc-queue/mpmc-queue.h +++ b/benchmark/mpmc-queue/mpmc-queue.h @@ -32,8 +32,9 @@ public: CLASS = mpmc_boundq_1_alt; @DeclareStruct: typedef struct elem { - t_element* pos; + t_element *pos; boolean written; + thread_id_t tid; call_id_t id; } elem; @DeclareVar: @@ -43,14 +44,15 @@ public: list = new_spec_list(); tag = new_id_tag(); @DefineFunc: - elem* new_elem(t_element *pos, call_id_t id) { + elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) { elem *e = (elem*) MODEL_MALLOC(sizeof(elem)); e->pos = pos; e->written = false; e->id = id; + e->tid = tid; } @DefineFunc: - elem* get_elem(t_element *pos) { + elem* get_elem_by_pos(t_element *pos) { for (int i = 0; i < size(list); i++) { elem *e = (elem*) elem_at_index(list, i); if (e->pos == pos) { @@ -60,7 +62,17 @@ public: return NULL; } @DefineFunc: - int has_elem(t_element *pos) { + elem* get_elem_by_tid(thread_id_t tid) { + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + if (e->tid== tid) { + return e; + } + } + return NULL; + } + @DefineFunc: + int elem_idx_by_pos(t_element *pos) { for (int i = 0; i < size(list); i++) { elem *existing = (elem*) elem_at_index(list, i); if (pos == existing->pos) { @@ -70,27 +82,111 @@ public: return -1; } @DefineFunc: - void prepare(t_element *pos) { - elem *e = new_elem( - push_back(list, e); + int elem_idx_by_tid(thread_id_t tid) { + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (tid == existing->tid) { + return i; + } + } + return -1; + } + @DefineFunc: + call_id_t prepare_id() { + return get_and_inc(tag); + } + @DefineFunc: + bool prepare_check(t_element *pos, thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + return NULL == e; } @DefineFunc: - void publish(t_element *pos) { - elem *e = new_elem( + void prepare(call_id_t id, t_element *pos, thread_id_t tid) { + call_id_t id = get_and_inc(tag); + elem *e = new_elem(pos, id, tid); push_back(list, e); } @DefineFunc: - void consume_elem(t_element *pos) { - int idx = has_elem(pos); + call_id_t publish_id(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + bool publish_check(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return false; + return e->written; + } + @DefineFunc: + void publish(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + e->written = true; + } + @DefineFunc: + call_id_t fetch_id(t_element *pos) { + elem *e = get_elem_by_pos(pos); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + bool fetch_check(t_element *pos) { + int idx = elem_idx_by_pos(pos); + if (idx == -1) + return false; + else + return true; + } + @DefineFunc: + void fetch(t_element *pos) { + int idx = elem_idx_by_pos(pos); if (idx == -1) return; remove_at_index(list, idx); } - + @DefineFunc: + bool consume_check(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return false; + return e->written; + } + @DefineFunc: + call_id_t consume_id(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + void consume(thread_id_t tid) { + int idx = elem_idx_by_tid(tid); + if (idx == -1) + return; + remove_at_index(list, idx); + } + @Happens_before: + Prepare -> Fetch + Publish -> Consume + @End */ //----------------------------------------------------- + /** + @Begin + @Interface: Fetch + @Commit_point_set: Fetch_Point + @ID: fetch_id(__RET__) + @Check: + fetch_check(__RET__) + @Action: + fetch(__RET__); + @End + */ t_element * read_fetch() { unsigned int rdwr = m_rdwr.load(mo_acquire); unsigned int rd,wr; @@ -119,12 +215,34 @@ public: return p; } + /** + @Begin + @Interface: Consume + @Commit_point_set: Consume_Point + @ID: consume_id(__TID__) + @Check: + consume_check(__TID__) + @Action: + consume(__TID__); + @End + */ void read_consume() { m_read.fetch_add(1,mo_release); } //----------------------------------------------------- + /** + @Begin + @Interface: Prepare + @Commit_point_set: Prepare_Point + @ID: prepare_id(__RET__) + @Check: + prepare_check(__RET__) + @Action: + prepare(__RET__); + @End + */ t_element * write_prepare() { unsigned int rdwr = m_rdwr.load(mo_acquire); unsigned int rd,wr; @@ -153,6 +271,17 @@ public: return p; } + /** + @Begin + @Interface: Publish + @Commit_point_set: Publish_Point + @ID: publish_id(__TID__) + @Check: + publish_check(__TID__) + @Action: + publish(__TID__); + @End + */ void write_publish() { m_written.fetch_add(1,mo_release);