update readme
[c11concurrency-benchmarks.git] / iris / include / sslfqueue.h
1 #ifndef IRIS_SSLFQUEUE_H_
2 #define IRIS_SSLFQUEUE_H_
3 #include <stdint.h>
4 #include <atomic>
5 #include <vector>
6 #include "utils.h"
7
8 namespace iris {
9
10 //a single producer single consumer lockfree ring queue.
11 template<typename T>
12 class sslfqueue {
13 private:
14         size_t              cap;
15         size_t              mask;
16     T *                 buffer;
17         char                _pad0_[IRIS_CACHELINE_SIZE - sizeof(T*) - sizeof(int) * 2];
18     std::atomic<long>   head;
19         char                _pad1_[IRIS_CACHELINE_SIZE - sizeof(std::atomic<long>)];
20         long                    tail_cache;
21         char                _pad2_[IRIS_CACHELINE_SIZE - sizeof(long)];
22     std::atomic<long>   tail;
23         char                _pad3_[IRIS_CACHELINE_SIZE - sizeof(std::atomic<long>)];
24         long                    head_cache;
25         char                _pad4_[IRIS_CACHELINE_SIZE - sizeof(long)];
26
27 public:
28
29     sslfqueue(size_t capacity = 5000):head(0), tail_cache(0), tail(0), head_cache(0)
30     {
31         cap = round_up_to_next_multiple_of_2(capacity);
32         mask = cap - 1;
33                 buffer = new T[cap];
34         }
35
36         ~sslfqueue(){
37                 delete []buffer;
38         }
39
40     bool offer(const T & e) {
41         const long cur_tail = tail.load(std::memory_order_acquire);
42         const long len = cur_tail - cap;
43         if (head_cache <= len) {
44                         head_cache = head.load(std::memory_order_acquire);
45                         if (head_cache <= len)
46                                 return false; // queue is full
47         }
48
49         buffer[cur_tail & mask] = e;
50         tail.store(cur_tail + 1, std::memory_order_release);
51
52         return true;
53     }
54
55     bool poll(T & e) {
56         const long cur_head = head.load(std::memory_order_acquire);
57         if (cur_head >= tail_cache) {
58                         tail_cache = tail.load(std::memory_order_acquire);
59                         if (cur_head >= tail_cache)
60                                 return false; // queue is empty
61         }
62
63         e = buffer[cur_head & mask];
64         head.store(cur_head + 1, std::memory_order_release);
65
66         return true;
67     }
68
69     bool batch_poll(std::vector<T> & vec) {
70         long cur_head = head.load(std::memory_order_acquire);
71         if (cur_head >= tail_cache) {
72             tail_cache = tail.load(std::memory_order_acquire);
73             if (cur_head >= tail_cache)
74                 return false; // queue is empty
75         }
76
77         while (cur_head < tail_cache) {
78             vec.push_back(buffer[cur_head & mask]);
79             ++cur_head;
80         }
81
82         head.store(cur_head, std::memory_order_release);
83
84         return true;
85     }
86
87     bool empty() {
88         return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire) == 0;
89     }
90
91     bool full() {
92         return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire) == cap;
93     }
94
95     size_t size() {
96         return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire);
97     }
98 };
99
100 }
101 #endif
102
103