logging: rename the `DEBUG` log level to `DBG`
[folly.git] / folly / experimental / LockFreeRingBuffer.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <cmath>
21 #include <cstring>
22 #include <memory>
23 #include <type_traits>
24
25 #include <boost/noncopyable.hpp>
26
27 #include <folly/Portability.h>
28 #include <folly/detail/TurnSequencer.h>
29 #include <folly/portability/TypeTraits.h>
30 #include <folly/portability/Unistd.h>
31
32 namespace folly {
33 namespace detail {
34
35 template <typename T, template <typename> class Atom>
36 class RingBufferSlot;
37 } // namespace detail
38
39 /// LockFreeRingBuffer<T> is a fixed-size, concurrent ring buffer with the
40 /// following semantics:
41 ///
42 ///  1. Writers cannot block on other writers UNLESS they are <capacity> writes
43 ///     apart from each other (writing to the same slot after a wrap-around)
44 ///  2. Writers cannot block on readers
45 ///  3. Readers can wait for writes that haven't occurred yet
46 ///  4. Readers can detect if they are lagging behind
47 ///
48 /// In this sense, reads from this buffer are best-effort but writes
49 /// are guaranteed.
50 ///
51 /// Another way to think about this is as an unbounded stream of writes. The
52 /// buffer contains the last <capacity> writes but readers can attempt to read
53 /// any part of the stream, even outside this window. The read API takes a
54 /// Cursor that can point anywhere in this stream of writes. Reads from the
55 /// "future" can optionally block but reads from the "past" will always fail.
56 ///
57
58 template <typename T, template <typename> class Atom = std::atomic>
59 class LockFreeRingBuffer: boost::noncopyable {
60   static_assert(
61       std::is_nothrow_default_constructible<T>::value,
62       "Element type must be nothrow default constructible");
63
64   static_assert(
65       FOLLY_IS_TRIVIALLY_COPYABLE(T),
66       "Element type must be trivially copyable");
67
68  public:
69   /// Opaque pointer to a past or future write.
70   /// Can be moved relative to its current location but not in absolute terms.
71   struct Cursor {
72     explicit Cursor(uint64_t initialTicket) noexcept : ticket(initialTicket) {}
73
74     /// Returns true if this cursor now points to a different
75     /// write, false otherwise.
76     bool moveForward(uint64_t steps = 1) noexcept {
77       uint64_t prevTicket = ticket;
78       ticket += steps;
79       return prevTicket != ticket;
80     }
81
82     /// Returns true if this cursor now points to a previous
83     /// write, false otherwise.
84     bool moveBackward(uint64_t steps = 1) noexcept {
85       uint64_t prevTicket = ticket;
86       if (steps > ticket) {
87         ticket = 0;
88       } else {
89         ticket -= steps;
90       }
91       return prevTicket != ticket;
92     }
93
94    protected: // for test visibility reasons
95     uint64_t ticket;
96     friend class LockFreeRingBuffer;
97   };
98
99   explicit LockFreeRingBuffer(uint32_t capacity) noexcept
100     : capacity_(capacity)
101     , slots_(new detail::RingBufferSlot<T,Atom>[capacity])
102     , ticket_(0)
103   {}
104
105   /// Perform a single write of an object of type T.
106   /// Writes can block iff a previous writer has not yet completed a write
107   /// for the same slot (before the most recent wrap-around).
108   void write(T& value) noexcept {
109     uint64_t ticket = ticket_.fetch_add(1);
110     slots_[idx(ticket)].write(turn(ticket), value);
111   }
112
113   /// Perform a single write of an object of type T.
114   /// Writes can block iff a previous writer has not yet completed a write
115   /// for the same slot (before the most recent wrap-around).
116   /// Returns a Cursor pointing to the just-written T.
117   Cursor writeAndGetCursor(T& value) noexcept {
118     uint64_t ticket = ticket_.fetch_add(1);
119     slots_[idx(ticket)].write(turn(ticket), value);
120     return Cursor(ticket);
121   }
122
123   /// Read the value at the cursor.
124   /// Returns true if the read succeeded, false otherwise. If the return
125   /// value is false, dest is to be considered partially read and in an
126   /// inconsistent state. Readers are advised to discard it.
127   bool tryRead(T& dest, const Cursor& cursor) noexcept {
128     return slots_[idx(cursor.ticket)].tryRead(dest, turn(cursor.ticket));
129   }
130
131   /// Read the value at the cursor or block if the write has not occurred yet.
132   /// Returns true if the read succeeded, false otherwise. If the return
133   /// value is false, dest is to be considered partially read and in an
134   /// inconsistent state. Readers are advised to discard it.
135   bool waitAndTryRead(T& dest, const Cursor& cursor) noexcept {
136     return slots_[idx(cursor.ticket)].waitAndTryRead(dest, turn(cursor.ticket));
137   }
138
139   /// Returns a Cursor pointing to the first write that has not occurred yet.
140   Cursor currentHead() noexcept {
141     return Cursor(ticket_.load());
142   }
143
144   /// Returns a Cursor pointing to a currently readable write.
145   /// skipFraction is a value in the [0, 1] range indicating how far into the
146   /// currently readable window to place the cursor. 0 means the
147   /// earliest readable write, 1 means the latest readable write (if any).
148   Cursor currentTail(double skipFraction = 0.0) noexcept {
149     assert(skipFraction >= 0.0 && skipFraction <= 1.0);
150     uint64_t ticket = ticket_.load();
151
152     uint64_t backStep = llround((1.0 - skipFraction) * capacity_);
153
154     // always try to move at least one step backward to something readable
155     backStep = std::max<uint64_t>(1, backStep);
156
157     // can't go back more steps than we've taken
158     backStep = std::min(ticket, backStep);
159
160     return Cursor(ticket - backStep);
161   }
162
163   ~LockFreeRingBuffer() {
164   }
165
166  private:
167   const uint32_t capacity_;
168
169   const std::unique_ptr<detail::RingBufferSlot<T,Atom>[]> slots_;
170
171   Atom<uint64_t> ticket_;
172
173   uint32_t idx(uint64_t ticket) noexcept {
174     return ticket % capacity_;
175   }
176
177   uint32_t turn(uint64_t ticket) noexcept {
178     return (uint32_t)(ticket / capacity_);
179   }
180 }; // LockFreeRingBuffer
181
182 namespace detail {
183 template <typename T, template <typename> class Atom>
184 class RingBufferSlot {
185  public:
186   explicit RingBufferSlot() noexcept
187     : sequencer_()
188     , data()
189   {
190   }
191
192   void write(const uint32_t turn, T& value) noexcept {
193     Atom<uint32_t> cutoff(0);
194     sequencer_.waitForTurn(turn * 2, cutoff, false);
195
196     // Change to an odd-numbered turn to indicate write in process
197     sequencer_.completeTurn(turn * 2);
198
199     data = std::move(value);
200     sequencer_.completeTurn(turn * 2 + 1);
201     // At (turn + 1) * 2
202   }
203
204   bool waitAndTryRead(T& dest, uint32_t turn) noexcept {
205     uint32_t desired_turn = (turn + 1) * 2;
206     Atom<uint32_t> cutoff(0);
207     if (sequencer_.tryWaitForTurn(desired_turn, cutoff, false) !=
208         TurnSequencer<Atom>::TryWaitResult::SUCCESS) {
209       return false;
210     }
211     memcpy(&dest, &data, sizeof(T));
212
213     // if it's still the same turn, we read the value successfully
214     return sequencer_.isTurn(desired_turn);
215   }
216
217   bool tryRead(T& dest, uint32_t turn) noexcept {
218     // The write that started at turn 0 ended at turn 2
219     if (!sequencer_.isTurn((turn + 1) * 2)) {
220       return false;
221     }
222     memcpy(&dest, &data, sizeof(T));
223
224     // if it's still the same turn, we read the value successfully
225     return sequencer_.isTurn((turn + 1) * 2);
226   }
227
228  private:
229   TurnSequencer<Atom> sequencer_;
230   T data;
231 }; // RingBufferSlot
232
233 } // namespace detail
234
235 } // namespace folly