X-Git-Url: http://plrg.eecs.uci.edu/git/?p=cdsspec-compiler.git;a=blobdiff_plain;f=benchmark%2Fmpmc-queue%2Fmpmc-queue.h;h=a1f5b1c2f40e17954217c4256ba4e04186f7fb96;hp=863710139c2e196d0e0ae8a40f6d6325b4a16989;hb=0332e0e5575b7d717bccad0b53f7a4b87c1ca969;hpb=f70d2ded0f2c2b7e64f29d1fd4da2bac84a13969;ds=sidebyside diff --git a/benchmark/mpmc-queue/mpmc-queue.h b/benchmark/mpmc-queue/mpmc-queue.h index 8637101..a1f5b1c 100644 --- a/benchmark/mpmc-queue/mpmc-queue.h +++ b/benchmark/mpmc-queue/mpmc-queue.h @@ -32,8 +32,9 @@ public: CLASS = mpmc_boundq_1_alt; @DeclareStruct: typedef struct elem { - t_element* pos; + t_element *pos; boolean written; + thread_id_t tid; call_id_t id; } elem; @DeclareVar: @@ -43,14 +44,15 @@ public: list = new_spec_list(); tag = new_id_tag(); @DefineFunc: - elem* new_elem(t_element *pos, call_id_t id) { + elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) { elem *e = (elem*) MODEL_MALLOC(sizeof(elem)); e->pos = pos; e->written = false; e->id = id; + e->tid = tid; } @DefineFunc: - elem* get_elem(t_element *pos) { + elem* get_elem_by_pos(t_element *pos) { for (int i = 0; i < size(list); i++) { elem *e = (elem*) elem_at_index(list, i); if (e->pos == pos) { @@ -60,28 +62,131 @@ public: return NULL; } @DefineFunc: - bool has_elem(elem *e) { + elem* get_elem_by_tid(thread_id_t tid) { + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + if (e->tid== tid) { + return e; + } + } + return NULL; + } + @DefineFunc: + int elem_idx_by_pos(t_element *pos) { for (int i = 0; i < size(list); i++) { elem *existing = (elem*) elem_at_index(list, i); - if (e->pos == existing->pos) { - return true; + if (pos == existing->pos) { + return i; } } - return false; + return -1; + } + @DefineFunc: + int elem_idx_by_tid(thread_id_t tid) { + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (tid == existing->tid) { + return i; + } + } + return -1; + } + @DefineFunc: + call_id_t prepare_id() { + return get_and_inc(tag); } @DefineFunc: - bool insert_elem(elem *e) { + bool prepare_check(t_element *pos, thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + return NULL == e; + } + @DefineFunc: + void prepare(call_id_t id, t_element *pos, thread_id_t tid) { + call_id_t id = get_and_inc(tag); + elem *e = new_elem(pos, id, tid); push_back(list, e); } @DefineFunc: - void consume_elem(t_element *pos) { - + call_id_t publish_id(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; } - + @DefineFunc: + bool publish_check(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return false; + return e->written; + } + @DefineFunc: + void publish(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + e->written = true; + } + @DefineFunc: + call_id_t fetch_id(t_element *pos) { + elem *e = get_elem_by_pos(pos); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + bool fetch_check(t_element *pos) { + int idx = elem_idx_by_pos(pos); + if (idx == -1) + return false; + else + return true; + } + @DefineFunc: + void fetch(t_element *pos) { + int idx = elem_idx_by_pos(pos); + if (idx == -1) + return; + remove_at_index(list, idx); + } + @DefineFunc: + bool consume_check(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return false; + return e->written; + } + @DefineFunc: + call_id_t consume_id(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + void consume(thread_id_t tid) { + int idx = elem_idx_by_tid(tid); + if (idx == -1) + return; + remove_at_index(list, idx); + } + @Happens_before: + Prepare -> Fetch + Publish -> Consume + @End */ //----------------------------------------------------- + /** + @Begin + @Interface: Fetch + @Commit_point_set: Fetch_Point + @ID: fetch_id(__RET__) + @Check: + fetch_check(__RET__) + @Action: + fetch(__RET__); + @End + */ t_element * read_fetch() { unsigned int rdwr = m_rdwr.load(mo_acquire); unsigned int rd,wr; @@ -110,12 +215,34 @@ public: return p; } + /** + @Begin + @Interface: Consume + @Commit_point_set: Consume_Point + @ID: consume_id(__TID__) + @Check: + consume_check(__TID__) + @Action: + consume(__TID__); + @End + */ void read_consume() { m_read.fetch_add(1,mo_release); } //----------------------------------------------------- + /** + @Begin + @Interface: Prepare + @Commit_point_set: Prepare_Point + @ID: prepare_id(__RET__) + @Check: + prepare_check(__RET__) + @Action: + prepare(__RET__); + @End + */ t_element * write_prepare() { unsigned int rdwr = m_rdwr.load(mo_acquire); unsigned int rd,wr; @@ -144,6 +271,17 @@ public: return p; } + /** + @Begin + @Interface: Publish + @Commit_point_set: Publish_Point + @ID: publish_id(__TID__) + @Check: + publish_check(__TID__) + @Action: + publish(__TID__); + @End + */ void write_publish() { m_written.fetch_add(1,mo_release);