1 #ifndef IRIS_SSLFQUEUE_H_
2 #define IRIS_SSLFQUEUE_H_
10 // A single-producer, single-consumer lock-free ring queue.
// Index members and the padding arrays that space them out. Every pad is
// sized from IRIS_CACHELINE_SIZE, so `tail` begins at least one cache line's
// worth of bytes after `head` begins.
// NOTE(review): presumably this is meant to prevent false sharing between the
// producer and the consumer; it only lines up with real cache lines if the
// object itself is cache-line aligned -- confirm.
// Pad sized as one cache line minus a T* and two ints.
// NOTE(review): assumes the members declared just before this one are a T*
// and two ints -- confirm against the full class.
17 char _pad0_[IRIS_CACHELINE_SIZE - sizeof(T*) - sizeof(int) * 2];
// Read position; advanced (stored) by the dequeue paths, loaded by offer().
18 std::atomic<long> head;
19 char _pad1_[IRIS_CACHELINE_SIZE - sizeof(std::atomic<long>)];
// Pad sized as one cache line minus a long.
// NOTE(review): presumably follows a plain long cache member (tail_cache,
// going by the constructor's initializer order) -- confirm.
21 char _pad2_[IRIS_CACHELINE_SIZE - sizeof(long)];
// Write position; advanced (stored) by offer(), loaded by the dequeue paths.
22 std::atomic<long> tail;
23 char _pad3_[IRIS_CACHELINE_SIZE - sizeof(std::atomic<long>)];
// Pad sized as one cache line minus a long.
// NOTE(review): presumably follows a plain long cache member (head_cache,
// going by the constructor's initializer order) -- confirm.
25 char _pad4_[IRIS_CACHELINE_SIZE - sizeof(long)];
// Constructs a queue with head, tail and both cached indices set to 0.
// `capacity` (default 5000) is the requested size; the stored `cap` is
// whatever round_up_to_next_multiple_of_2() returns for it.
// NOTE(review): the `index & mask` slot lookup used by the enqueue/dequeue
// code only wraps correctly if cap is a power of two -- confirm the helper
// rounds to a power of two and not merely to an even number.
29 sslfqueue(size_t capacity = 5000):head(0), tail_cache(0), tail(0), head_cache(0)
31 cap = round_up_to_next_multiple_of_2(capacity);
// Producer side: tries to store a copy of `e` in the slot at the current tail.
// Returns false, without storing anything, when the queue is full.
// NOTE(review): the success-path return is not among the lines shown here.
40 bool offer(const T & e) {
41 const long cur_tail = tail.load(std::memory_order_acquire);
// The ring is full for this tail whenever head <= cur_tail - cap.
42 const long len = cur_tail - cap;
// Test the locally cached head first; the shared `head` is only loaded when
// the cached value says there is no room.
43 if (head_cache <= len) {
44 head_cache = head.load(std::memory_order_acquire);
45 if (head_cache <= len)
46 return false; // queue is full
// Write the element first, then advance tail with a release store so the
// element write is visible to a thread that acquire-loads the new tail.
49 buffer[cur_tail & mask] = e;
50 tail.store(cur_tail + 1, std::memory_order_release);
// Consumer side: copies the element at the current head into `e` and advances
// head by one. Returns false, leaving `e` unassigned, when the queue is empty.
// NOTE(review): the signature and the success-path return are not among the
// lines shown here; `e` is presumably a T& output parameter -- confirm.
56 const long cur_head = head.load(std::memory_order_acquire);
// Test the locally cached tail first; the shared `tail` is only loaded when
// the cached value says nothing is left to read.
57 if (cur_head >= tail_cache) {
58 tail_cache = tail.load(std::memory_order_acquire);
59 if (cur_head >= tail_cache)
60 return false; // queue is empty
// Copy the element out before advancing head: the release store is what lets
// a thread that acquire-loads head treat the slot as free for reuse.
63 e = buffer[cur_head & mask];
64 head.store(cur_head + 1, std::memory_order_release);
// Consumer side: appends the queued elements from the current head up to the
// cached tail value onto the end of `vec`, then advances head once with a
// single store. Existing contents of `vec` are kept.
// Returns false, leaving `vec` untouched, when the queue is empty.
// NOTE(review): the step that advances cur_head inside the loop and the
// success-path return are not among the lines shown here -- confirm.
69 bool batch_poll(std::vector<T> & vec) {
70 long cur_head = head.load(std::memory_order_acquire);
// The shared `tail` is only reloaded when the cached copy says nothing is
// left; otherwise the batch stops at the previously cached tail.
71 if (cur_head >= tail_cache) {
72 tail_cache = tail.load(std::memory_order_acquire);
73 if (cur_head >= tail_cache)
74 return false; // queue is empty
77 while (cur_head < tail_cache) {
78 vec.push_back(buffer[cur_head & mask]);
// Publish the new head only after every element has been copied out.
82 head.store(cur_head, std::memory_order_release);
// True when tail and head are equal, i.e. no elements are queued. The two
// indices are read by separate loads, so while another thread is enqueuing or
// dequeuing the answer is only a momentary observation.
88 return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire) == 0;
// True when exactly `cap` elements are queued (tail - head == cap). The two
// indices are read by separate loads, so while another thread is enqueuing or
// dequeuing the answer is only a momentary observation.
92 return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire) == cap;
// Number of queued elements, computed as tail - head. The two indices are
// read by separate loads, so while another thread is enqueuing or dequeuing
// the value is only a momentary observation.
96 return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire);