LockFreeRingBuffer
[folly.git] / folly / experimental / LockFreeRingBuffer.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <boost/noncopyable.hpp>
21 #include <iostream>
22 #include <cmath>
23 #include <string.h>
24 #include <type_traits>
25 #include <unistd.h>
26
27 #include <folly/detail/TurnSequencer.h>
28 #include <folly/Portability.h>
29
30 namespace folly {
31 namespace detail {
32
33 template<typename T,
34          template<typename> class Atom>
35 class RingBufferSlot;
36 } // namespace detail
37
38 /// LockFreeRingBuffer<T> is a fixed-size, concurrent ring buffer with the
39 /// following semantics:
40 ///
41 ///  1. Writers cannot block on other writers UNLESS they are <capacity> writes
42 ///     apart from each other (writing to the same slot after a wrap-around)
43 ///  2. Writers cannot block on readers
44 ///  3. Readers can wait for writes that haven't occurred yet
45 ///  4. Readers can detect if they are lagging behind
46 ///
47 /// In this sense, reads from this buffer are best-effort but writes
48 /// are guaranteed.
49 ///
50 /// Another way to think about this is as an unbounded stream of writes. The
51 /// buffer contains the last <capacity> writes but readers can attempt to read
52 /// any part of the stream, even outside this window. The read API takes a
53 /// Cursor that can point anywhere in this stream of writes. Reads from the
54 /// "future" can optionally block but reads from the "past" will always fail.
55 ///
56
57 template<typename T, template<typename> class Atom = std::atomic>
58 class LockFreeRingBuffer: boost::noncopyable {
59
60    static_assert(std::is_nothrow_default_constructible<T>::value,
61        "Element type must be nothrow default constructible");
62
63    static_assert(FOLLY_IS_TRIVIALLY_COPYABLE(T),
64        "Element type must be trivially copyable");
65
66 public:
67   /// Opaque pointer to a past or future write.
68   /// Can be moved relative to its current location but not in absolute terms.
69   struct Cursor {
70     explicit Cursor(uint64_t initialTicket) noexcept : ticket(initialTicket) {}
71
72     void moveForward(uint64_t steps = 1) noexcept {
73       ticket += steps;
74     }
75
76     void moveBackward(uint64_t steps = 1) noexcept {
77       if (steps > ticket) {
78         ticket = 0;
79       } else {
80         ticket -= steps;
81       }
82     }
83
84   protected: // for test visibility reasons
85     uint64_t ticket;
86     friend class LockFreeRingBuffer;
87   };
88
89   explicit LockFreeRingBuffer(size_t capacity) noexcept
90     : capacity_(capacity)
91     , slots_(new detail::RingBufferSlot<T,Atom>[capacity])
92     , ticket_(0)
93   {}
94
95   /// Perform a single write of an object of type T.
96   /// Writes can block iff a previous writer has not yet completed a write
97   /// for the same slot (before the most recent wrap-around).
98   void write(T& value) noexcept {
99     uint64_t ticket = ticket_.fetch_add(1);
100     slots_[idx(ticket)].write(turn(ticket), value);
101   }
102
103   /// Read the value at the cursor.
104   /// Returns true if the read succeeded, false otherwise. If the return
105   /// value is false, dest is to be considered partially read and in an
106   /// inconsistent state. Readers are advised to discard it.
107   bool tryRead(T& dest, const Cursor& cursor) noexcept {
108     return slots_[idx(cursor.ticket)].tryRead(dest, turn(cursor.ticket));
109   }
110
111   /// Read the value at the cursor or block if the write has not occurred yet.
112   /// Returns true if the read succeeded, false otherwise. If the return
113   /// value is false, dest is to be considered partially read and in an
114   /// inconsistent state. Readers are advised to discard it.
115   bool waitAndTryRead(T& dest, const Cursor& cursor) noexcept {
116     return slots_[idx(cursor.ticket)].waitAndTryRead(dest, turn(cursor.ticket));
117   }
118
119   /// Returns a Cursor pointing to the first write that has not occurred yet.
120   Cursor currentHead() noexcept {
121     return Cursor(ticket_.load());
122   }
123
124   /// Returns a Cursor pointing to a currently readable write.
125   /// skipFraction is a value in the [0, 1] range indicating how far into the
126   /// currently readable window to place the cursor. 0 means the
127   /// earliest readable write, 1 means the latest readable write (if any).
128   Cursor currentTail(double skipFraction = 0.0) noexcept {
129     assert(skipFraction >= 0.0 && skipFraction <= 1.0);
130     uint64_t ticket = ticket_.load();
131
132     uint64_t backStep = std::llround((1.0 - skipFraction) * capacity_);
133
134     // always try to move at least one step backward to something readable
135     backStep = std::max<uint64_t>(1, backStep);
136
137     // can't go back more steps than we've taken
138     backStep = std::min(ticket, backStep);
139
140     return Cursor(ticket - backStep);
141   }
142
143   ~LockFreeRingBuffer() {
144   }
145
146 private:
147   const size_t capacity_;
148
149   const std::unique_ptr<detail::RingBufferSlot<T,Atom>[]> slots_;
150
151   Atom<uint64_t> ticket_;
152
153   uint32_t idx(uint64_t ticket) noexcept {
154     return ticket % capacity_;
155   }
156
157   uint32_t turn(uint64_t ticket) noexcept {
158     return (ticket / capacity_);
159   }
160 }; // LockFreeRingBuffer
161
162 namespace detail {
163 template<typename T, template<typename> class Atom>
164 class RingBufferSlot {
165 public:
166   explicit RingBufferSlot() noexcept
167     : sequencer_()
168     , data()
169   {
170   }
171
172   void write(const uint32_t turn, T& value) noexcept {
173     Atom<uint32_t> cutoff(0);
174     sequencer_.waitForTurn(turn * 2, cutoff, false);
175
176     // Change to an odd-numbered turn to indicate write in process
177     sequencer_.completeTurn(turn * 2);
178
179     data = std::move(value);
180     sequencer_.completeTurn(turn * 2 + 1);
181     // At (turn + 1) * 2
182   }
183
184   bool waitAndTryRead(T& dest, uint32_t turn) noexcept {
185     uint32_t desired_turn = (turn + 1) * 2;
186     Atom<uint32_t> cutoff(0);
187     if(!sequencer_.tryWaitForTurn(desired_turn, cutoff, false)) {
188       return false;
189     }
190     memcpy(&dest, &data, sizeof(T));
191
192     // if it's still the same turn, we read the value successfully
193     return sequencer_.isTurn(desired_turn);
194   }
195
196   bool tryRead(T& dest, uint32_t turn) noexcept {
197     // The write that started at turn 0 ended at turn 2
198     if (!sequencer_.isTurn((turn + 1) * 2)) {
199       return false;
200     }
201     memcpy(&dest, &data, sizeof(T));
202
203     // if it's still the same turn, we read the value successfully
204     return sequencer_.isTurn((turn + 1) * 2);
205   }
206
207
208 private:
209   TurnSequencer<Atom> sequencer_;
210   T data;
211 }; // RingBufferSlot
212
213 } // namespace detail
214
215 } // namespace folly