Move folly/Bits.h to folly/lang/
[folly.git] / folly / concurrency / UnboundedQueue-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/experimental/hazptr/hazptr.h>
20 #include <folly/lang/Launder.h>
21 #include <folly/synchronization/SaturatingSemaphore.h>
22
23 #include <glog/logging.h>
24
25 namespace folly {
26
27 /* constructor */
28
29 template <
30     typename T,
31     bool SingleProducer,
32     bool SingleConsumer,
33     bool MayBlock,
34     size_t LgSegmentSize,
35     template <typename> class Atom>
36 inline UnboundedQueue<
37     T,
38     SingleProducer,
39     SingleConsumer,
40     MayBlock,
41     LgSegmentSize,
42     Atom>::UnboundedQueue() {
43   setProducerTicket(0);
44   setConsumerTicket(0);
45   auto s = new Segment(0);
46   DEBUG_PRINT(s);
47   setTail(s);
48   setHead(s);
49 }
50
51 /* destructor */
52
53 template <
54     typename T,
55     bool SingleProducer,
56     bool SingleConsumer,
57     bool MayBlock,
58     size_t LgSegmentSize,
59     template <typename> class Atom>
60 inline UnboundedQueue<
61     T,
62     SingleProducer,
63     SingleConsumer,
64     MayBlock,
65     LgSegmentSize,
66     Atom>::~UnboundedQueue() {
67   Segment* next;
68   for (auto s = head(); s; s = next) {
69     next = s->nextSegment();
70     if (SPSC) {
71       delete s;
72     } else {
73       s->retire(); // hazptr
74     }
75   }
76 }
77
78 /* dequeue */
79
80 template <
81     typename T,
82     bool SingleProducer,
83     bool SingleConsumer,
84     bool MayBlock,
85     size_t LgSegmentSize,
86     template <typename> class Atom>
87 FOLLY_ALWAYS_INLINE void UnboundedQueue<
88     T,
89     SingleProducer,
90     SingleConsumer,
91     MayBlock,
92     LgSegmentSize,
93     Atom>::dequeue(T& item) noexcept {
94   if (SPSC) {
95     auto s = head();
96     dequeueCommon(s, item);
97   } else {
98     // Using hazptr_holder instead of hazptr_local because it is
99     // possible to call ~T() and it may happen to use hazard pointers.
100     folly::hazptr::hazptr_holder hptr;
101     auto s = hptr.get_protected(head_);
102     dequeueCommon(s, item);
103   }
104 }
105
106 /* try_dequeue_until */
107
108 template <
109     typename T,
110     bool SingleProducer,
111     bool SingleConsumer,
112     bool MayBlock,
113     size_t LgSegmentSize,
114     template <typename> class Atom>
115 template <typename Clock, typename Duration>
116 FOLLY_ALWAYS_INLINE bool UnboundedQueue<
117     T,
118     SingleProducer,
119     SingleConsumer,
120     MayBlock,
121     LgSegmentSize,
122     Atom>::
123     try_dequeue_until(
124         T& item,
125         const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
126   if (SingleConsumer) {
127     auto s = head();
128     return singleConsumerTryDequeueUntil(s, item, deadline);
129   } else {
130     // Using hazptr_holder instead of hazptr_local because it is
131     // possible to call ~T() and it may happen to use hazard pointers.
132     folly::hazptr::hazptr_holder hptr;
133     auto s = hptr.get_protected(head_);
134     return multiConsumerTryDequeueUntil(s, item, deadline);
135   }
136 }
137
138 /* enqueueImpl */
139
140 template <
141     typename T,
142     bool SingleProducer,
143     bool SingleConsumer,
144     bool MayBlock,
145     size_t LgSegmentSize,
146     template <typename> class Atom>
147 template <typename Arg>
148 FOLLY_ALWAYS_INLINE void UnboundedQueue<
149     T,
150     SingleProducer,
151     SingleConsumer,
152     MayBlock,
153     LgSegmentSize,
154     Atom>::enqueueImpl(Arg&& arg) {
155   if (SPSC) {
156     auto s = tail();
157     enqueueCommon(s, std::forward<Arg>(arg));
158   } else {
159     // Using hazptr_holder instead of hazptr_local because it is
160     // possible that the T construcctor happens to use hazard
161     // pointers.
162     folly::hazptr::hazptr_holder hptr;
163     auto s = hptr.get_protected(tail_);
164     enqueueCommon(s, std::forward<Arg>(arg));
165   }
166 }
167
168 /* enqueueCommon */
169
170 template <
171     typename T,
172     bool SingleProducer,
173     bool SingleConsumer,
174     bool MayBlock,
175     size_t LgSegmentSize,
176     template <typename> class Atom>
177 template <typename Arg>
178 FOLLY_ALWAYS_INLINE void UnboundedQueue<
179     T,
180     SingleProducer,
181     SingleConsumer,
182     MayBlock,
183     LgSegmentSize,
184     Atom>::enqueueCommon(Segment* s, Arg&& arg) {
185   auto t = fetchIncrementProducerTicket();
186   if (!SingleProducer) {
187     s = findSegment(s, t);
188   }
189   DCHECK_GE(t, s->minTicket());
190   DCHECK_LT(t, (s->minTicket() + SegmentSize));
191   size_t idx = index(t);
192   Entry& e = s->entry(idx);
193   e.putItem(std::forward<Arg>(arg));
194   if (responsibleForAlloc(t)) {
195     allocNextSegment(s, t + SegmentSize);
196   }
197   if (responsibleForAdvance(t)) {
198     advanceTail(s);
199   }
200 }
201
202 /* dequeueCommon */
203
204 template <
205     typename T,
206     bool SingleProducer,
207     bool SingleConsumer,
208     bool MayBlock,
209     size_t LgSegmentSize,
210     template <typename> class Atom>
211 FOLLY_ALWAYS_INLINE void UnboundedQueue<
212     T,
213     SingleProducer,
214     SingleConsumer,
215     MayBlock,
216     LgSegmentSize,
217     Atom>::dequeueCommon(Segment* s, T& item) noexcept {
218   auto t = fetchIncrementConsumerTicket();
219   if (!SingleConsumer) {
220     s = findSegment(s, t);
221   }
222   size_t idx = index(t);
223   Entry& e = s->entry(idx);
224   e.takeItem(item);
225   if (responsibleForAdvance(t)) {
226     advanceHead(s);
227   }
228 }
229
230 /* singleConsumerTryDequeueUntil */
231
232 template <
233     typename T,
234     bool SingleProducer,
235     bool SingleConsumer,
236     bool MayBlock,
237     size_t LgSegmentSize,
238     template <typename> class Atom>
239 template <typename Clock, typename Duration>
240 FOLLY_ALWAYS_INLINE bool UnboundedQueue<
241     T,
242     SingleProducer,
243     SingleConsumer,
244     MayBlock,
245     LgSegmentSize,
246     Atom>::
247     singleConsumerTryDequeueUntil(
248         Segment* s,
249         T& item,
250         const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
251   auto t = consumerTicket();
252   DCHECK_GE(t, s->minTicket());
253   DCHECK_LT(t, (s->minTicket() + SegmentSize));
254   size_t idx = index(t);
255   Entry& e = s->entry(idx);
256   if (!e.tryWaitUntil(deadline)) {
257     return false;
258   }
259   setConsumerTicket(t + 1);
260   e.takeItem(item);
261   if (responsibleForAdvance(t)) {
262     advanceHead(s);
263   }
264   return true;
265 }
266
267 /* multiConsumerTryDequeueUntil */
268
269 template <
270     typename T,
271     bool SingleProducer,
272     bool SingleConsumer,
273     bool MayBlock,
274     size_t LgSegmentSize,
275     template <typename> class Atom>
276 template <typename Clock, typename Duration>
277 FOLLY_ALWAYS_INLINE bool UnboundedQueue<
278     T,
279     SingleProducer,
280     SingleConsumer,
281     MayBlock,
282     LgSegmentSize,
283     Atom>::
284     multiConsumerTryDequeueUntil(
285         Segment* s,
286         T& item,
287         const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
288   while (true) {
289     auto t = consumerTicket();
290     if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
291       Segment* next;
292       // Note that the following loop will not spin indefinitely (as
293       // long as the number of concurrently waiting consumers is not
294       // greater than SegmentSize). The algorithm guarantees in such a
295       // case that the producer reponsible for setting the next
296       // pointer is already running.
297       while ((next = s->nextSegment()) == nullptr) {
298         if (Clock::now() > deadline) {
299           return false;
300         }
301         asm_volatile_pause();
302       }
303       s = next;
304       DCHECK(s != nullptr);
305       continue;
306     }
307     size_t idx = index(t);
308     Entry& e = s->entry(idx);
309     if (!e.tryWaitUntil(deadline)) {
310       return false;
311     }
312     if (!consumerTicket_.compare_exchange_weak(
313             t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
314       continue;
315     }
316     e.takeItem(item);
317     if (responsibleForAdvance(t)) {
318       advanceHead(s);
319     }
320     return true;
321   }
322 }
323
324 /* findSegment */
325
326 template <
327     typename T,
328     bool SingleProducer,
329     bool SingleConsumer,
330     bool MayBlock,
331     size_t LgSegmentSize,
332     template <typename> class Atom>
333 inline typename UnboundedQueue<
334     T,
335     SingleProducer,
336     SingleConsumer,
337     MayBlock,
338     LgSegmentSize,
339     Atom>::Segment*
340 UnboundedQueue<
341     T,
342     SingleProducer,
343     SingleConsumer,
344     MayBlock,
345     LgSegmentSize,
346     Atom>::findSegment(Segment* s, const Ticket t) const noexcept {
347   while (t >= (s->minTicket() + SegmentSize)) {
348     Segment* next = s->nextSegment();
349     // Note that the following loop will not spin indefinitely. The
350     // algorithm guarantees that the producer reponsible for setting
351     // the next pointer is already running.
352     while (next == nullptr) {
353       asm_volatile_pause();
354       next = s->nextSegment();
355     }
356     DCHECK(next != nullptr);
357     s = next;
358   }
359   return s;
360 }
361
362 /* allocNextSegment */
363
364 template <
365     typename T,
366     bool SingleProducer,
367     bool SingleConsumer,
368     bool MayBlock,
369     size_t LgSegmentSize,
370     template <typename> class Atom>
371 inline void UnboundedQueue<
372     T,
373     SingleProducer,
374     SingleConsumer,
375     MayBlock,
376     LgSegmentSize,
377     Atom>::allocNextSegment(Segment* s, const Ticket t) {
378   auto next = new Segment(t);
379   if (!SPSC) {
380     next->acquire_ref_safe(); // hazptr
381   }
382   DEBUG_PRINT(s << " " << next);
383   DCHECK(s->nextSegment() == nullptr);
384   s->setNextSegment(next);
385 }
386
387 /* advanceTail */
388
389 template <
390     typename T,
391     bool SingleProducer,
392     bool SingleConsumer,
393     bool MayBlock,
394     size_t LgSegmentSize,
395     template <typename> class Atom>
396 inline void UnboundedQueue<
397     T,
398     SingleProducer,
399     SingleConsumer,
400     MayBlock,
401     LgSegmentSize,
402     Atom>::advanceTail(Segment* s) noexcept {
403   Segment* next = s->nextSegment();
404   if (!SingleProducer) {
405     // Note that the following loop will not spin indefinitely. The
406     // algorithm guarantees that the producer reponsible for setting
407     // the next pointer is already running.
408     while (next == nullptr) {
409       asm_volatile_pause();
410       next = s->nextSegment();
411     }
412   }
413   DCHECK(next != nullptr);
414   DEBUG_PRINT(s << " " << next);
415   setTail(next);
416 }
417
418 /* advanceHead */
419
420 template <
421     typename T,
422     bool SingleProducer,
423     bool SingleConsumer,
424     bool MayBlock,
425     size_t LgSegmentSize,
426     template <typename> class Atom>
427 inline void UnboundedQueue<
428     T,
429     SingleProducer,
430     SingleConsumer,
431     MayBlock,
432     LgSegmentSize,
433     Atom>::advanceHead(Segment* s) noexcept {
434   // Note that the following loops will not spin indefinitely. The
435   // algorithm guarantees that the producers reponsible for advancing
436   // the tail pointer and setting the next pointer are already
437   // running.
438   while (tail() == s) {
439     asm_volatile_pause();
440   }
441   auto next = s->nextSegment();
442   while (next == nullptr) {
443     next = s->nextSegment();
444   }
445   DEBUG_PRINT(s << " " << next);
446   setHead(next);
447   if (SPSC) {
448     delete s;
449   } else {
450     s->retire(); // hazptr
451   }
452 }
453
454 /**
455  *  Entry
456  */
457
458 template <
459     typename T,
460     bool SingleProducer,
461     bool SingleConsumer,
462     bool MayBlock,
463     size_t LgSegmentSize,
464     template <typename> class Atom>
465 class UnboundedQueue<
466     T,
467     SingleProducer,
468     SingleConsumer,
469     MayBlock,
470     LgSegmentSize,
471     Atom>::Entry {
472   folly::SaturatingSemaphore<MayBlock, Atom> flag_;
473   typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
474
475  public:
476   template <typename Arg>
477   FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
478     new (&item_) T(std::forward<Arg>(arg));
479     flag_.post();
480   }
481
482   FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
483     flag_.wait();
484     getItem(item);
485   }
486
487   template <typename Clock, typename Duration>
488   FOLLY_ALWAYS_INLINE bool tryWaitUntil(
489       const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
490     return flag_.try_wait_until(deadline);
491   }
492
493  private:
494   FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
495     item = std::move(*(folly::launder(itemPtr())));
496     destroyItem();
497   }
498
499   FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
500     return static_cast<T*>(static_cast<void*>(&item_));
501   }
502
503   FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
504     itemPtr()->~T();
505   }
506 }; // UnboundedQueue::Entry
507
508 /**
509  *  Segment
510  */
511
512 template <
513     typename T,
514     bool SingleProducer,
515     bool SingleConsumer,
516     bool MayBlock,
517     size_t LgSegmentSize,
518     template <typename> class Atom>
519 class UnboundedQueue<
520     T,
521     SingleProducer,
522     SingleConsumer,
523     MayBlock,
524     LgSegmentSize,
525     Atom>::Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
526   Atom<Segment*> next_;
527   const Ticket min_;
528   bool marked_; // used for iterative deletion
529   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
530   Entry b_[SegmentSize];
531
532  public:
533   explicit Segment(const Ticket t) : next_(nullptr), min_(t), marked_(false) {}
534
535   ~Segment() {
536     if (!SPSC && !marked_) {
537       auto next = nextSegment();
538       while (next) {
539         if (!next->release_ref()) { // hazptr
540           return;
541         }
542         auto s = next;
543         next = s->nextSegment();
544         s->marked_ = true;
545         delete s;
546       }
547     }
548   }
549
550   Segment* nextSegment() const noexcept {
551     return next_.load(std::memory_order_acquire);
552   }
553
554   void setNextSegment(Segment* s) noexcept {
555     next_.store(s, std::memory_order_release);
556   }
557
558   FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
559     DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
560     return min_;
561   }
562
563   FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
564     return b_[index];
565   }
566 }; // UnboundedQueue::Segment
567
568 } // namespace folly