save
[cdsspec-compiler.git] / benchmark / mpmc-queue / mpmc-queue.h
1 #include <stdatomic.h>
2 #include <unrelacy.h>
3 #include <common.h>
4
5 #include <spec_lib.h>
6 #include <stdlib.h>
7 #include <cdsannotate.h>
8 #include <specannotation.h>
9 #include <model_memory.h>
10 #include "common.h" 
11
12 /**
13         @Begin
14         @Class_begin
15         @End
16 */
17 template <typename t_element, size_t t_size>
18 struct mpmc_boundq_1_alt
19 {
20 private:
21
22         // elements should generally be cache-line-size padded :
23         t_element               m_array[t_size];
24
25         // rdwr counts the reads & writes that have started
26         atomic<unsigned int>    m_rdwr;
27         // "read" and "written" count the number completed
28         atomic<unsigned int>    m_read;
29         atomic<unsigned int>    m_written;
30
31 public:
32
33         mpmc_boundq_1_alt()
34         {
35         /**
36                 @Begin
37                         @Entry_point
38                         @End
39                 */
40                 m_rdwr = 0;
41                 m_read = 0;
42                 m_written = 0;
43         }
44         
45
46         /**
47                 @Begin
48                 @Options:
49                         LANG = CPP;
50                         CLASS = mpmc_boundq_1_alt;
51                 @Global_define:
52                 @DeclareVar:
53                         id_tag_t *tag;
54                 @InitVar:
55                         tag = NULL;
56         @Happens_before:
57                 Publish -> Fetch
58         @End
59         */
60
61         //-----------------------------------------------------
62
63         /**
64                 @Begin
65                 @Interface: Fetch
66                 @Commit_point_set: Fetch_Succ_Point | Fetch_Fail_Point
67                 @ID: (call_id_t) __RET__
68                 @End
69         */
70         t_element * read_fetch() {
71                 unsigned int rdwr = m_rdwr.load(mo_acquire);
72                 /**
73                         @Begin
74                         @Potential_commit_point_define: true
75                         @Label: Fetch_Potential_Point
76                         @End
77                 */
78                 unsigned int rd,wr;
79                 for(;;) {
80                         rd = (rdwr>>16) & 0xFFFF;
81                         wr = rdwr & 0xFFFF;
82
83                         if ( wr == rd ) { // empty
84                                 /**
85                                         @Begin
86                                         @Commit_point_define: true
87                                         @Potential_commit_point_label: Fetch_Potential_Point 
88                                         @Label: Fetch_Fail_Point
89                                         @End
90                                 */
91                                 return false;
92                         }
93                         
94                         bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
95                         
96                         if (succ)
97                                 break;
98                         else
99                                 thrd_yield();
100                 }
101
102                 // (*1)
103                 rl::backoff bo;
104                 while ( true ) {
105                         unsigned int tmp = m_written.load(mo_acquire);
106                         /**
107                                 @Begin
108                                 @Commit_point_define_check: (tmp & 0xFFFF) == wr
109                                 @Label: Fetch_Succ_Point
110                                 @End
111                         */
112                         if ((tmp & 0xFFFF) == wr)
113                                 break;
114                         thrd_yield();
115                 }
116
117                 t_element * p = & ( m_array[ rd % t_size ] );
118                 
119                 return p;
120         }
121
122         void read_consume() {
123                 m_read.fetch_add(1,mo_release);
124         }
125
126         //-----------------------------------------------------
127
128         t_element * write_prepare() {
129                 unsigned int rdwr = m_rdwr.load(mo_acquire);
130                 unsigned int rd,wr;
131                 for(;;) {
132                         rd = (rdwr>>16) & 0xFFFF;
133                         wr = rdwr & 0xFFFF;
134
135                         if ( wr == ((rd + t_size)&0xFFFF) ) { // full
136                                 return NULL;
137                         }
138                         
139                         bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
140                                 ((wr+1)&0xFFFF),mo_acq_rel);
141                         if (succ)
142                                 break;
143                         else
144                                 thrd_yield();
145                 }
146
147                 // (*1)
148                 rl::backoff bo;
149                 while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
150                         thrd_yield();
151                 }
152
153                 t_element * p = & ( m_array[ wr % t_size ] );
154
155                 return p;
156         }
157
158         /**
159                 @Begin
160                 @Interface: Publish 
161                 @Commit_point_set: Publish_Point
162                 @ID: (uint64_t) elem
163                 @End
164         */
165         void write_publish(t_element *elem)
166         {
167                 m_written.fetch_add(1,mo_release);
168                 /**
169                         @Begin
170                         @Commit_point_define_check: true
171                         @Label: Publish_Point
172                         @End
173                 */
174         }
175
176         //-----------------------------------------------------
177
178
179 };
180 /**
181         @Begin
182         @Class_end
183         @End
184 */