edits
[c11concurrency-benchmarks.git] / iris / include / lfringbuffer.h
1 #ifndef IRIS_LF_RINGBUFFER_H_
2 #define IRIS_LF_RINGBUFFER_H_
3 #include <atomic>
4
5 #include "define.h"
6 #include "utils.h"
7 #include <queue>
8 #include <assert.h>
9 namespace iris {
10
11 // A single reader single writer lockfree ring buffer
12 // this should be used as following:
13 // A thread calls acquire() method repeatedly 
14 // until the return value is nonzero to allocate @size bytes of memory.
15 // And release() method should be called in the order of allocation 
16 // with the same @size parameter to declare the release.
17 class lfringbuffer {
18 private:
19     size_t                      cap;
20     size_t                      mask;
21     char                     *  buffer;
22     char                        __pad0__[IRIS_CACHELINE_SIZE - sizeof(size_t) * 2 - sizeof(char*)];
23     unsigned long               head;
24     unsigned long               tail_cache;
25     char                        __pad1__[IRIS_CACHELINE_SIZE - sizeof(unsigned long) * 2];
26     std::atomic<unsigned long>  tail;
27     char                        __pad2__[IRIS_CACHELINE_SIZE - sizeof(std::atomic<unsigned long>)];
28 public:
29     lfringbuffer(size_t size) {
30         cap = round_up_to_next_multiple_of_2(size);
31         mask = cap - 1;
32         buffer = new char[cap];
33         head = 0;
34         tail = cap;
35         tail_cache = cap;
36     }
37
38     ~lfringbuffer() {
39         delete[] buffer;
40         buffer = nullptr;
41     }
42
43     // acquire() tries to allocate @size bytes of memory
44     // from the buffer, the address of the memory acquired
45     // is stored in @ptr.
46     // returns the actual bytes allocated insided the buffer 
47     // which might be larger than the requested size, since
48     // internally the buffer is implemented by a array and 
49     // there is unusable memory need to be taken into account
50     // when the request can not be fulfilled using only the memory
51     // left at the end of the buffer.
52     // 0 will be returned if there is not enough free space.
53     size_t acquire(size_t size, char *& ptr) {
54         assert(size);
55         size_t total_free = tail_cache - head;
56         if (iris_unlikely(total_free < size)) {
57             // load the latest tail
58             tail_cache = tail.load(std::memory_order_acquire);
59             total_free = tail_cache - head;
60             if (total_free < size)
61                 return 0; // no enough freespace
62         }
63
64         // now check if there is enough memory at the rear
65         size_t rear_free = cap - (head & mask);
66         if (iris_likely(rear_free >= size)) {
67             // ok, take the rear chunk
68             ptr = buffer + (head & mask);
69             head += size;
70             return size;
71         }
72
73         // now check if there is enough memory at the front
74         size_t front_free = tail_cache & mask;
75         if (front_free < size) {
76             tail_cache = tail.load(std::memory_order_acquire);
77             front_free = tail_cache & mask;
78             if (front_free < size)
79                 return 0;// no enough space
80         }
81
82         ptr = buffer;
83
84         // throw away the rear fragmentation
85         head += rear_free + size;
86
87         return rear_free + size;
88     }
89
90     // release() method releases @size bytes of memory allocated from
91     // acquire() method. @size should be the same as the one retuned from
92     // acquire().
93     void   release(size_t size) {
94         tail.fetch_add(size, std::memory_order_release);
95     }
96
97     size_t freespace() {
98         return tail.load(std::memory_order_acquire) - head;
99     }
100 };
101
102 }
103 #endif