CLASS = mpmc_boundq_1_alt;
@DeclareStruct:
typedef struct elem {
- t_element* pos;
+ t_element *pos;
boolean written;
+ thread_id_t tid;
call_id_t id;
} elem;
@DeclareVar:
list = new_spec_list();
tag = new_id_tag();
@DefineFunc:
- elem* new_elem(t_element *pos, call_id_t id) {
+ elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) {
elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
e->pos = pos;
e->written = false;
e->id = id;
+ e->tid = tid;
}
@DefineFunc:
- elem* get_elem(t_element *pos) {
+ elem* get_elem_by_pos(t_element *pos) {
for (int i = 0; i < size(list); i++) {
elem *e = (elem*) elem_at_index(list, i);
if (e->pos == pos) {
return NULL;
}
@DefineFunc:
- int has_elem(t_element *pos) {
+ elem* get_elem_by_tid(thread_id_t tid) {
+ for (int i = 0; i < size(list); i++) {
+ elem *e = (elem*) elem_at_index(list, i);
+ if (e->tid== tid) {
+ return e;
+ }
+ }
+ return NULL;
+ }
+ @DefineFunc:
+ int elem_idx_by_pos(t_element *pos) {
for (int i = 0; i < size(list); i++) {
elem *existing = (elem*) elem_at_index(list, i);
if (pos == existing->pos) {
return -1;
}
@DefineFunc:
- void prepare(t_element *pos) {
- elem *e = new_elem(
- push_back(list, e);
+ int elem_idx_by_tid(thread_id_t tid) {
+ for (int i = 0; i < size(list); i++) {
+ elem *existing = (elem*) elem_at_index(list, i);
+ if (tid == existing->tid) {
+ return i;
+ }
+ }
+ return -1;
+ }
+ @DefineFunc:
+ call_id_t prepare_id() {
+ return get_and_inc(tag);
+ }
+ @DefineFunc:
+ bool prepare_check(t_element *pos, thread_id_t tid) {
+ elem *e = get_elem_by_tid(tid);
+ return NULL == e;
}
@DefineFunc:
- void publish(t_element *pos) {
- elem *e = new_elem(
+ void prepare(call_id_t id, t_element *pos, thread_id_t tid) {
+ call_id_t id = get_and_inc(tag);
+ elem *e = new_elem(pos, id, tid);
push_back(list, e);
}
@DefineFunc:
- void consume_elem(t_element *pos) {
- int idx = has_elem(pos);
+ call_id_t publish_id(thread_id_t tid) {
+ elem *e = get_elem_by_tid(tid);
+ if (NULL == e)
+ return DEFAULT_CALL_ID;
+ return e->id;
+ }
+ @DefineFunc:
+ bool publish_check(thread_id_t tid) {
+ elem *e = get_elem_by_tid(tid);
+ if (NULL == e)
+ return false;
+ return e->written;
+ }
+ @DefineFunc:
+ void publish(thread_id_t tid) {
+ elem *e = get_elem_by_tid(tid);
+ e->written = true;
+ }
+ @DefineFunc:
+ call_id_t fetch_id(t_element *pos) {
+ elem *e = get_elem_by_pos(pos);
+ if (NULL == e)
+ return DEFAULT_CALL_ID;
+ return e->id;
+ }
+ @DefineFunc:
+ bool fetch_check(t_element *pos) {
+ int idx = elem_idx_by_pos(pos);
+ if (idx == -1)
+ return false;
+ else
+ return true;
+ }
+ @DefineFunc:
+ void fetch(t_element *pos) {
+ int idx = elem_idx_by_pos(pos);
if (idx == -1)
return;
remove_at_index(list, idx);
}
-
+ @DefineFunc:
+ bool consume_check(thread_id_t tid) {
+ elem *e = get_elem_by_tid(tid);
+ if (NULL == e)
+ return false;
+ return e->written;
+ }
+ @DefineFunc:
+ call_id_t consume_id(thread_id_t tid) {
+ elem *e = get_elem_by_tid(tid);
+ if (NULL == e)
+ return DEFAULT_CALL_ID;
+ return e->id;
+ }
+ @DefineFunc:
+ void consume(thread_id_t tid) {
+ int idx = elem_idx_by_tid(tid);
+ if (idx == -1)
+ return;
+ remove_at_index(list, idx);
+ }
+ @Happens_before:
+ Prepare -> Fetch
+ Publish -> Consume
+ @End
*/
//-----------------------------------------------------
+ /**
+ @Begin
+ @Interface: Fetch
+ @Commit_point_set: Fetch_Point
+ @ID: fetch_id(__RET__)
+ @Check:
+ fetch_check(__RET__)
+ @Action:
+ fetch(__RET__);
+ @End
+ */
t_element * read_fetch() {
unsigned int rdwr = m_rdwr.load(mo_acquire);
unsigned int rd,wr;
return p;
}
+ /**
+ @Begin
+ @Interface: Consume
+ @Commit_point_set: Consume_Point
+ @ID: consume_id(__TID__)
+ @Check:
+ consume_check(__TID__)
+ @Action:
+ consume(__TID__);
+ @End
+ */
void read_consume() {
m_read.fetch_add(1,mo_release);
}
//-----------------------------------------------------
+ /**
+ @Begin
+ @Interface: Prepare
+ @Commit_point_set: Prepare_Point
+ @ID: prepare_id(__RET__)
+ @Check:
+ prepare_check(__RET__)
+ @Action:
+ prepare(__RET__);
+ @End
+ */
t_element * write_prepare() {
unsigned int rdwr = m_rdwr.load(mo_acquire);
unsigned int rd,wr;
return p;
}
+ /**
+ @Begin
+ @Interface: Publish
+ @Commit_point_set: Publish_Point
+ @ID: publish_id(__TID__)
+ @Check:
+ publish_check(__TID__)
+ @Action:
+ publish(__TID__);
+ @End
+ */
void write_publish()
{
m_written.fetch_add(1,mo_release);