save
[cdsspec-compiler.git] / benchmark / mpmc-queue / mpmc-queue.h
1 #include <stdatomic.h>
2 #include <unrelacy.h>
3 #include <common.h>
4
5 #include <spec_lib.h>
6 #include <stdlib.h>
7 #include <cdsannotate.h>
8 #include <specannotation.h>
9 #include <model_memory.h>
10 #include "common.h" 
11
12 /**
13         @Begin
14         @Class_begin
15         @End
16 */
17 template <typename t_element, size_t t_size>
18 struct mpmc_boundq_1_alt
19 {
20 private:
21
22         // elements should generally be cache-line-size padded :
23         t_element               m_array[t_size];
24
25         // rdwr counts the reads & writes that have started
26         atomic<unsigned int>    m_rdwr;
27         // "read" and "written" count the number completed
28         atomic<unsigned int>    m_read;
29         atomic<unsigned int>    m_written;
30
31 public:
32
33         mpmc_boundq_1_alt()
34         {
35         /**
36                 @Begin
37                         @Entry_point
38                         @End
39                 */
40                 m_rdwr = 0;
41                 m_read = 0;
42                 m_written = 0;
43         }
44         
45
46         /**
47                 @Begin
48                 @Options:
49                         LANG = CPP;
50                         CLASS = mpmc_boundq_1_alt;
51                 @Global_define:
52                 @DeclareVar:
53                         id_tag_t *tag;
54                 @InitVar:
55                         tag = NULL;
56         @Happens_before:
57                 Publish -> Fetch
58         @End
59         */
60
61         //-----------------------------------------------------
62
63         /**
64                 @Begin
65                 @Interface: Fetch
66                 @Commit_point_set: Fetch_Succ_Point
67                 @ID: (call_id_t) __RET__
68                 @End
69         */
70         t_element * read_fetch() {
71                 unsigned int rdwr = m_rdwr.load(mo_acquire);
72                 /**
73                         @Begin
74                         @Commit_point_define_check: (rdwr>>16) & 0xFFFF == rdwr & 0xFFFF
75                         @Label: Fetch_Succ_Point1
76                         @End
77                 */
78                 unsigned int rd,wr;
79                 for(;;) {
80                         rd = (rdwr>>16) & 0xFFFF;
81                         wr = rdwr & 0xFFFF;
82
83                         if ( wr == rd ) { // empty
84                                 return false;
85                         }
86                         
87                         bool succ = m_rdwr.compare_exchange_weak(rdwr,rdwr+(1<<16),mo_acq_rel);
88                         
89                         if (succ)
90                                 break;
91                         else
92                                 thrd_yield();
93                 }
94
95                 // (*1)
96                 rl::backoff bo;
97                 while ( true ) {
98                         unsigned int tmp = m_written.load(mo_acquire);
99                         /**
100                                 @Begin
101                                 @Commit_point_define_check: (tmp & 0xFFFF) == wr
102                                 @Label: Fetch_Succ_Point2
103                                 @End
104                         */
105                         if ((tmp & 0xFFFF) == wr)
106                                 break;
107                         thrd_yield();
108                 }
109
110                 t_element * p = & ( m_array[ rd % t_size ] );
111                 
112                 return p;
113         }
114
115         void read_consume() {
116                 m_read.fetch_add(1,mo_release);
117         }
118
119         //-----------------------------------------------------
120
121         t_element * write_prepare() {
122                 unsigned int rdwr = m_rdwr.load(mo_acquire);
123                 unsigned int rd,wr;
124                 for(;;) {
125                         rd = (rdwr>>16) & 0xFFFF;
126                         wr = rdwr & 0xFFFF;
127
128                         if ( wr == ((rd + t_size)&0xFFFF) ) { // full
129                                 return NULL;
130                         }
131                         
132                         bool succ = m_rdwr.compare_exchange_weak(rdwr,(rd<<16) |
133                                 ((wr+1)&0xFFFF),mo_acq_rel);
134                         if (succ)
135                                 break;
136                         else
137                                 thrd_yield();
138                 }
139
140                 // (*1)
141                 rl::backoff bo;
142                 while ( (m_read.load(mo_acquire) & 0xFFFF) != rd ) {
143                         thrd_yield();
144                 }
145
146                 t_element * p = & ( m_array[ wr % t_size ] );
147
148                 return p;
149         }
150
151         /**
152                 @Begin
153                 @Interface: Publish 
154                 @Commit_point_set: Publish_Point
155                 @ID: (uint64_t) elem
156                 @End
157         */
158         void write_publish(t_element *elem)
159         {
160                 m_written.fetch_add(1,mo_release);
161                 /**
162                         @Begin
163                         @Commit_point_define_check: true
164                         @Label: Publish_Point
165                         @End
166                 */
167         }
168
169         //-----------------------------------------------------
170
171
172 };
173 /**
174         @Begin
175         @Class_end
176         @End
177 */