edits
[cdsspec-compiler.git] / benchmark / spsc-bugfix / queue.h
1 #ifndef _QUEUE_H
2 #define _QUEUE_H
3
4 #include <unrelacy.h>
5 #include <atomic>
6
7 #include <spec_lib.h>
8 #include <stdlib.h>
9 #include <cdsannotate.h>
10 #include <specannotation.h>
11 #include <model_memory.h>
12 #include "common.h" 
13
14 #include "eventcount.h"
15
16 /**
17         @Begin
18         @Class_begin
19         @End
20 */
21 template<typename T>
22 class spsc_queue
23 {
24         
25 public:
26
27         
28         spsc_queue()
29         {
30
31                 /**
32                         @Begin
33                         @Entry_point
34                         @End
35                 */
36
37                 node* n = new node ();
38                 head = n;
39                 tail = n;
40         }
41
42         ~spsc_queue()
43         {
44                 //RL_ASSERT(head == tail);
45                 delete ((node*)head($));
46         }
47
48         /**
49                 @Begin
50                 @Options:
51                         LANG = CPP;
52                         CLASS = spsc_queue;
53                 @Global_define:
54                         @DeclareStruct:
55                         typedef struct tag_elem {
56                                 call_id_t id;
57                                 T data;
58                         } tag_elem_t;
59                 
60                 @DeclareVar:
61                         spec_list *__queue;
62                         id_tag_t *tag;
63                 @InitVar:
64                         __queue = new_spec_list();
65                         tag = new_id_tag();
66                 @DefineFunc:
67                         tag_elem_t* new_tag_elem(call_id_t id, T data) {
68                         tag_elem_t *e = (tag_elem_t*) MODEL_MALLOC(sizeof(tag_elem_t));
69                                 e->id = id;
70                                 e->data = data;
71                                 return e;
72                         }
73                 @DefineFunc:
74                         call_id_t get_id(void *wrapper) {
75                                 return ((tag_elem_t*) wrapper)->id;
76                         }
77                 @DefineFunc:
78                         unsigned int get_data(void *wrapper) {
79                                 return ((tag_elem_t*) wrapper)->data;
80                         }
81                 @Happens_before: Enqueue -> Dequeue             
82                 @End
83         */
84
85         
86         /**
87                 @Begin
88                 @Interface: Enqueue
89                 @Commit_point_set: Enqueue_Point
90                 @ID: get_and_inc(tag)
91                 @Action: push_back(__queue, new_tag_elem(__ID__, data));
92                         //tag_elem_t *elem = new_tag_elem(__ID__, data);
93                         //push_back(__queue, elem);
94                 @End
95         */
96         void enqueue(T data)
97         {
98                 node* n = new node (data);
99                 head($)->next.store(n, std::memory_order_release);
100                 /**
101                         @Begin
102                         @Commit_point_define_check: true
103                         @Label: Enqueue_Point
104                         @End
105                 */
106                 head = n;
107                 // #Mark delete this
108                 ec.signal();
109         }
110         /**
111                 @Begin
112                 @Interface: Dequeue
113                 @Commit_point_set: Dequeue_Point
114                 @ID: get_id(front(__queue))
115                 @Action:
116                         T _Old_Val = get_data(front(__queue));
117                         pop_front(__queue);
118                 @Post_check: _Old_Val == __RET__
119                 @End
120         */
121         T dequeue()
122         {
123                 T data = try_dequeue();
124                 while (0 == data)
125                 {
126                         int cmp = ec.get();
127                         data = try_dequeue();
128                         if (data)
129                                 break;
130                         ec.wait(cmp);
131                         data = try_dequeue();
132                         if (data)
133                                 break;
134                 }
135                 return data;
136         }
137
138 private:
139         struct node
140         {
141                 std::atomic<node*> next;
142                 rl::var<T> data;
143
144                 node(T data = T())
145                         : data(data)
146                 {
147                         next = 0;
148                 }
149         };
150
151         rl::var<node*> head;
152         rl::var<node*> tail;
153
154         eventcount ec;
155
156         T try_dequeue()
157         {
158                 node* t = tail($);
159                 node* n = t->next.load(std::memory_order_acquire);
160                 /**
161                         @Begin
162                         @Commit_point_define_check: n != 0
163                         @Label: Dequeue_Point
164                         @End
165                 */
166                 if (0 == n)
167                         return 0;
168                 T data = n->data($);
169                 delete (t);
170                 tail = n;
171                 return data;
172         }
173 };
174 /**
175         @Begin
176         @Class_end
177         @End
178 */
179
180 #endif