7 #include <cdsannotate.h>
8 #include <specannotation.h>
9 #include <model_memory.h>
// Queue state: a fixed array of t_size slots of t_element plus three atomic
// counters. m_rdwr packs two 16-bit indices into one word: the member
// functions below read the read index from the upper half
// ((rdwr>>16) & 0xFFFF) and write the write index into the lower half
// ((rd<<16) | ((wr+1)&0xFFFF)).
16 template <typename t_element, size_t t_size>
17 struct mpmc_boundq_1_alt
21 // elements should generally be cache-line-size padded :
22 t_element m_array[t_size];
24 // rdwr counts the reads & writes that have started
25 atomic<unsigned int> m_rdwr;
// m_read / m_written are bumped by fetch_add(1) once an operation finishes;
// waiters compare only their low 16 bits against an index.
26 // "read" and "written" count the number completed
27 atomic<unsigned int> m_read;
28 atomic<unsigned int> m_written;
// Specification-level fragments: CLASS names this struct, fetch_tid is a
// thread-id field (accessed as e->fetch_tid on elem records by the helpers
// below), and list is initialised with new_spec_list() before the helpers
// walk it.
49 CLASS = mpmc_boundq_1_alt;
56 thread_id_t fetch_tid;
63 list = new_spec_list();
// Allocates storage for one elem record with MODEL_MALLOC(sizeof(elem)).
// NOTE(review): pos, id and tid presumably populate the record's fields;
// the assignments are not visible here — confirm.
66 elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) {
67 elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
// Walks the spec list by index, viewing each entry as an elem*.
// NOTE(review): presumably returns the entry whose pos equals the argument
// (callers test the result against NULL) — comparison/return not visible,
// confirm.
75 elem* get_elem_by_pos(t_element *pos) {
76 for (int i = 0; i < size(list); i++) {
77 elem *e = (elem*) elem_at_index(list, i);
// Debug walk over every entry of the spec list. Both model_print calls are
// commented out, so the visible loop body only fetches each entry.
86 //model_print("Status:\n");
87 for (int i = 0; i < size(list); i++) {
88 elem *e = (elem*) elem_at_index(list, i);
89 //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id);
// Walks the spec list by index, viewing each entry as an elem*.
// NOTE(review): presumably returns the entry whose tid equals the argument
// — comparison/return not visible, confirm.
93 elem* get_elem_by_tid(thread_id_t tid) {
94 for (int i = 0; i < size(list); i++) {
95 elem *e = (elem*) elem_at_index(list, i);
// Walks the spec list by index and tests each entry's fetch_tid against the
// argument. NOTE(review): presumably returns the first match — return
// statements not visible, confirm.
103 elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) {
104 for (int i = 0; i < size(list); i++) {
105 elem *e = (elem*) elem_at_index(list, i);
106 if (e->fetch_tid== fetch_tid) {
// Index-based scan of the spec list that tests each entry's pos against the
// argument. NOTE(review): presumably returns the matching index — return
// statements (and the not-found value) not visible, confirm.
113 int elem_idx_by_pos(t_element *pos) {
114 for (int i = 0; i < size(list); i++) {
115 elem *existing = (elem*) elem_at_index(list, i);
116 if (pos == existing->pos) {
// Index-based scan of the spec list that tests each entry's tid against the
// argument. NOTE(review): presumably returns the matching index — return
// statements (and the not-found value) not visible, confirm.
123 int elem_idx_by_tid(thread_id_t tid) {
124 for (int i = 0; i < size(list); i++) {
125 elem *existing = (elem*) elem_at_index(list, i);
126 if (tid == existing->tid) {
// Index-based scan of the spec list that tests each entry's fetch_tid
// against the argument; consume() passes the result to remove_at_index.
// NOTE(review): presumably returns the matching index — return statements
// (and the not-found value) not visible, confirm.
133 int elem_idx_by_fetch_tid(thread_id_t fetch_tid) {
134 for (int i = 0; i < size(list); i++) {
135 elem *existing = (elem*) elem_at_index(list, i);
136 if (fetch_tid == existing->fetch_tid) {
// Scans the whole spec list testing each entry's pos against the argument.
// NOTE(review): presumably counts the matches (callers test the result with
// "> 1") — the counter itself is not visible, confirm.
143 int elem_num(t_element *pos) {
145 for (int i = 0; i < size(list); i++) {
146 elem *existing = (elem*) elem_at_index(list, i);
147 if (pos == existing->pos) {
// Obtains a call id from get_and_inc(tag).
// NOTE(review): presumably res is what gets returned — return statement not
// visible, confirm.
154 call_id_t prepare_id() {
155 call_id_t res = get_and_inc(tag);
156 //model_print("prepare_id: %d\n", res);
// Check hook for write_prepare: looks up the spec-list entry for pos.
// NOTE(review): how the lookup result (and tid) decide the boolean outcome
// is not visible here — confirm.
160 bool prepare_check(t_element *pos, thread_id_t tid) {
162 elem *e = get_elem_by_pos(pos);
163 //model_print("prepare_check: e %d\n", e);
// Action hook for write_prepare: builds a new elem record from the slot
// pointer, call id and thread id via new_elem.
// NOTE(review): presumably the record is then added to the spec list —
// not visible, confirm.
167 void prepare(call_id_t id, t_element *pos, thread_id_t tid) {
168 //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid);
169 elem *e = new_elem(pos, id, tid);
// ID hook for the publish step: looks up the calling thread's entry by tid.
// At least one path returns DEFAULT_CALL_ID.
// NOTE(review): presumably that is the not-found case and the found case
// yields e->id — branch not visible, confirm.
173 call_id_t publish_id(thread_id_t tid) {
174 elem *e = get_elem_by_tid(tid);
175 //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id);
177 return DEFAULT_CALL_ID;
// Check hook for the publish step: looks up the calling thread's entry and
// tests whether more than one list entry shares its pos.
// NOTE(review): the outcome taken on each branch is not visible — confirm.
181 bool publish_check(thread_id_t tid) {
183 elem *e = get_elem_by_tid(tid);
184 //model_print("publish_check: tid %d\n", tid);
187 if (elem_num(e->pos) > 1)
// Action hook for the publish step: looks up the calling thread's entry.
// NOTE(review): presumably marks that entry as written (elem has a
// "written" field per the status print format) — update not visible, confirm.
192 void publish(thread_id_t tid) {
193 //model_print("publish: tid %d\n", tid);
194 elem *e = get_elem_by_tid(tid);
// ID hook for read_fetch: looks up the entry for the returned slot pointer.
// At least one path returns DEFAULT_CALL_ID.
// NOTE(review): presumably that is the not-found case and the found case
// yields e->id — branch not visible, confirm.
198 call_id_t fetch_id(t_element *pos) {
199 elem *e = get_elem_by_pos(pos);
200 //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id);
202 return DEFAULT_CALL_ID;
// Check hook for read_fetch.
//  - A NULL pos is accepted unconditionally.
//  - A non-NULL pos with no matching spec-list entry is rejected.
//  - Otherwise tests whether more than one entry shares that pos.
// NOTE(review): the outcome of the final test is not visible — confirm.
206 bool fetch_check(t_element *pos) {
208 if (pos == NULL) return true;
209 elem *e = get_elem_by_pos(pos);
210 //model_print("fetch_check: pos %d, e %d\n", pos, e);
211 if (e == NULL) return false;
212 if (elem_num(e->pos) > 1)
// Action hook for read_fetch: does nothing for a NULL pos; otherwise looks
// up the entry for pos.
// NOTE(review): per the existing comment the fetching thread is then
// recorded on the entry (presumably in fetch_tid) — assignment not visible,
// confirm.
217 void fetch(t_element *pos, thread_id_t tid) {
218 if (pos == NULL) return;
219 elem *e = (elem*) get_elem_by_pos(pos);
220 //model_print("fetch: pos %d, tid %d\n", pos, tid);
221 // Remember the thread that fetches the position
// Check hook for read_consume: finds the entry fetched by the calling
// thread and tests whether more than one list entry shares its pos.
// NOTE(review): the outcome taken on each branch is not visible — confirm.
225 bool consume_check(thread_id_t tid) {
227 elem *e = get_elem_by_fetch_tid(tid);
228 //model_print("consume_check: tid %d, e %d\n", tid, e);
231 if (elem_num(e->pos) > 1)
// ID hook for read_consume: finds the entry fetched by the calling thread.
// At least one path returns DEFAULT_CALL_ID.
// NOTE(review): presumably that is the not-found case and the found case
// yields e->id — branch not visible, confirm.
236 call_id_t consume_id(thread_id_t tid) {
237 elem *e = get_elem_by_fetch_tid(tid);
238 //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id);
240 return DEFAULT_CALL_ID;
// Action hook for read_consume: locates the index of the entry fetched by
// the calling thread and removes that entry from the spec list.
// NOTE(review): any guard on idx between the lookup and the removal is not
// visible — confirm a not-found index cannot reach remove_at_index.
244 void consume(thread_id_t tid) {
245 //model_print("consume: tid %d\n", tid);
246 int idx = elem_idx_by_fetch_tid(tid);
249 remove_at_index(list, idx);
257 //-----------------------------------------------------
// Interface annotations for read_fetch: it commits at either the "empty"
// or the "CAS succeeded" point, is identified by fetch_id(__RET__), and
// runs the fetch(__RET__, __TID__) spec hook.
262 @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
263 @ID: fetch_id(__RET__)
267 fetch(__RET__, __TID__);
// Consumer step 1: claim a slot to read and return its address.
270 t_element * read_fetch() {
// Acquire-load the packed counter word.
271 unsigned int rdwr = m_rdwr.load(mo_acquire);
274 @Potential_commit_point_define: true
275 @Label: Fetch_Potential_Point
// The read index is the upper 16 bits of the packed word.
280 rd = (rdwr>>16) & 0xFFFF;
// Equal read and write indices mean there is nothing to fetch.
283 if ( wr == rd ) { // empty
286 @Commit_point_define: true
287 @Potential_commit_point_label: Fetch_Potential_Point
288 @Label: Fetch_Empty_Point
// Claim the slot by adding 1 to the upper-half read index. A weak CAS may
// fail spuriously. NOTE(review): presumably retried in an enclosing loop —
// loop not visible, confirm.
294 bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
297 @Commit_point_define_check: succ == true
298 @Label: Fetch_Succ_Point
// Spin until the low 16 bits of the completed-writes counter equal wr.
309 while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
// Slot address: read index modulo capacity.
313 t_element * p = & ( m_array[ rd % t_size ] );
// Interface annotations for read_consume: single commit point
// Consume_Point, identified by consume_id(__TID__), guarded by
// consume_check(__TID__).
321 @Commit_point_set: Consume_Point
322 @ID: consume_id(__TID__)
324 consume_check(__TID__)
// Consumer step 2: bump the completed-reads counter with release ordering.
// write_prepare spins on this counter before handing out a slot.
329 void read_consume() {
330 m_read.fetch_add(1,mo_release);
333 @Commit_point_define_check: true
334 @Label: Consume_Point
339 //-----------------------------------------------------
// Interface annotations for write_prepare: it commits at either the "full"
// or the "CAS succeeded" point, is guarded by
// prepare_check(__RET__, __TID__), and runs the
// prepare(__ID__, __RET__, __TID__) spec hook.
344 @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
347 prepare_check(__RET__, __TID__)
349 prepare(__ID__, __RET__, __TID__);
// Producer step 1: claim a slot to write and return its address.
352 t_element * write_prepare() {
// Acquire-load the packed counter word.
353 unsigned int rdwr = m_rdwr.load(mo_acquire);
356 @Potential_commit_point_define: true
357 @Label: Prepare_Potential_Point
// The read index is the upper 16 bits of the packed word.
362 rd = (rdwr>>16) & 0xFFFF;
// Full when the write index is exactly t_size ahead of the read index,
// computed modulo 2^16.
365 if ( wr == ((rd + t_size)&0xFFFF) ) { // full
368 @Commit_point_define: true
369 @Potential_commit_point_label: Prepare_Potential_Point
370 @Label: Prepare_Full_Point
// Claim the slot: keep rd in the upper half and store wr+1 (mod 2^16) in
// the lower half. A weak CAS may fail spuriously. NOTE(review): presumably
// retried in an enclosing loop — loop not visible, confirm.
376 bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
377 ((wr+1)&0xFFFF),mo_acq_rel);
380 @Commit_point_define_check: succ == true
381 @Label: Prepare_Succ_Point
// Spin until the low 16 bits of the completed-reads counter equal rd.
392 while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
// Slot address: write index modulo capacity.
// NOTE(review): indices wrap at 2^16, so wr % t_size is only continuous
// across the wrap when t_size divides 65536 — confirm the constraint on
// t_size.
396 t_element * p = & ( m_array[ wr % t_size ] );
// Interface annotations for the publish step: single commit point
// Publish_Point, identified by publish_id(__TID__), guarded by
// publish_check(__TID__).
404 @Commit_point_set: Publish_Point
405 @ID: publish_id(__TID__)
407 publish_check(__TID__)
// Producer step 2: bump the completed-writes counter with release ordering.
// read_fetch spins on this counter before handing out a slot.
414 m_written.fetch_add(1,mo_release);
417 @Commit_point_define_check: true
418 @Label: Publish_Point
423 //-----------------------------------------------------