minor fix
[cdsspec-compiler.git] / benchmark / spsc-bugfix / queue.h
1 #include <unrelacy.h>
2 #include <atomic>
3
4 #include "eventcount.h"
5
6 template<typename T>
7 class spsc_queue
8 {
9         /**
10         @Begin
11         @Global_define:
12                 typedef struct tag_elem {
13                         Tag id;
14                         T data;
15                 } tag_elem_t;
16
17                 Tag tag;
18                 spec_queue<tag_elem> __queue;
19
20                 static bool _is_elem_equals(void *ptr1, void *ptr2) {
21                         // ...
22                         // return if the elements pointed to are equal
23                 }
24
25         @Happens-before:
26                 Enqueue -> Dequeue              
27         @End
28         **/
29 public:
30         spsc_queue()
31         {
32                 node* n = new node ();
33                 head = n;
34                 tail = n;
35         }
36
37         ~spsc_queue()
38         {
39                 RL_ASSERT(head == tail);
40                 delete ((node*)head($));
41         }
42         
43         /**
44                 @Begin
45                 @Commit_point_set:
46                         Enqueue = Enqueue_Success_Point
47                 @ID: tag.current()
48                 @Action:
49                         __queue.enqueue(tag_elem_t(tag.current(), node(data));
50                         tag.next();
51                 @End
52         */
53         void enqueue(T data)
54         {
55                 node* n = new node (data);
56                 head($)->next.store(n, std::memory_order_release);
57                 /**
58                         @Begin
59                         @Commit_point_define: true
60                         @Label: Enqueue_Success_Point
61                         @End
62                 */
63                 head = n;
64                 // #Mark delete this
65                 ec.signal();
66         }
67
68         /**
69                 @Begin
70                 @Commit_point_set:
71                         Dequeue = Try_Dequeue_Success_Point
72                 @ID: __queue.peak().tag
73                 @Check: __queue.size() > 0 && _is_elem_equals(&RET, &__queue.peek().data)
74                 @Action: __queue.dequeue();
75                 @End
76         */
77         T dequeue()
78         {
79                 T data = try_dequeue();
80                 while (0 == data)
81                 {
82                         int cmp = ec.get();
83                         data = try_dequeue();
84                         if (data)
85                                 break;
86                         ec.wait(cmp);
87                         data = try_dequeue();
88                         if (data)
89                                 break;
90                 }
91                 return data;
92         }
93
94 private:
95         struct node
96         {
97                 std::atomic<node*> next;
98                 rl::var<T> data;
99
100                 node(T data = T())
101                         : data(data)
102                 {
103                         next = 0;
104                 }
105         };
106
107         rl::var<node*> head;
108         rl::var<node*> tail;
109
110         eventcount ec;
111
112         T try_dequeue()
113         {
114                 node* t = tail($);
115                 node* n = t->next.load(std::memory_order_acquire);
116                 /**
117                         @Begin
118                         @Commit_point_define: ATOMIC_RET != NULL
119                         @Label: Try_Dequeue_Success_Point
120                         @End
121                 */
122                 if (0 == n)
123                         return 0;
124                 T data = n->data($);
125                 delete (t);
126                 tail = n;
127                 return data;
128         }
129 };