more data structures
[cdsspec-compiler.git] / benchmark / spsc-bugfix / queue.h
diff --git a/benchmark/spsc-bugfix/queue.h b/benchmark/spsc-bugfix/queue.h
new file mode 100644 (file)
index 0000000..802d455
--- /dev/null
@@ -0,0 +1,129 @@
+#include <unrelacy.h>
+#include <atomic>
+
+#include "eventcount.h"
+
+template<typename T>
+class spsc_queue
+{
+       /**
+       @Begin
+       @Global_define:
+               typedef struct tag_elem {
+                       Tag id;
+                       T data;
+               } tag_elem_t;
+
+               Tag tag;
+               spec_queue<tag_elem> __queue;
+
+               static bool _is_elem_equals(void *ptr1, void *ptr2) {
+                       // ...
+                       // return if the elements pointed to are equal
+               }
+
+       @Happens-before:
+               Enqueue -> Dequeue              
+       @End
+       **/
+public:
+       spsc_queue()
+       {
+               node* n = new node ();
+               head = n;
+               tail = n;
+       }
+
+       ~spsc_queue()
+       {
+               RL_ASSERT(head == tail);
+               delete ((node*)head($));
+       }
+       
+       /**
+               @Begin
+               @Commit_point_set:
+                       Enqueue = Enqueue_Success_Point
+               @ID: tag.current()
+               @Action:
+                       __queue.enqueue(tag_elem_t(tag.current(), node(data));
+                       tag.next();
+               @End
+       */
+       void enqueue(T data)
+       {
+               node* n = new node (data);
+               head($)->next.store(n, std::memory_order_release);
+               /**
+                       @Begin
+                       @Commit_point_define: true
+                       @Label: Enqueue_Success_Point
+                       @End
+               */
+               head = n;
+               // #Mark delete this
+               ec.signal();
+       }
+
+       /**
+               @Begin
+               @Commit_point_set:
+                       Dequeue = Try_Dequeue_Success_Point
+               @ID: __queue.peak().tag
+               @Check: __queue.size() > 0 && _is_elem_equals(&RET, &__queue.peek().data)
+               @Action: __queue.dequeue();
+               @End
+       */
+       T dequeue()
+       {
+               T data = try_dequeue();
+               while (0 == data)
+               {
+                       int cmp = ec.get();
+                       data = try_dequeue();
+                       if (data)
+                               break;
+                       ec.wait(cmp);
+                       data = try_dequeue();
+                       if (data)
+                               break;
+               }
+               return data;
+       }
+
+private:
+       struct node
+       {
+               std::atomic<node*> next;
+               rl::var<T> data;
+
+               node(T data = T())
+                       : data(data)
+               {
+                       next = 0;
+               }
+       };
+
+       rl::var<node*> head;
+       rl::var<node*> tail;
+
+       eventcount ec;
+
+       T try_dequeue()
+       {
+               node* t = tail($);
+               node* n = t->next.load(std::memory_order_acquire);
+               /**
+                       @Begin
+                       @Commit_point_define: ATOMIC_RET != NULL
+                       @Label: Try_Dequeue_Success_Point
+                       @End
+               */
+               if (0 == n)
+                       return 0;
+               T data = n->data($);
+               delete (t);
+               tail = n;
+               return data;
+       }
+};