2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 #include <type_traits>
25 #include <boost/noncopyable.hpp>
27 #include <folly/Portability.h>
28 #include <folly/detail/TurnSequencer.h>
29 #include <folly/portability/TypeTraits.h>
30 #include <folly/portability/Unistd.h>
35 template <typename T, template <typename> class Atom>
39 /// LockFreeRingBuffer<T> is a fixed-size, concurrent ring buffer with the
40 /// following semantics:
42 /// 1. Writers cannot block on other writers UNLESS they are <capacity> writes
43 /// apart from each other (writing to the same slot after a wrap-around)
44 /// 2. Writers cannot block on readers
45 /// 3. Readers can wait for writes that haven't occurred yet
46 /// 4. Readers can detect if they are lagging behind
48 /// In this sense, reads from this buffer are best-effort but writes are guaranteed.
51 /// Another way to think about this is as an unbounded stream of writes. The
52 /// buffer contains the last <capacity> writes but readers can attempt to read
53 /// any part of the stream, even outside this window. The read API takes a
54 /// Cursor that can point anywhere in this stream of writes. Reads from the
55 /// "future" can optionally block but reads from the "past" will always fail.
58 template <typename T, template <typename> class Atom = std::atomic>
59 class LockFreeRingBuffer: boost::noncopyable {
61 std::is_nothrow_default_constructible<T>::value,
62 "Element type must be nothrow default constructible");
65 FOLLY_IS_TRIVIALLY_COPYABLE(T),
66 "Element type must be trivially copyable");
/// Opaque pointer to a past or future write.
/// Can be moved relative to its current location but not in absolute terms.
struct Cursor {
  /// Creates a cursor that refers to the write identified by initialTicket.
  explicit Cursor(uint64_t initialTicket) noexcept : ticket(initialTicket) {}

  /// Advances the cursor by `steps` writes.
  /// Returns true if this cursor now points to a different
  /// write, false otherwise (for example when `steps` is 0).
  bool moveForward(uint64_t steps = 1) noexcept {
    const uint64_t prevTicket = ticket;
    ticket += steps;
    return prevTicket != ticket;
  }

  /// Moves the cursor back by `steps` writes, stopping at the very first
  /// write (ticket 0) instead of wrapping below zero.
  /// Returns true if this cursor now points to a previous
  /// write, false otherwise (for example when it already sits at ticket 0).
  bool moveBackward(uint64_t steps = 1) noexcept {
    const uint64_t prevTicket = ticket;
    // The ticket is unsigned, so a plain subtraction could wrap around to a
    // far-future position; clamp at the first ticket instead.
    ticket = (steps > ticket) ? 0 : ticket - steps;
    return prevTicket != ticket;
  }

 protected: // for test visibility reasons
  /// Position of this cursor in the stream of writes.
  uint64_t ticket;
  friend class LockFreeRingBuffer;
};
99 explicit LockFreeRingBuffer(uint32_t capacity) noexcept
100 : capacity_(capacity)
101 , slots_(new detail::RingBufferSlot<T,Atom>[capacity])
105 /// Perform a single write of an object of type T.
106 /// Writes can block iff a previous writer has not yet completed a write
107 /// for the same slot (before the most recent wrap-around).
108 void write(T& value) noexcept {
109 uint64_t ticket = ticket_.fetch_add(1);
110 slots_[idx(ticket)].write(turn(ticket), value);
113 /// Perform a single write of an object of type T.
114 /// Writes can block iff a previous writer has not yet completed a write
115 /// for the same slot (before the most recent wrap-around).
116 /// Returns a Cursor pointing to the just-written T.
117 Cursor writeAndGetCursor(T& value) noexcept {
118 uint64_t ticket = ticket_.fetch_add(1);
119 slots_[idx(ticket)].write(turn(ticket), value);
120 return Cursor(ticket);
123 /// Read the value at the cursor.
124 /// Returns true if the read succeeded, false otherwise. If the return
125 /// value is false, dest is to be considered partially read and in an
126 /// inconsistent state. Readers are advised to discard it.
127 bool tryRead(T& dest, const Cursor& cursor) noexcept {
128 return slots_[idx(cursor.ticket)].tryRead(dest, turn(cursor.ticket));
131 /// Read the value at the cursor or block if the write has not occurred yet.
132 /// Returns true if the read succeeded, false otherwise. If the return
133 /// value is false, dest is to be considered partially read and in an
134 /// inconsistent state. Readers are advised to discard it.
135 bool waitAndTryRead(T& dest, const Cursor& cursor) noexcept {
136 return slots_[idx(cursor.ticket)].waitAndTryRead(dest, turn(cursor.ticket));
139 /// Returns a Cursor pointing to the first write that has not occurred yet.
140 Cursor currentHead() noexcept {
141 return Cursor(ticket_.load());
144 /// Returns a Cursor pointing to a currently readable write.
145 /// skipFraction is a value in the [0, 1] range indicating how far into the
146 /// currently readable window to place the cursor. 0 means the
147 /// earliest readable write, 1 means the latest readable write (if any).
148 Cursor currentTail(double skipFraction = 0.0) noexcept {
149 assert(skipFraction >= 0.0 && skipFraction <= 1.0);
150 uint64_t ticket = ticket_.load();
152 uint64_t backStep = llround((1.0 - skipFraction) * capacity_);
154 // always try to move at least one step backward to something readable
155 backStep = std::max<uint64_t>(1, backStep);
157 // can't go back more steps than we've taken
158 backStep = std::min(ticket, backStep);
160 return Cursor(ticket - backStep);
163 ~LockFreeRingBuffer() {
/// Number of slots in the ring; set once by the constructor.
167 const uint32_t capacity_;
/// Owning pointer to the slot array; a ticket's slot is slots_[idx(ticket)].
169 const std::unique_ptr<detail::RingBufferSlot<T,Atom>[]> slots_;
/// Next ticket to hand to a writer; each write claims one with fetch_add(1).
171 Atom<uint64_t> ticket_;
173 uint32_t idx(uint64_t ticket) noexcept {
174 return ticket % capacity_;
177 uint32_t turn(uint64_t ticket) noexcept {
178 return (uint32_t)(ticket / capacity_);
180 }; // LockFreeRingBuffer
/// One cell of the ring buffer: a value of type T guarded by a TurnSequencer.
/// As the code below uses it, write number `turn` of a slot spans two
/// sequencer turns: the writer waits for sequencer turn `turn * 2`, stores
/// the value while the sequencer sits at the odd turn `turn * 2 + 1`, and
/// finishes by completing that odd turn. A reader of write number `turn`
/// therefore looks for sequencer turn `(turn + 1) * 2`.
183 template <typename T, template <typename> class Atom>
184 class RingBufferSlot {
186 explicit RingBufferSlot() noexcept
/// Stores `value` as write number `turn` of this slot.
/// `turn` counts the writes to this slot, starting at 0.
192 void write(const uint32_t turn, T& value) noexcept {
193 Atom<uint32_t> cutoff(0);
// Wait for this write's even sequencer turn before touching the slot.
194 sequencer_.waitForTurn(turn * 2, cutoff, false);
196 // Change to an odd-numbered turn to indicate write in process
197 sequencer_.completeTurn(turn * 2);
// NOTE(review): LockFreeRingBuffer requires T to be trivially copyable, so
// this std::move is effectively a plain copy -- confirm it is intentional.
199 data = std::move(value);
// Complete the odd turn so readers looking for (turn + 1) * 2 can proceed.
200 sequencer_.completeTurn(turn * 2 + 1);
/// Reads write number `turn` into `dest`, first waiting on the sequencer
/// for that write's completion turn.
/// Returns true only if the sequencer is still at that turn after the
/// bytes were copied; otherwise `dest` may hold a partial copy.
204 bool waitAndTryRead(T& dest, uint32_t turn) noexcept {
// Sequencer turn at which write number `turn` is complete and readable.
205 uint32_t desired_turn = (turn + 1) * 2;
206 Atom<uint32_t> cutoff(0);
// The branch below handles every wait outcome other than SUCCESS.
207 if (sequencer_.tryWaitForTurn(desired_turn, cutoff, false) !=
208 TurnSequencer<Atom>::TryWaitResult::SUCCESS) {
// Copy optimistically; the turn is re-checked afterwards to validate it.
211 memcpy(&dest, &data, sizeof(T));
213 // if it's still the same turn, we read the value successfully
214 return sequencer_.isTurn(desired_turn);
/// Non-waiting variant of waitAndTryRead: checks whether the sequencer is at
/// the completion turn of write number `turn` right now.
/// Returns true only if the sequencer is still at that turn after the
/// bytes were copied; otherwise `dest` may hold a partial copy.
217 bool tryRead(T& dest, uint32_t turn) noexcept {
218 // The write that started at turn 0 ended at turn 2
219 if (!sequencer_.isTurn((turn + 1) * 2)) {
// Copy optimistically; the turn is re-checked afterwards to validate it.
222 memcpy(&dest, &data, sizeof(T));
224 // if it's still the same turn, we read the value successfully
225 return sequencer_.isTurn((turn + 1) * 2);
/// Orders writers on this slot and lets readers validate their copies.
229 TurnSequencer<Atom> sequencer_;
233 } // namespace detail