minor fix
[cdsspec-compiler.git] / benchmark / mpmc-queue / mpmc-queue.h
1 #include <stdatomic.h>
2 #include <unrelacy.h>
3 #include <common.h>
4
5 #include <spec_lib.h>
6 #include <stdlib.h>
7 #include <cdsannotate.h>
8 #include <specannotation.h>
9 #include <model_memory.h>
10
11 /**
12         @Begin
13         @Class_begin
14         @End
15 */
16 template <typename t_element, size_t t_size>
17 struct mpmc_boundq_1_alt
18 {
19 private:
20
21         // elements should generally be cache-line-size padded :
22         t_element               m_array[t_size];
23
24         // rdwr counts the reads & writes that have started
25         atomic<unsigned int>    m_rdwr;
26         // "read" and "written" count the number completed
27         atomic<unsigned int>    m_read;
28         atomic<unsigned int>    m_written;
29
30 public:
31
32         mpmc_boundq_1_alt()
33         {
34         /**
35                 @Begin
36                         @Entry_point
37                         @End
38                 */
39                 m_rdwr = 0;
40                 m_read = 0;
41                 m_written = 0;
42         }
43         
44
45         /**
46                 @Begin
47                 @Options:
48                         LANG = CPP;
49                         CLASS = mpmc_boundq_1_alt;
50                 @Global_define:
51                 @DeclareStruct:
52                         typedef struct elem {
53                                 t_element *pos;
54                                 bool written;
55                                 thread_id_t tid;
56                                 thread_id_t fetch_tid;
57                                 call_id_t id;
58                         } elem;
59                 @DeclareVar:
60                         spec_list *list;
61                         //id_tag_t *tag;
62                 @InitVar:
63                         list = new_spec_list();
64                         //tag = new_id_tag();
65         @Happens_before:
66                 Publish -> Fetch
67                 Consume -> Prepare
68         @End
69         */
70
71         //-----------------------------------------------------
72
73         /**
74                 @Begin
75                 @Interface: Fetch
76                 @Commit_point_set: Fetch_Empty_Point | Fetch_Succ_Point
77                 @ID: (call_id_t) __RET__
78                 //@Check:
79                         //__RET__ == NULL || has_elem(list, __RET__)
80                 @End
81         */
82         t_element * read_fetch() {
83                 // Try this new weaker semantics
84                 //unsigned int rdwr = m_rdwr.load(mo_acquire);
85                 unsigned int rdwr = m_rdwr.load(mo_relaxed);
86                 /**
87                         @Begin
88                         @Potential_commit_point_define: true
89                         @Label: Fetch_Potential_Point
90                         @End
91                 */
92                 unsigned int rd,wr;
93                 for(;;) {
94                         rd = (rdwr>>16) & 0xFFFF;
95                         wr = rdwr & 0xFFFF;
96
97                         if ( wr == rd ) { // empty
98
99                                 /**
100                                         @Begin
101                                         @Commit_point_define: true
102                                         @Potential_commit_point_label: Fetch_Potential_Point 
103                                         @Label: Fetch_Empty_Point
104                                         @End
105                                 */
106
107                                 return false;
108                         }
109                         
110                         bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
111                         /**
112                                 @Begin
113                                 @Commit_point_define_check: succ == true
114                                 @Label: Fetch_Succ_Point
115                                 @End
116                         */
117                         if (succ)
118                                 break;
119                         else
120                                 thrd_yield();
121                 }
122
123                 // (*1)
124                 rl::backoff bo;
125                 while ( (m_written.load(mo_acquire) & 0xFFFF) != wr ) {
126                         thrd_yield();
127                 }
128
129                 t_element * p = & ( m_array[ rd % t_size ] );
130                 
131                 return p;
132         }
133
134         /**
135                 @Begin
136                 @Interface: Consume
137                 @Commit_point_set: Consume_Point
138                 @ID: (call_id_t) bin 
139                 //@Check:
140                 //      consume_check(__TID__)
141                 //@Action:
142                         //consume(__TID__);
143                 @End
144         */
145         void read_consume(t_element *bin) {
146                 m_read.fetch_add(1,mo_release);
147                 /**
148                         @Begin
149                         @Commit_point_define_check: true
150                         @Label: Consume_Point
151                         @End
152                 */
153         }
154
155         //-----------------------------------------------------
156
157         /**
158                 @Begin
159                 @Interface: Prepare 
160                 @Commit_point_set: Prepare_Full_Point | Prepare_Succ_Point
161                 @ID: (call_id_t) __RET__
162                 //@Check:
163                         //prepare_check(__RET__, __TID__)
164                 //@Action:
165                         //push_back(list, __RET__);
166                 @End
167         */
168         t_element * write_prepare() {
169                 // Try weaker semantics
170                 //unsigned int rdwr = m_rdwr.load(mo_acquire);
171                 unsigned int rdwr = m_rdwr.load(mo_relaxed);
172                 /**
173                         @Begin
174                         @Potential_commit_point_define: true
175                         @Label: Prepare_Potential_Point
176                         @End
177                 */
178                 unsigned int rd,wr;
179                 for(;;) {
180                         rd = (rdwr>>16) & 0xFFFF;
181                         wr = rdwr & 0xFFFF;
182
183                         if ( wr == ((rd + t_size)&0xFFFF) ) { // full
184
185                                 /**
186                                         @Begin
187                                         @Commit_point_define: true
188                                         @Potential_commit_point_label: Prepare_Potential_Point 
189                                         @Label: Prepare_Full_Point
190                                         @End
191                                 */
192                                 return NULL;
193                         }
194                         
195                         bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
196                                 ((wr+1)&0xFFFF),mo_acq_rel);
197                         /**
198                                 @Begin
199                                 @Commit_point_define_check: succ == true
200                                 @Label: Prepare_Succ_Point
201                                 @End
202                         */
203                         if (succ)
204                                 break;
205                         else
206                                 thrd_yield();
207                 }
208
209                 // (*1)
210                 rl::backoff bo;
211                 while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
212                         thrd_yield();
213                 }
214
215                 t_element * p = & ( m_array[ wr % t_size ] );
216
217                 return p;
218         }
219
220         /**
221                 @Begin
222                 @Interface: Publish 
223                 @Commit_point_set: Publish_Point
224                 @ID: (call_id_t) bin 
225                 //@Check:
226                         //publish_check(__TID__)
227                 //@Action:
228                         //publish(__TID__);
229                 @End
230         */
231         void write_publish(t_element *bin)
232         {
233                 m_written.fetch_add(1,mo_release);
234                 /**
235                         @Begin
236                         @Commit_point_define_check: true
237                         @Label: Publish_Point
238                         @End
239                 */
240         }
241
242         //-----------------------------------------------------
243
244
245 };
246 /**
247         @Begin
248         @Class_end
249         @End
250 */