changes to mpmc spec and add notes to ms-queue
[cdsspec-compiler.git] / benchmark / mpmc-queue / mpmc-queue.h
1 #include <stdatomic.h>
2 #include <unrelacy.h>
3 #include <common.h>
4
5 #include <spec_lib.h>
6 #include <stdlib.h>
7 #include <cdsannotate.h>
8 #include <specannotation.h>
9 #include <model_memory.h>
10
11 /**
12         @Begin
13         @Class_begin
14         @End
15 */
16 template <typename t_element, size_t t_size>
17 struct mpmc_boundq_1_alt
18 {
19 private:
20
21         // elements should generally be cache-line-size padded :
22         t_element               m_array[t_size];
23
24         // rdwr counts the reads & writes that have started
25         atomic<unsigned int>    m_rdwr;
26         // "read" and "written" count the number completed
27         atomic<unsigned int>    m_read;
28         atomic<unsigned int>    m_written;
29
30 public:
31
32         mpmc_boundq_1_alt()
33         {
34         /**
35                 @Begin
36                         @Entry_point
37                         @End
38                 */
39                 m_rdwr = 0;
40                 m_read = 0;
41                 m_written = 0;
42         }
43         
44
45         /**
46                 @Begin
47                 @Options:
48                         LANG = CPP;
49                         CLASS = mpmc_boundq_1_alt;
50                 @Global_define:
51                 @DeclareStruct:
52                         typedef struct elem {
53                                 t_element *pos;
54                                 bool written;
55                                 thread_id_t tid;
56                                 thread_id_t fetch_tid;
57                                 call_id_t id;
58                         } elem;
59                 @DeclareVar:
60                         spec_list *list;
61                         //id_tag_t *tag;
62                 @InitVar:
63                         list = new_spec_list();
64                         //tag = new_id_tag();
65         @Happens_before:
66                 Publish -> Fetch
67                 Consume -> Prepare
68         @End
69         */
70
71         //-----------------------------------------------------
72
73         /**
74                 @Begin
75                 @Interface: Fetch
76                 @Commit_point_set: Fetch_RW_Load_Empty | Fetch_RW_RMW | Fetch_W_Load
77                 @ID: (call_id_t) __RET__
78                 //@Check:
79                         //__RET__ == NULL || has_elem(list, __RET__)
80                 @End
81         */
82         t_element * read_fetch() {
83                 // Try this new weaker semantics
84                 unsigned int rdwr = m_rdwr.load(mo_acquire);
85                 //unsigned int rdwr = m_rdwr.load(mo_relaxed);
86                 /**
87                         @Begin
88                         @Potential_commit_point_define: true
89                         @Label: Fetch_Potential_RW_Load
90                         @End
91                 */
92                 unsigned int rd,wr;
93                 for(;;) {
94                         rd = (rdwr>>16) & 0xFFFF;
95                         wr = rdwr & 0xFFFF;
96
97                         if ( wr == rd ) { // empty
98
99                                 /**
100                                         @Begin
101                                         @Commit_point_define: true
102                                         @Potential_commit_point_label: Fetch_Potential_RW_Load
103                                         @Label: Fetch_RW_Load_Empty
104                                         @End
105                                 */
106
107                                 return false;
108                         }
109                         
110                         bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
111                         /**
112                                 @Begin
113                                 @Commit_point_define_check: succ
114                                 @Label: Fetch_RW_RMW
115                                 @End
116                         */
117                         if (succ)
118                                 break;
119                         else
120                                 thrd_yield();
121                 }
122
123                 // (*1)
124                 rl::backoff bo;
125                 while (true) {
126                         int written = m_written.load(mo_acquire);
127                         /**
128                                 @Begin
129                                 @Potential_commit_point_define: true
130                                 @Label: Fetch_Potential_W_Load
131                                 @End
132                         */
133                         if ((written & 0xFFFF) != wr) {
134                                 thrd_yield();
135                         } else {
136                                 break;
137                         }
138                 }
139                 
140                 /**
141                         @Begin
142                         @Commit_point_define: true
143                         @Potential_commit_point_label: Fetch_Potential_W_Load
144                         @Label: Fetch_W_Load
145                         @End
146                 */
147
148                 t_element * p = & ( m_array[ rd % t_size ] );
149                 
150                 return p;
151         }
152
153         /**
154                 @Begin
155                 @Interface: Consume
156                 @Commit_point_set: Consume_R_RMW
157                 @ID: (call_id_t) bin 
158                 //@Check:
159                 //      consume_check(__TID__)
160                 //@Action:
161                         //consume(__TID__);
162                 @End
163         */
164         void read_consume(t_element *bin) {
165                 /**** FIXME: miss ****/
166                 m_read.fetch_add(1,mo_release);
167                 /**
168                         @Begin
169                         @Commit_point_define_check: true
170                         @Label: Consume_R_RMW
171                         @End
172                 */
173         }
174
175         //-----------------------------------------------------
176
177         /**
178                 @Begin
179                 @Interface: Prepare 
180                 @Commit_point_set: Prepare_RW_Load_Full | Prepare_RW_RMW | Prepare_R_Load
181                 @ID: (call_id_t) __RET__
182                 //@Check:
183                         //prepare_check(__RET__, __TID__)
184                 //@Action:
185                         //push_back(list, __RET__);
186                 @End
187         */
188         t_element * write_prepare() {
189                 // Try weaker semantics
190                 unsigned int rdwr = m_rdwr.load(mo_acquire);
191                 //unsigned int rdwr = m_rdwr.load(mo_relaxed);
192                 /**
193                         @Begin
194                         @Potential_commit_point_define: true
195                         @Label: Prepare_Potential_RW_Load
196                         @End
197                 */
198                 unsigned int rd,wr;
199                 for(;;) {
200                         rd = (rdwr>>16) & 0xFFFF;
201                         wr = rdwr & 0xFFFF;
202
203                         if ( wr == ((rd + t_size)&0xFFFF) ) { // full
204
205                                 /**
206                                         @Begin
207                                         @Commit_point_define: true
208                                         @Potential_commit_point_label: Prepare_Potential_RW_Load
209                                         @Label: Prepare_RW_Load_Full
210                                         @End
211                                 */
212                                 return NULL;
213                         }
214                         
215                         bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
216                                 ((wr+1)&0xFFFF),mo_acq_rel);
217                         /**
218                                 @Begin
219                                 @Commit_point_define_check: succ
220                                 @Label: Prepare_RW_RMW
221                                 @End
222                         */
223                         if (succ)
224                                 break;
225                         else
226                                 thrd_yield();
227                 }
228
229                 // (*1)
230                 rl::backoff bo;
231                 while (true) {
232                         int read = m_read.load(mo_acquire);
233                         /**
234                                 @Begin
235                                 @Potential_commit_point_define: true
236                                 @Label: Prepare_Potential_R_Load
237                                 @End
238                         */
239                         if ((read & 0xFFFF) != rd)
240                                 thrd_yield();
241                         else
242                                 break;
243                 }
244
245                 /**
246                         @Begin
247                         @Commit_point_define: true
248                         @Potential_commit_point_label: Prepare_Potential_R_Load
249                         @Label: Prepare_R_Load
250                         @End
251                 */
252
253                 t_element * p = & ( m_array[ wr % t_size ] );
254
255                 return p;
256         }
257
258         /**
259                 @Begin
260                 @Interface: Publish 
261                 @Commit_point_set: Publish_W_RMW
262                 @ID: (call_id_t) bin 
263                 //@Check:
264                         //publish_check(__TID__)
265                 //@Action:
266                         //publish(__TID__);
267                 @End
268         */
269         void write_publish(t_element *bin)
270         {
271                 /**** hb violation ****/
272                 m_written.fetch_add(1,mo_release);
273                 /**
274                         @Begin
275                         @Commit_point_define_check: true
276                         @Label: Publish_W_RMW
277                         @End
278                 */
279         }
280
281         //-----------------------------------------------------
282
283
284 };
285 /**
286         @Begin
287         @Class_end
288         @End
289 */