save
[cdsspec-compiler.git] / benchmark / spsc-bugfix / queue.h
1 #ifndef _QUEUE_H
2 #define _QUEUE_H
3
4 #include <unrelacy.h>
5 #include <atomic>
6
7 #include <spec_lib.h>
8 #include <stdlib.h>
9 #include <cdsannotate.h>
10 #include <specannotation.h>
11 #include <model_memory.h>
12 #include "common.h" 
13
14 #include "eventcount.h"
15
16 /**
17         @Begin
18         @Class_begin
19         @End
20 */
21 template<typename T>
22 class spsc_queue
23 {
24         
25 public:
26
27         
28         spsc_queue()
29         {
30
31                 /**
32                         @Begin
33                         @Entry_point
34                         @End
35                 */
36
37                 node* n = new node ();
38                 head = n;
39                 tail = n;
40         }
41
42         ~spsc_queue()
43         {
44                 RL_ASSERT(head == tail);
45                 delete ((node*)head($));
46         }
47
48         /**
49                 @Begin
50                 @Options:
51                         LANG = CPP;
52                         CLASS = spsc_queue;
53                 @Global_define:
54                         @DeclareStruct:
55                         typedef struct tag_elem {
56                                 call_id_t id;
57                                 T data;
58                         } tag_elem_t;
59                 
60                 @DeclareVar:
61                         spec_list *__queue;
62                         id_tag_t *tag;
63                 @InitVar:
64                         __queue = new_spec_list();
65                         tag = new_id_tag();
66                 @DefineFunc:
67                         tag_elem_t* new_tag_elem(call_id_t id, T data) {
68                         tag_elem_t *e = (tag_elem_t*) MODEL_MALLOC(sizeof(tag_elem_t));
69                                 e->id = id;
70                                 e->data = data;
71                                 return e;
72                         }
73                 @DefineFunc:
74                         call_id_t get_id(void *wrapper) {
75                                 return ((tag_elem_t*) wrapper)->id;
76                         }
77                 @DefineFunc:
78                         unsigned int get_data(void *wrapper) {
79                                 return ((tag_elem_t*) wrapper)->data;
80                         }
81                 @Happens_before:
82                         Enqueue -> Dequeue              
83                 @End
84         */
85
86         
87         /**
88                 @Begin
89                 @Interface: Enqueue
90                 @Commit_point_set: Enqueue_Point
91                 @ID: get_and_inc(tag)
92                 @Action:
93                         tag_elem_t *elem = new_tag_elem(__ID__, data);
94                         push_back(__queue, elem);
95                 @End
96         */
97         void enqueue(T data)
98         {
99                 node* n = new node (data);
100                 head($)->next.store(n, std::memory_order_release);
101                 /**
102                         @Begin
103                         @Commit_point_define_check: true
104                         @Label: Enqueue_Point
105                         @End
106                 */
107                 head = n;
108                 // #Mark delete this
109                 ec.signal();
110         }
111         /**
112                 @Begin
113                 @Interface: Dequeue
114                 @Commit_point_set: Dequeue_Point
115                 @ID: get_id(front(__queue))
116                 @Action:
117                         T _Old_Val = get_data(front(__queue));
118                         pop_front(__queue);
119                 @Post_check:
120                         _Old_Val == __RET__
121                 @End
122         */
123         T dequeue()
124         {
125                 T data = try_dequeue();
126                 while (0 == data)
127                 {
128                         int cmp = ec.get();
129                         data = try_dequeue();
130                         if (data)
131                                 break;
132                         ec.wait(cmp);
133                         data = try_dequeue();
134                         if (data)
135                                 break;
136                 }
137                 return data;
138         }
139
140 private:
141         struct node
142         {
143                 std::atomic<node*> next;
144                 rl::var<T> data;
145
146                 node(T data = T())
147                         : data(data)
148                 {
149                         next = 0;
150                 }
151         };
152
153         rl::var<node*> head;
154         rl::var<node*> tail;
155
156         eventcount ec;
157
158         T try_dequeue()
159         {
160                 node* t = tail($);
161                 node* n = t->next.load(std::memory_order_acquire);
162                 /**
163                         @Begin
164                         @Commit_point_define_check: n != 0
165                         @Label: Dequeue_Point
166                         @End
167                 */
168                 if (0 == n)
169                         return 0;
170                 T data = n->data($);
171                 delete (t);
172                 tail = n;
173                 return data;
174         }
175 };
176 /**
177         @Begin
178         @Class_end
179         @End
180 */
181
182 #endif