aa1012976ee8501596d27cc7852f9006ea0e1e0c
[cdsspec-compiler.git] / benchmark / mpmc-queue / mpmc-queue.h
1 #include <stdatomic.h>
2 #include <unrelacy.h>
3 #include <common.h>
4
5 #include <spec_lib.h>
6 #include <stdlib.h>
7 #include <cdsannotate.h>
8 #include <specannotation.h>
9 #include <model_memory.h>
10
11 /**
12         @Begin
13         @Class_begin
14         @End
15 */
16 template <typename t_element, size_t t_size>
17 struct mpmc_boundq_1_alt
18 {
19 private:
20
21         // elements should generally be cache-line-size padded :
22         t_element               m_array[t_size];
23
24         // rdwr counts the reads & writes that have started
25         atomic<unsigned int>    m_rdwr;
26         // "read" and "written" count the number completed
27         atomic<unsigned int>    m_read;
28         atomic<unsigned int>    m_written;
29
30 public:
31
32         mpmc_boundq_1_alt()
33         {
34         /**
35                 @Begin
36                         @Entry_point
37                         @End
38                 */
39                 m_rdwr = 0;
40                 m_read = 0;
41                 m_written = 0;
42         }
43         
44
45         /**
46                 @Begin
47                 @Options:
48                         LANG = CPP;
49                         CLASS = mpmc_boundq_1_alt;
50                 @Global_define:
51                 //@DeclareStruct:
52                         //typedef struct elem {
53                         //      t_element *pos;
54                         //      bool written;
55                         //      thread_id_t tid;
56                         //      thread_id_t fetch_tid;
57                         //      call_id_t id;
58                 //      } elem;
59         //      @DeclareVar:
60         //              spec_list *list;
61                         //id_tag_t *tag;
62         //      @InitVar:
63         //              list = new_spec_list();
64                         //tag = new_id_tag();
65         //      @Cleanup:
66 //                      if (list)
67 //                              free_spec_list();
68         @Happens_before:
69                 Publish -> Fetch
70                 Consume -> Prepare
71         @End
72         */
73
74         //-----------------------------------------------------
75
76         /**
77                 @Begin
78                 @Interface: Fetch
79                 @Commit_point_set: Fetch_RW_Load_Empty | Fetch_RW_RMW | Fetch_W_Load
80                 @ID: (call_id_t) __RET__
81                 //@Check:
82                         //__RET__ == NULL || has_elem(list, __RET__)
83                 @End
84         */
85         t_element * read_fetch() {
86                 // Try this new weaker semantics
87                 unsigned int rdwr = m_rdwr.load(mo_acquire);
88                 //unsigned int rdwr = m_rdwr.load(mo_relaxed);
89                 /**
90                         @Begin
91                         @Potential_commit_point_define: true
92                         @Label: Fetch_Potential_RW_Load
93                         @End
94                 */
95                 unsigned int rd,wr;
96                 for(;;) {
97                         rd = (rdwr>>16) & 0xFFFF;
98                         wr = rdwr & 0xFFFF;
99
100                         if ( wr == rd ) { // empty
101
102                                 /**
103                                         @Begin
104                                         @Commit_point_define: true
105                                         @Potential_commit_point_label: Fetch_Potential_RW_Load
106                                         @Label: Fetch_RW_Load_Empty
107                                         @End
108                                 */
109
110                                 return false;
111                         }
112                         
113                         bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
114                         /**
115                                 @Begin
116                                 @Commit_point_define_check: succ
117                                 @Label: Fetch_RW_RMW
118                                 @End
119                         */
120                         if (succ)
121                                 break;
122                         else
123                                 thrd_yield();
124                 }
125
126                 // (*1)
127                 rl::backoff bo;
128                 while (true) {
129                         int written = m_written.load(mo_acquire);
130                         /**
131                                 @Begin
132                                 @Potential_commit_point_define: true
133                                 @Label: Fetch_Potential_W_Load
134                                 @End
135                         */
136                         if ((written & 0xFFFF) != wr) {
137                                 thrd_yield();
138                         } else {
139                                 break;
140                         }
141                 }
142                 
143                 /**
144                         @Begin
145                         @Commit_point_define: true
146                         @Potential_commit_point_label: Fetch_Potential_W_Load
147                         @Label: Fetch_W_Load
148                         @End
149                 */
150
151                 t_element * p = & ( m_array[ rd % t_size ] );
152                 
153                 return p;
154         }
155
156         /**
157                 @Begin
158                 @Interface: Consume
159                 @Commit_point_set: Consume_R_RMW
160                 @ID: (call_id_t) bin 
161                 //@Check:
162                 //      consume_check(__TID__)
163                 //@Action:
164                         //consume(__TID__);
165                 @End
166         */
167         void read_consume(t_element *bin) {
168                 /**** FIXME: miss ****/
169                 m_read.fetch_add(1,mo_release);
170                 /**
171                         @Begin
172                         @Commit_point_define_check: true
173                         @Label: Consume_R_RMW
174                         @End
175                 */
176         }
177
178         //-----------------------------------------------------
179
180         /**
181                 @Begin
182                 @Interface: Prepare 
183                 @Commit_point_set: Prepare_RW_Load_Full | Prepare_RW_RMW | Prepare_R_Load
184                 @ID: (call_id_t) __RET__
185                 //@Check:
186                         //prepare_check(__RET__, __TID__)
187                 //@Action:
188                         //push_back(list, __RET__);
189                 @End
190         */
191         t_element * write_prepare() {
192                 // Try weaker semantics
193                 unsigned int rdwr = m_rdwr.load(mo_acquire);
194                 //unsigned int rdwr = m_rdwr.load(mo_relaxed);
195                 /**
196                         @Begin
197                         @Potential_commit_point_define: true
198                         @Label: Prepare_Potential_RW_Load
199                         @End
200                 */
201                 unsigned int rd,wr;
202                 for(;;) {
203                         rd = (rdwr>>16) & 0xFFFF;
204                         wr = rdwr & 0xFFFF;
205
206                         if ( wr == ((rd + t_size)&0xFFFF) ) { // full
207
208                                 /**
209                                         @Begin
210                                         @Commit_point_define: true
211                                         @Potential_commit_point_label: Prepare_Potential_RW_Load
212                                         @Label: Prepare_RW_Load_Full
213                                         @End
214                                 */
215                                 return NULL;
216                         }
217                         
218                         bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
219                                 ((wr+1)&0xFFFF),mo_acq_rel);
220                         /**
221                                 @Begin
222                                 @Commit_point_define_check: succ
223                                 @Label: Prepare_RW_RMW
224                                 @End
225                         */
226                         if (succ)
227                                 break;
228                         else
229                                 thrd_yield();
230                 }
231
232                 // (*1)
233                 rl::backoff bo;
234                 while (true) {
235                         int read = m_read.load(mo_acquire);
236                         /**
237                                 @Begin
238                                 @Potential_commit_point_define: true
239                                 @Label: Prepare_Potential_R_Load
240                                 @End
241                         */
242                         if ((read & 0xFFFF) != rd)
243                                 thrd_yield();
244                         else
245                                 break;
246                 }
247
248                 /**
249                         @Begin
250                         @Commit_point_define: true
251                         @Potential_commit_point_label: Prepare_Potential_R_Load
252                         @Label: Prepare_R_Load
253                         @End
254                 */
255
256                 t_element * p = & ( m_array[ wr % t_size ] );
257
258                 return p;
259         }
260
261         /**
262                 @Begin
263                 @Interface: Publish 
264                 @Commit_point_set: Publish_W_RMW
265                 @ID: (call_id_t) bin 
266                 //@Check:
267                         //publish_check(__TID__)
268                 //@Action:
269                         //publish(__TID__);
270                 @End
271         */
272         void write_publish(t_element *bin)
273         {
274                 /**** hb violation ****/
275                 m_written.fetch_add(1,mo_release);
276                 /**
277                         @Begin
278                         @Commit_point_define_check: true
279                         @Label: Publish_W_RMW
280                         @End
281                 */
282         }
283
284         //-----------------------------------------------------
285
286
287 };
288 /**
289         @Begin
290         @Class_end
291         @End
292 */