7 #include <cdsannotate.h>
8 #include <specannotation.h>
9 #include <model_memory.h>
17 template <typename t_element, size_t t_size>
18 struct mpmc_boundq_1_alt
22 // elements should generally be cache-line-size padded :
23 t_element m_array[t_size];
25 // rdwr counts the reads & writes that have started
26 atomic<unsigned int> m_rdwr;
27 // "read" and "written" count the number completed
28 atomic<unsigned int> m_read;
29 atomic<unsigned int> m_written;
50 CLASS = mpmc_boundq_1_alt;
57 thread_id_t fetch_tid;
64 list = new_spec_list();
67 elem* new_elem(t_element *pos, call_id_t id, thread_id_t tid) {
68 elem *e = (elem*) MODEL_MALLOC(sizeof(elem));
76 elem* get_elem_by_pos(t_element *pos) {
77 for (int i = 0; i < size(list); i++) {
78 elem *e = (elem*) elem_at_index(list, i);
87 //model_print("Status:\n");
88 for (int i = 0; i < size(list); i++) {
89 elem *e = (elem*) elem_at_index(list, i);
90 //model_print("%d: pos %d, written %d, tid %d, fetch_tid %d, call_id %d\n", i, e->pos, e->written, e->tid, e->fetch_tid, e->id);
94 elem* get_elem_by_tid(thread_id_t tid) {
95 for (int i = 0; i < size(list); i++) {
96 elem *e = (elem*) elem_at_index(list, i);
104 elem* get_elem_by_fetch_tid(thread_id_t fetch_tid) {
105 for (int i = 0; i < size(list); i++) {
106 elem *e = (elem*) elem_at_index(list, i);
107 if (e->fetch_tid== fetch_tid) {
114 int elem_idx_by_pos(t_element *pos) {
115 for (int i = 0; i < size(list); i++) {
116 elem *existing = (elem*) elem_at_index(list, i);
117 if (pos == existing->pos) {
124 int elem_idx_by_tid(thread_id_t tid) {
125 for (int i = 0; i < size(list); i++) {
126 elem *existing = (elem*) elem_at_index(list, i);
127 if (tid == existing->tid) {
134 int elem_idx_by_fetch_tid(thread_id_t fetch_tid) {
135 for (int i = 0; i < size(list); i++) {
136 elem *existing = (elem*) elem_at_index(list, i);
137 if (fetch_tid == existing->fetch_tid) {
144 int elem_num(t_element *pos) {
146 for (int i = 0; i < size(list); i++) {
147 elem *existing = (elem*) elem_at_index(list, i);
148 if (pos == existing->pos) {
155 call_id_t prepare_id() {
156 call_id_t res = get_and_inc(tag);
157 //model_print("prepare_id: %d\n", res);
161 bool prepare_check(t_element *pos, thread_id_t tid) {
163 elem *e = get_elem_by_pos(pos);
164 //model_print("prepare_check: e %d\n", e);
168 void prepare(call_id_t id, t_element *pos, thread_id_t tid) {
169 //model_print("prepare: id %d, pos %d, tid %d\n", id, pos, tid);
170 elem *e = new_elem(pos, id, tid);
174 call_id_t publish_id(thread_id_t tid) {
175 elem *e = get_elem_by_tid(tid);
176 //model_print("publish_id: id %d\n", e == NULL ? 0 : e->id);
178 return DEFAULT_CALL_ID;
182 bool publish_check(thread_id_t tid) {
184 elem *e = get_elem_by_tid(tid);
185 //model_print("publish_check: tid %d\n", tid);
188 if (elem_num(e->pos) > 1)
193 void publish(thread_id_t tid) {
194 //model_print("publish: tid %d\n", tid);
195 elem *e = get_elem_by_tid(tid);
199 call_id_t fetch_id(t_element *pos) {
200 elem *e = get_elem_by_pos(pos);
201 //model_print("fetch_id: id %d\n", e == NULL ? 0 : e->id);
203 return DEFAULT_CALL_ID;
207 bool fetch_check(t_element *pos) {
209 if (pos == NULL) return true;
210 elem *e = get_elem_by_pos(pos);
211 //model_print("fetch_check: pos %d, e %d\n", pos, e);
212 if (e == NULL) return false;
213 if (elem_num(e->pos) > 1)
218 void fetch(t_element *pos, thread_id_t tid) {
219 if (pos == NULL) return;
220 elem *e = (elem*) get_elem_by_pos(pos);
221 //model_print("fetch: pos %d, tid %d\n", pos, tid);
222 // Remember the thread that fetches the position
226 bool consume_check(thread_id_t tid) {
228 elem *e = get_elem_by_fetch_tid(tid);
229 //model_print("consume_check: tid %d, e %d\n", tid, e);
232 if (elem_num(e->pos) > 1)
237 call_id_t consume_id(thread_id_t tid) {
238 elem *e = get_elem_by_fetch_tid(tid);
239 //model_print("consume_id: id %d\n", e == NULL ? 0 : e->id);
241 return DEFAULT_CALL_ID;
245 void consume(thread_id_t tid) {
246 //model_print("consume: tid %d\n", tid);
247 int idx = elem_idx_by_fetch_tid(tid);
250 remove_at_index(list, idx);
258 //-----------------------------------------------------
263 @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
264 @ID: fetch_id(__RET__)
268 fetch(__RET__, __TID__);
271 t_element * read_fetch() {
272 unsigned int rdwr = m_rdwr.load(mo_acquire);
275 @Potential_commit_point_define: true
276 @Label: Fetch_Potential_Point
281 rd = (rdwr>>16) & 0xFFFF;
284 if ( wr == rd ) { // empty
287 @Commit_point_define: true
288 @Potential_commit_point_label: Fetch_Potential_Point
289 @Label: Fetch_Empty_Point
295 bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
298 @Commit_point_define_check: succ == true
299 @Label: Fetch_Succ_Point
310 while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
314 t_element * p = & ( m_array[ rd % t_size ] );
322 @Commit_point_set: Consume_Point
323 @ID: consume_id(__TID__)
325 consume_check(__TID__)
330 void read_consume() {
331 m_read.fetch_add(1,mo_release);
334 @Commit_point_define_check: true
335 @Label: Consume_Point
340 //-----------------------------------------------------
345 @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
348 prepare_check(__RET__, __TID__)
350 prepare(__ID__, __RET__, __TID__);
353 t_element * write_prepare() {
354 unsigned int rdwr = m_rdwr.load(mo_acquire);
357 @Potential_commit_point_define: true
358 @Label: Prepare_Potential_Point
363 rd = (rdwr>>16) & 0xFFFF;
366 if ( wr == ((rd + t_size)&0xFFFF) ) { // full
369 @Commit_point_define: true
370 @Potential_commit_point_label: Prepare_Potential_Point
371 @Label: Prepare_Full_Point
377 bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
378 ((wr+1)&0xFFFF),mo_acq_rel);
381 @Commit_point_define_check: succ == true
382 @Label: Prepare_Succ_Point
393 while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
397 t_element * p = & ( m_array[ wr % t_size ] );
405 @Commit_point_set: Publish_Point
406 @ID: publish_id(__TID__)
408 publish_check(__TID__)
415 m_written.fetch_add(1,mo_release);
418 @Commit_point_define_check: true
419 @Label: Publish_Point
424 //-----------------------------------------------------