--- /dev/null
+#include <unrelacy.h>
+#include <atomic>
+
+#include "eventcount.h"
+
+template<typename T>
+class spsc_queue
+{
+ /**
+ @Begin
+ @Global_define:
+ typedef struct tag_elem {
+ Tag id;
+ T data;
+ } tag_elem_t;
+
+ Tag tag;
+ spec_queue<tag_elem> __queue;
+
+ static bool _is_elem_equals(void *ptr1, void *ptr2) {
+ // ...
+ // return if the elements pointed to are equal
+ }
+
+ @Happens-before:
+ Enqueue -> Dequeue
+ @End
+ **/
+public:
+ spsc_queue()
+ {
+ node* n = new node ();
+ head = n;
+ tail = n;
+ }
+
+ ~spsc_queue()
+ {
+ RL_ASSERT(head == tail);
+ delete ((node*)head($));
+ }
+
+ /**
+ @Begin
+ @Commit_point_set:
+ Enqueue = Enqueue_Success_Point
+ @ID: tag.current()
+ @Action:
+ __queue.enqueue(tag_elem_t(tag.current(), node(data));
+ tag.next();
+ @End
+ */
+ void enqueue(T data)
+ {
+ node* n = new node (data);
+ head($)->next.store(n, std::memory_order_release);
+ /**
+ @Begin
+ @Commit_point_define: true
+ @Label: Enqueue_Success_Point
+ @End
+ */
+ head = n;
+ // #Mark delete this
+ ec.signal();
+ }
+
+ /**
+ @Begin
+ @Commit_point_set:
+ Dequeue = Try_Dequeue_Success_Point
+ @ID: __queue.peak().tag
+ @Check: __queue.size() > 0 && _is_elem_equals(&RET, &__queue.peek().data)
+ @Action: __queue.dequeue();
+ @End
+ */
+ T dequeue()
+ {
+ T data = try_dequeue();
+ while (0 == data)
+ {
+ int cmp = ec.get();
+ data = try_dequeue();
+ if (data)
+ break;
+ ec.wait(cmp);
+ data = try_dequeue();
+ if (data)
+ break;
+ }
+ return data;
+ }
+
+private:
+ struct node
+ {
+ std::atomic<node*> next;
+ rl::var<T> data;
+
+ node(T data = T())
+ : data(data)
+ {
+ next = 0;
+ }
+ };
+
+ rl::var<node*> head;
+ rl::var<node*> tail;
+
+ eventcount ec;
+
+ T try_dequeue()
+ {
+ node* t = tail($);
+ node* n = t->next.load(std::memory_order_acquire);
+ /**
+ @Begin
+ @Commit_point_define: ATOMIC_RET != NULL
+ @Label: Try_Dequeue_Success_Point
+ @End
+ */
+ if (0 == n)
+ return 0;
+ T data = n->data($);
+ delete (t);
+ tail = n;
+ return data;
+ }
+};