add spsc
[cdsspec-compiler.git] / benchmark / spsc-bugfix / queue.h
1 #ifndef _QUEUE_H
2 #define _QUEUE_H
3
4 #include <unrelacy.h>
5 #include <atomic>
6
7 #include "eventcount.h"
8
9 /**
10         @Begin
11         @Class_begin
12         @End
13 */
14 template<typename T>
15 class spsc_queue
16 {
17         
18 public:
19
20         
21         spsc_queue()
22         {
23
24                 /**
25                         @Begin
26                         @Entry_point
27                         @End
28                 */
29
30                 node* n = new node ();
31                 head = n;
32                 tail = n;
33         }
34
35         ~spsc_queue()
36         {
37                 RL_ASSERT(head == tail);
38                 delete ((node*)head($));
39         }
40
41         /**
42                 @Begin
43                 @Options:
44                         LANG = CPP;
45                         CLASS = spsc_queue;
46                 @Global_define:
47                         @DeclareStruct:
48                         typedef struct tag_elem {
49                                 call_id_t id;
50                                 T data;
51                         } tag_elem_t;
52                 
53                 @DeclareVar:
54                         spec_list *__queue;
55                         id_tag_t *tag;
56                 @InitVar:
57                         __queue = new_spec_list();
58                         tag = new_id_tag();
59                 @DefineFunc:
60                         tag_elem_t* new_tag_elem(call_id_t id, T data) {
61                         tag_elem_t *e = (tag_elem_t*) MODEL_MALLOC(sizeof(tag_elem_t));
62                                 e->id = id;
63                                 e->data = data;
64                                 return e;
65                         }
66                 @DefineFunc:
67                         call_id_t get_id(void *wrapper) {
68                                 return ((tag_elem_t*) wrapper)->id;
69                         }
70                 @DefineFunc:
71                         unsigned int get_data(void *wrapper) {
72                                 return ((tag_elem_t*) wrapper)->data;
73                         }
74                 @Happens_before:
75                         Enqueue -> Dequeue              
76                 @End
77         */
78
79         
80         /**
81                 @Begin
82                 @Interface: Enqueue
83                 @Commit_point_set: Enqueue_Point
84                 @ID: get_and_inc(tag)
85                 @Action:
86                         tag_elem_t *elem = new_tag_elem(__ID__, data);
87                         push_back(__queue, elem);
88                 @End
89         */
90         void enqueue(T data)
91         {
92                 node* n = new node (data);
93                 head($)->next.store(n, std::memory_order_release);
94                 /**
95                         @Begin
96                         @Commit_point_define_check: true
97                         @Label: Enqueue_Point
98                         @End
99                 */
100                 head = n;
101                 // #Mark delete this
102                 ec.signal();
103         }
104         /**
105                 @Begin
106                 @Interface: Dequeue
107                 @Commit_point_set: Dequeue_Point
108                 @ID: get_id(front(__queue))
109                 @Action:
110                         T _Old_Val = get_data(front(__queue));
111                         pop_front(__queue);
112                 @Post_check:
113                         _Old_Val == __RET__
114                 @End
115         */
116         T dequeue()
117         {
118                 T data = try_dequeue();
119                 while (0 == data)
120                 {
121                         int cmp = ec.get();
122                         data = try_dequeue();
123                         if (data)
124                                 break;
125                         ec.wait(cmp);
126                         data = try_dequeue();
127                         if (data)
128                                 break;
129                 }
130                 return data;
131         }
132
133 private:
134         struct node
135         {
136                 std::atomic<node*> next;
137                 rl::var<T> data;
138
139                 node(T data = T())
140                         : data(data)
141                 {
142                         next = 0;
143                 }
144         };
145
146         rl::var<node*> head;
147         rl::var<node*> tail;
148
149         eventcount ec;
150
151         T try_dequeue()
152         {
153                 node* t = tail($);
154                 node* n = t->next.load(std::memory_order_acquire);
155                 /**
156                         @Begin
157                         @Commit_point_define_check: n != 0
158                         @Label: Dequeue_Point
159                         @End
160                 */
161                 if (0 == n)
162                         return 0;
163                 T data = n->data($);
164                 delete (t);
165                 tail = n;
166                 return data;
167         }
168 };
169 /**
170         @Begin
171         @Class_end
172         @End
173 */
174
175 #endif