more fix
[cdsspec-compiler.git] / benchmark / spsc-bugfix / queue.h
1 #ifndef _QUEUE_H
2 #define _QUEUE_H
3
4 #include <unrelacy.h>
5 #include <atomic>
6
7 #include <spec_lib.h>
8 #include <stdlib.h>
9 #include <cdsannotate.h>
10 #include <specannotation.h>
11 #include <model_memory.h>
12
13 #include "eventcount.h"
14
15 /**
16         @Begin
17         @Class_begin
18         @End
19 */
20 template<typename T>
21 class spsc_queue
22 {
23         
24 public:
25
26         
27         spsc_queue()
28         {
29
30                 /**
31                         @Begin
32                         @Entry_point
33                         @End
34                 */
35
36                 node* n = new node ();
37                 head = n;
38                 tail = n;
39         }
40
41         ~spsc_queue()
42         {
43                 RL_ASSERT(head == tail);
44                 delete ((node*)head($));
45         }
46
47         /**
48                 @Begin
49                 @Options:
50                         LANG = CPP;
51                         CLASS = spsc_queue;
52                 @Global_define:
53                         @DeclareStruct:
54                         typedef struct tag_elem {
55                                 call_id_t id;
56                                 T data;
57                         } tag_elem_t;
58                 
59                 @DeclareVar:
60                         spec_list *__queue;
61                         id_tag_t *tag;
62                 @InitVar:
63                         __queue = new_spec_list();
64                         tag = new_id_tag();
65                 @DefineFunc:
66                         tag_elem_t* new_tag_elem(call_id_t id, T data) {
67                         tag_elem_t *e = (tag_elem_t*) MODEL_MALLOC(sizeof(tag_elem_t));
68                                 e->id = id;
69                                 e->data = data;
70                                 return e;
71                         }
72                 @DefineFunc:
73                         call_id_t get_id(void *wrapper) {
74                                 return ((tag_elem_t*) wrapper)->id;
75                         }
76                 @DefineFunc:
77                         unsigned int get_data(void *wrapper) {
78                                 return ((tag_elem_t*) wrapper)->data;
79                         }
80                 @Happens_before:
81                         Enqueue -> Dequeue              
82                 @End
83         */
84
85         
86         /**
87                 @Begin
88                 @Interface: Enqueue
89                 @Commit_point_set: Enqueue_Point
90                 @ID: get_and_inc(tag)
91                 @Action:
92                         tag_elem_t *elem = new_tag_elem(__ID__, data);
93                         push_back(__queue, elem);
94                 @End
95         */
96         void enqueue(T data)
97         {
98                 node* n = new node (data);
99                 head($)->next.store(n, std::memory_order_release);
100                 /**
101                         @Begin
102                         @Commit_point_define_check: true
103                         @Label: Enqueue_Point
104                         @End
105                 */
106                 head = n;
107                 // #Mark delete this
108                 ec.signal();
109         }
110         /**
111                 @Begin
112                 @Interface: Dequeue
113                 @Commit_point_set: Dequeue_Point
114                 @ID: get_id(front(__queue))
115                 @Action:
116                         T _Old_Val = get_data(front(__queue));
117                         pop_front(__queue);
118                 @Post_check:
119                         _Old_Val == __RET__
120                 @End
121         */
122         T dequeue()
123         {
124                 T data = try_dequeue();
125                 while (0 == data)
126                 {
127                         int cmp = ec.get();
128                         data = try_dequeue();
129                         if (data)
130                                 break;
131                         ec.wait(cmp);
132                         data = try_dequeue();
133                         if (data)
134                                 break;
135                 }
136                 return data;
137         }
138
139 private:
140         struct node
141         {
142                 std::atomic<node*> next;
143                 rl::var<T> data;
144
145                 node(T data = T())
146                         : data(data)
147                 {
148                         next = 0;
149                 }
150         };
151
152         rl::var<node*> head;
153         rl::var<node*> tail;
154
155         eventcount ec;
156
157         T try_dequeue()
158         {
159                 node* t = tail($);
160                 node* n = t->next.load(std::memory_order_acquire);
161                 /**
162                         @Begin
163                         @Commit_point_define_check: n != 0
164                         @Label: Dequeue_Point
165                         @End
166                 */
167                 if (0 == n)
168                         return 0;
169                 T data = n->data($);
170                 delete (t);
171                 tail = n;
172                 return data;
173         }
174 };
175 /**
176         @Begin
177         @Class_end
178         @End
179 */
180
181 #endif