#include #include #include /** @Begin @Class_begin @End */ template struct mpmc_boundq_1_alt { private: // elements should generally be cache-line-size padded : t_element m_array[t_size]; // rdwr counts the reads & writes that have started atomic m_rdwr; // "read" and "written" count the number completed atomic m_read; atomic m_written; public: mpmc_boundq_1_alt() { /** @Begin @Entry_point @End */ m_rdwr = 0; m_read = 0; m_written = 0; } /** @Begin @Options: LANG = CPP; CLASS = mpmc_boundq_1_alt; @Global_define: @DeclareStruct: typedef struct elem { t_element *pos; bool written; thread_id_t tid; thread_id_t fetch_tid; call_id_t id; } elem; @DeclareVar: spec_list *list; id_tag_t *tag; @InitVar: list = new_spec_list(); tag = new_id_tag(); @DefineFunc: elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) { elem *e = (elem*) MODEL_MALLOC(sizeof(elem)); e->pos = pos; e->written = false; e->id = id; e->tid = tid; e->fetch_tid = -1; } @DefineFunc: elem* get_elem_by_pos(t_element *pos) { for (int i = 0; i < size(list); i++) { elem *e = (elem*) elem_at_index(list, i); if (e->pos == pos) { return e; } } return NULL; } @DefineFunc: void show_list() { //model_print("Status:\n"); for (int i = 0; i < size(list); i++) { elem *e = (elem*) elem_at_index(list, i); //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id); } } @DefineFunc: elem* get_elem_by_tid(thread_id_t tid) { for (int i = 0; i < size(list); i++) { elem *e = (elem*) elem_at_index(list, i); if (e->tid== tid) { return e; } } return NULL; } @DefineFunc: elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) { for (int i = 0; i < size(list); i++) { elem *e = (elem*) elem_at_index(list, i); if (e->fetch_tid== fetch_tid) { return e; } } return NULL; } @DefineFunc: int elem_idx_by_pos(t_element *pos) { for (int i = 0; i < size(list); i++) { elem *existing = (elem*) elem_at_index(list, i); if (pos == existing->pos) { return i; } } return -1; } @DefineFunc: int elem_idx_by_tid(thread_id_t tid) { for (int i = 0; i < size(list); i++) { elem *existing = (elem*) elem_at_index(list, i); if (tid == existing->tid) { return i; } } return -1; } @DefineFunc: int elem_idx_by_fetch_tid(thread_id_t fetch_tid) { for (int i = 0; i < size(list); i++) { elem *existing = (elem*) elem_at_index(list, i); if (fetch_tid == existing->fetch_tid) { return i; } } return -1; } @DefineFunc: int elem_num(t_element *pos) { int cnt = 0; for (int i = 0; i < size(list); i++) { elem *existing = (elem*) elem_at_index(list, i); if (pos == existing->pos) { cnt++; } } return cnt; } @DefineFunc: call_id_t prepare_id() { call_id_t res = get_and_inc(tag); //model_print("prepare_id: %d\n", res); return res; } @DefineFunc: bool prepare_check(t_element *pos, thread_id_t tid) { show_list(); elem *e = get_elem_by_pos(pos); //model_print("prepare_check: e %d\n", e); return NULL == e; } @DefineFunc: void prepare(call_id_t id, t_element *pos, thread_id_t tid) { //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid); elem *e = new_elem(pos, id, tid); push_back(list, e); } @DefineFunc: call_id_t publish_id(thread_id_t tid) { elem *e = get_elem_by_tid(tid); //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id); if (NULL == e) return DEFAULT_CALL_ID; return e->id; } @DefineFunc: bool publish_check(thread_id_t tid) { show_list(); elem *e = get_elem_by_tid(tid); //model_print("publish_check: tid %d\n", tid); if (NULL == e) return false; if (elem_num(e->pos) > 1) return false; return !e->written; } @DefineFunc: void publish(thread_id_t tid) { //model_print("publish: tid %d\n", tid); elem *e = get_elem_by_tid(tid); e->written = true; } @DefineFunc: call_id_t fetch_id(t_element *pos) { elem *e = get_elem_by_pos(pos); //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id); if (NULL == e) return DEFAULT_CALL_ID; return e->id; } @DefineFunc: bool fetch_check(t_element *pos) { show_list(); if (pos == NULL) return true; elem *e = get_elem_by_pos(pos); //model_print("fetch_check: pos %d, e %d\n", pos, e); if (e == NULL) return false; if (elem_num(e->pos) > 1) return false; return true; } @DefineFunc: void fetch(t_element *pos, thread_id_t tid) { if (pos == NULL) return; elem *e = (elem*) get_elem_by_pos(pos); //model_print("fetch: pos %d, tid %d\n", pos, tid); // Remember the thread that fetches the position e->fetch_tid = tid; } @DefineFunc: bool consume_check(thread_id_t tid) { show_list(); elem *e = get_elem_by_fetch_tid(tid); //model_print("consume_check: tid %d, e %d\n", tid, e); if (NULL == e) return false; if (elem_num(e->pos) > 1) return false; return e->written; } @DefineFunc: call_id_t consume_id(thread_id_t tid) { elem *e = get_elem_by_fetch_tid(tid); //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id); if (NULL == e) return DEFAULT_CALL_ID; return e->id; } @DefineFunc: void consume(thread_id_t tid) { //model_print("consume: tid %d\n", tid); int idx = elem_idx_by_fetch_tid(tid); if (idx == -1) return; remove_at_index(list, idx); } @Happens_before: Prepare -> Fetch Publish -> Consume @End */ //----------------------------------------------------- /** @Begin @Interface: Fetch @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point @ID: fetch_id(__RET__) @Check: fetch_check(__RET__) @Action: fetch(__RET__, __TID__); @End */ t_element * read_fetch() { unsigned int rdwr = m_rdwr.load(mo_acquire); /** @Begin @Potential_commit_point_define: true @Label: Fetch_Potential_Point @End */ unsigned int rd,wr; for(;;) { rd = (rdwr>>16) & 0xFFFF; wr = rdwr & 0xFFFF; if ( wr == rd ) { // empty /** @Begin @Commit_point_define: true @Potential_commit_point_label: Fetch_Potential_Point @Label: Fetch_Empty_Point @End */ return false; } bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel); /** @Begin @Commit_point_define_check: succ == true @Label: Fetch_Succ_Point @End */ if (succ) break; else thrd_yield(); } // (*1) rl::backoff bo; while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) { thrd_yield(); } t_element * p = & ( m_array[ rd % t_size ] ); return p; } /** @Begin @Interface: Consume @Commit_point_set: Consume_Point @ID: consume_id(__TID__) @Check: consume_check(__TID__) @Action: consume(__TID__); @End */ void read_consume() { m_read.fetch_add(1,mo_release); /** @Begin @Commit_point_define_check: true @Label: Consume_Point @End */ } //----------------------------------------------------- /** @Begin @Interface: Prepare @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point @ID: prepare_id() @Check: prepare_check(__RET__, __TID__) @Action: prepare(__ID__, __RET__, __TID__); @End */ t_element * write_prepare() { unsigned int rdwr = m_rdwr.load(mo_acquire); /** @Begin @Potential_commit_point_define: true @Label: Prepare_Potential_Point @End */ unsigned int rd,wr; for(;;) { rd = (rdwr>>16) & 0xFFFF; wr = rdwr & 0xFFFF; if ( wr == ((rd + t_size)&0xFFFF) ) { // full /** @Begin @Commit_point_define: true @Potential_commit_point_label: Prepare_Potential_Point @Label: Prepare_Full_Point @End */ return NULL; } bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) | ((wr+1)&0xFFFF),mo_acq_rel); /** @Begin @Commit_point_define_check: succ == true @Label: Prepare_Succ_Point @End */ if (succ) break; else thrd_yield(); } // (*1) rl::backoff bo; while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) { thrd_yield(); } t_element * p = & ( m_array[ wr % t_size ] ); return p; } /** @Begin @Interface: Publish @Commit_point_set: Publish_Point @ID: publish_id(__TID__) @Check: publish_check(__TID__) @Action: publish(__TID__); @End */ void write_publish() { m_written.fetch_add(1,mo_release); /** @Begin @Commit_point_define_check: true @Label: Publish_Point @End */ } //----------------------------------------------------- }; /** @Begin @Class_end @End */