X-Git-Url: http://plrg.eecs.uci.edu/git/?p=cdsspec-compiler.git;a=blobdiff_plain;f=benchmark%2Fmpmc-queue%2Fmpmc-queue.h;h=a1f5b1c2f40e17954217c4256ba4e04186f7fb96;hp=1e59f43fa5130f1ed795b4935f093c1351ee64f6;hb=0332e0e5575b7d717bccad0b53f7a4b87c1ca969;hpb=0a040a5b619952d29338f73db8f685d7367178fc;ds=sidebyside diff --git a/benchmark/mpmc-queue/mpmc-queue.h b/benchmark/mpmc-queue/mpmc-queue.h index 1e59f43..a1f5b1c 100644 --- a/benchmark/mpmc-queue/mpmc-queue.h +++ b/benchmark/mpmc-queue/mpmc-queue.h @@ -26,12 +26,167 @@ public: /** - @Global_define: - Order_queue spec_queue; + @Global_define: + @Options: + LANG = CPP; + CLASS = mpmc_boundq_1_alt; + @DeclareStruct: + typedef struct elem { + t_element *pos; + boolean written; + thread_id_t tid; + call_id_t id; + } elem; + @DeclareVar: + spec_list *list; + id_tag_t *tag; + @InitVar: + list = new_spec_list(); + tag = new_id_tag(); + @DefineFunc: + elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) { + elem *e = (elem*) MODEL_MALLOC(sizeof(elem)); + e->pos = pos; + e->written = false; + e->id = id; + e->tid = tid; + } + @DefineFunc: + elem* get_elem_by_pos(t_element *pos) { + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + if (e->pos == pos) { + return e; + } + } + return NULL; + } + @DefineFunc: + elem* get_elem_by_tid(thread_id_t tid) { + for (int i = 0; i < size(list); i++) { + elem *e = (elem*) elem_at_index(list, i); + if (e->tid== tid) { + return e; + } + } + return NULL; + } + @DefineFunc: + int elem_idx_by_pos(t_element *pos) { + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (pos == existing->pos) { + return i; + } + } + return -1; + } + @DefineFunc: + int elem_idx_by_tid(thread_id_t tid) { + for (int i = 0; i < size(list); i++) { + elem *existing = (elem*) elem_at_index(list, i); + if (tid == existing->tid) { + return i; + } + } + return -1; + } + @DefineFunc: + call_id_t prepare_id() { + return get_and_inc(tag); + } + @DefineFunc: + bool prepare_check(t_element *pos, thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + return NULL == e; + } + @DefineFunc: + void prepare(call_id_t id, t_element *pos, thread_id_t tid) { + call_id_t id = get_and_inc(tag); + elem *e = new_elem(pos, id, tid); + push_back(list, e); + } + @DefineFunc: + call_id_t publish_id(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + bool publish_check(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return false; + return e->written; + } + @DefineFunc: + void publish(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + e->written = true; + } + @DefineFunc: + call_id_t fetch_id(t_element *pos) { + elem *e = get_elem_by_pos(pos); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + bool fetch_check(t_element *pos) { + int idx = elem_idx_by_pos(pos); + if (idx == -1) + return false; + else + return true; + } + @DefineFunc: + void fetch(t_element *pos) { + int idx = elem_idx_by_pos(pos); + if (idx == -1) + return; + remove_at_index(list, idx); + } + @DefineFunc: + bool consume_check(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return false; + return e->written; + } + @DefineFunc: + call_id_t consume_id(thread_id_t tid) { + elem *e = get_elem_by_tid(tid); + if (NULL == e) + return DEFAULT_CALL_ID; + return e->id; + } + @DefineFunc: + void consume(thread_id_t tid) { + int idx = elem_idx_by_tid(tid); + if (idx == -1) + return; + remove_at_index(list, idx); + } + @Happens_before: + Prepare -> Fetch + Publish -> Consume + @End */ //----------------------------------------------------- + /** + @Begin + @Interface: Fetch + @Commit_point_set: Fetch_Point + @ID: fetch_id(__RET__) + @Check: + fetch_check(__RET__) + @Action: + fetch(__RET__); + @End + */ t_element * read_fetch() { unsigned int rdwr = m_rdwr.load(mo_acquire); unsigned int rd,wr; @@ -57,29 +212,37 @@ public: t_element * p = & ( m_array[ rd % t_size ] ); - /** - @Commit_point_Check: true - @Label: ANY - @Check: - spec_queue.peak() == p - */ return p; } + /** + @Begin + @Interface: Consume + @Commit_point_set: Consume_Point + @ID: consume_id(__TID__) + @Check: + consume_check(__TID__) + @Action: + consume(__TID__); + @End + */ void read_consume() { m_read.fetch_add(1,mo_release); - /** - @Commit_point_define: true - @Label: Read_Consume_Success - @Check: - spec_queue.size() > 0 - @Action: - spec_queue.remove(); - */ } //----------------------------------------------------- + /** + @Begin + @Interface: Prepare + @Commit_point_set: Prepare_Point + @ID: prepare_id(__RET__) + @Check: + prepare_check(__RET__) + @Action: + prepare(__RET__); + @End + */ t_element * write_prepare() { unsigned int rdwr = m_rdwr.load(mo_acquire); unsigned int rd,wr; @@ -105,13 +268,20 @@ public: t_element * p = & ( m_array[ wr % t_size ] ); - /** - @Commit_point_check: ANY - @Action: spec_queue.add(p); - */ return p; } + /** + @Begin + @Interface: Publish + @Commit_point_set: Publish_Point + @ID: publish_id(__TID__) + @Check: + publish_check(__TID__) + @Action: + publish(__TID__); + @End + */ void write_publish() { m_written.fetch_add(1,mo_release);