Consistency in namespace-closing comments
[folly.git] / folly / experimental / FlatCombiningPriorityQueue.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <chrono>
21 #include <memory>
22 #include <mutex>
23 #include <queue>
24
25 #include <folly/detail/Futex.h>
26 #include <folly/experimental/flat_combining/FlatCombining.h>
27 #include <glog/logging.h>
28
29 namespace folly {
30
31 /// Thread-safe priority queue based on flat combining. If the
32 /// constructor parameter maxSize is greater than 0 (default = 0),
33 /// then the queue is bounded. This template provides blocking,
34 /// non-blocking, and timed variants of each of push(), pop(), and
35 /// peek() operations. The empty() and size() functions are inherently
36 /// non-blocking.
37 ///
38 /// PriorityQueue must support the interface of std::priority_queue,
39 /// specifically empty(), size(), push(), top(), and pop().  Mutex
40 /// must meet the standard Lockable requirements.
41 ///
42 /// By default FlatCombining uses a dedicated combiner thread, which
43 /// yields better latency and throughput under high contention but
44 /// higher overheads under low contention. If the constructor
45 /// parameter dedicated is false, then there will be no dedicated
46 /// combiner thread and any requester may do combining of operations
47 /// requested by other threads. For more details see the comments for
48 /// FlatCombining.
49 ///
50 /// Usage examples:
51 /// @code
52 ///   FlatCombiningPriorityQueue<int> pq(1);
53 ///   CHECK(pq.empty());
54 ///   CHECK(pq.size() == 0);
55 ///   int v;
56 ///   CHECK(!tryPop(v));
57 ///   CHECK(!tryPop(v, now() + seconds(1)));
58 ///   CHECK(!tryPeek(v));
59 ///   CHECK(!tryPeek(v, now() + seconds(1)));
60 ///   pq.push(10);
61 ///   CHECK(!pq.empty());
62 ///   CHECK(pq.size() == 1);
63 ///   CHECK(!pq.tryPush(20));
64 ///   CHECK(!pq.tryPush(20), now() + seconds(1)));
65 ///   peek(v);
66 ///   CHECK_EQ(v, 10);
67 ///   CHECK(pq.size() == 1);
68 ///   pop(v);
69 ///   CHECK_EQ(v, 10);
70 ///   CHECK(pq.empty());
71 /// @encode
72
73 template <
74     typename T,
75     typename PriorityQueue = std::priority_queue<T>,
76     typename Mutex = std::mutex,
77     template <typename> class Atom = std::atomic>
78 class FlatCombiningPriorityQueue
79     : public folly::FlatCombining<
80           FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>,
81           Mutex,
82           Atom> {
83   using FCPQ = FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>;
84   using FC = folly::FlatCombining<FCPQ, Mutex, Atom>;
85
86  public:
87   template <
88       typename... PQArgs,
89       typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
90   explicit FlatCombiningPriorityQueue(
91       // Concurrent priority queue parameter
92       const size_t maxSize = 0,
93       // Flat combining parameters
94       const bool dedicated = true,
95       const uint32_t numRecs = 0,
96       const uint32_t maxOps = 0,
97       // (Sequential) PriorityQueue Parameters
98       PQArgs... args)
99       : FC(dedicated, numRecs, maxOps),
100         maxSize_(maxSize),
101         pq_(std::forward<PQArgs>(args)...) {}
102
103   /// Returns true iff the priority queue is empty
104   bool empty() const {
105     bool res;
106     auto fn = [&] { res = pq_.empty(); };
107     const_cast<FCPQ*>(this)->requestFC(fn);
108     return res;
109   }
110
111   /// Returns the number of items in the priority queue
112   size_t size() const {
113     size_t res;
114     auto fn = [&] { res = pq_.size(); };
115     const_cast<FCPQ*>(this)->requestFC(fn);
116     return res;
117   }
118
119   /// Non-blocking push. Succeeds if there is space in the priority
120   /// queue to insert the new item. Tries once if no time point is
121   /// provided or until the provided time_point is reached. If
122   /// successful, inserts the provided item in the priority queue
123   /// according to its priority.
124   template <class Clock = std::chrono::steady_clock>
125   bool tryPush(
126       const T& val,
127       const std::chrono::time_point<Clock>& when =
128           std::chrono::time_point<Clock>::min());
129
130   /// Non-blocking pop. Succeeds if the priority queue is
131   /// nonempty. Tries once if no time point is provided or until the
132   /// provided time_point is reached.  If successful, copies the
133   /// highest priority item and removes it from the priority queue.
134   template <class Clock = std::chrono::steady_clock>
135   bool tryPop(
136       T& val,
137       const std::chrono::time_point<Clock>& when =
138           std::chrono::time_point<Clock>::min());
139
140   /// Non-blocking peek. Succeeds if the priority queue is
141   /// nonempty. Tries once if no time point is provided or until the
142   /// provided time_point is reached.  If successful, copies the
143   /// highest priority item without removing it.
144   template <class Clock = std::chrono::steady_clock>
145   bool tryPeek(
146       T& val,
147       const std::chrono::time_point<Clock>& when =
148           std::chrono::time_point<Clock>::min());
149
150   /// Blocking push. Inserts the provided item in the priority
151   /// queue. If it is full, this function blocks until there is space
152   /// for the new item.
153   void push(const T& val) {
154     tryPush(val, std::chrono::time_point<std::chrono::steady_clock>::max());
155   }
156
157   /// Blocking pop. Copies the highest priority item and removes
158   /// it. If the priority queue is empty, this function blocks until
159   /// it is nonempty.
160   void pop(T& val) {
161     tryPop(val, std::chrono::time_point<std::chrono::steady_clock>::max());
162   }
163
164   /// Blocking peek. Copies the highest priority item without
165   /// removing it. If the priority queue is empty, this function
166   /// blocks until it is nonempty.
167   void peek(T& val) {
168     tryPeek(val, std::chrono::time_point<std::chrono::steady_clock>::max());
169   }
170
171  private:
172   size_t maxSize_;
173   PriorityQueue pq_;
174   detail::Futex<Atom> empty_;
175   detail::Futex<Atom> full_;
176
177   bool isTrue(detail::Futex<Atom>& futex) {
178     return futex.load(std::memory_order_relaxed) != 0;
179   }
180
181   void setFutex(detail::Futex<Atom>& futex, uint32_t val) {
182     futex.store(val, std::memory_order_relaxed);
183   }
184
185   bool futexSignal(detail::Futex<Atom>& futex) {
186     if (isTrue(futex)) {
187       setFutex(futex, 0);
188       return true;
189     } else {
190       return false;
191     }
192   }
193 };
194
195 /// Implementation
196
197 template <
198     typename T,
199     typename PriorityQueue,
200     typename Mutex,
201     template <typename> class Atom>
202 template <class Clock>
203 inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPush(
204     const T& val,
205     const std::chrono::time_point<Clock>& when) {
206   while (true) {
207     bool res;
208     bool wake;
209
210     auto fn = [&] {
211       if (maxSize_ > 0 && pq_.size() == maxSize_) {
212         setFutex(full_, 1);
213         res = false;
214         return;
215       }
216       DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
217       try {
218         pq_.push(val);
219         wake = futexSignal(empty_);
220         res = true;
221         return;
222       } catch (const std::bad_alloc&) {
223         setFutex(full_, 1);
224         res = false;
225         return;
226       }
227     };
228     this->requestFC(fn);
229
230     if (res) {
231       if (wake) {
232         empty_.futexWake();
233       }
234       return true;
235     }
236     if (when == std::chrono::time_point<Clock>::min()) {
237       return false;
238     }
239     while (isTrue(full_)) {
240       if (when == std::chrono::time_point<Clock>::max()) {
241         full_.futexWait(1);
242       } else {
243         if (Clock::now() > when) {
244           return false;
245         } else {
246           full_.futexWaitUntil(1, when);
247         }
248       }
249     } // inner while loop
250   } // outer while loop
251 }
252
253 template <
254     typename T,
255     typename PriorityQueue,
256     typename Mutex,
257     template <typename> class Atom>
258 template <class Clock>
259 inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPop(
260     T& val,
261     const std::chrono::time_point<Clock>& when) {
262   while (true) {
263     bool res;
264     bool wake;
265
266     auto fn = [&] {
267       res = !pq_.empty();
268       if (res) {
269         val = pq_.top();
270         pq_.pop();
271         wake = futexSignal(full_);
272       } else {
273         setFutex(empty_, 1);
274       }
275     };
276     this->requestFC(fn);
277
278     if (res) {
279       if (wake) {
280         full_.futexWake();
281       }
282       return true;
283     }
284     while (isTrue(empty_)) {
285       if (when == std::chrono::time_point<Clock>::max()) {
286         empty_.futexWait(1);
287       } else {
288         if (Clock::now() > when) {
289           return false;
290         } else {
291           empty_.futexWaitUntil(1, when);
292         }
293       }
294     } // inner while loop
295   } // outer while loop
296 }
297
298 template <
299     typename T,
300     typename PriorityQueue,
301     typename Mutex,
302     template <typename> class Atom>
303 template <class Clock>
304 inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPeek(
305     T& val,
306     const std::chrono::time_point<Clock>& when) {
307   while (true) {
308     bool res;
309
310     auto fn = [&] {
311       res = !pq_.empty();
312       if (res) {
313         val = pq_.top();
314       } else {
315         setFutex(empty_, 1);
316       }
317     };
318     this->requestFC(fn);
319
320     if (res) {
321       return true;
322     }
323     while (isTrue(empty_)) {
324       if (when == std::chrono::time_point<Clock>::max()) {
325         empty_.futexWait(1);
326       } else {
327         if (Clock::now() > when) {
328           return false;
329         } else {
330           empty_.futexWaitUntil(1, when);
331         }
332       }
333     } // inner while loop
334   } // outer while loop
335 }
336
337 } // namespace folly