2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/experimental/hazptr/hazptr.h>
20 #include <folly/lang/Launder.h>
21 #include <folly/synchronization/SaturatingSemaphore.h>
23 #include <glog/logging.h>
// Constructor: allocates the initial Segment with min ticket 0.
// NOTE(review): this listing is missing lines — presumably head_ and tail_
// are then both pointed at this segment; confirm against the full source.
35 template <typename> class Atom>
36 inline UnboundedQueue<
42 Atom>::UnboundedQueue() {
45 auto s = new Segment(0);
// Destructor: walks the segment list from head and retires each segment
// through the hazard-pointer domain, so actual reclamation is deferred
// until no thread still holds a protecting hazard pointer to it.
59 template <typename> class Atom>
60 inline UnboundedQueue<
66 Atom>::~UnboundedQueue() {
// Capture next before retiring s; retire() hands s to hazptr reclamation.
68 for (auto s = head(); s; s = next) {
69 next = s->nextSegment();
73 s->retire(); // hazptr
// Blocking dequeue into item.
// Single-consumer path (missing from this listing — presumably reads head()
// directly without hazard-pointer protection, since only this thread can
// retire the head segment); multi-consumer path protects head_ with a
// hazard pointer before dereferencing it.
86 template <typename> class Atom>
87 FOLLY_ALWAYS_INLINE void UnboundedQueue<
93 Atom>::dequeue(T& item) noexcept {
96 dequeueCommon(s, item);
98 // Using hazptr_holder instead of hazptr_local because it is
99 // possible to call ~T() and it may happen to use hazard pointers.
100 folly::hazptr::hazptr_holder hptr;
101 auto s = hptr.get_protected(head_);
102 dequeueCommon(s, item);
106 /* try_dequeue_until */
// Timed dequeue: returns true iff an item was extracted before deadline.
// Dispatches on the compile-time SingleConsumer flag; the multi-consumer
// path must protect head_ with a hazard pointer before using it.
113 size_t LgSegmentSize,
114 template <typename> class Atom>
115 template <typename Clock, typename Duration>
116 FOLLY_ALWAYS_INLINE bool UnboundedQueue<
125 const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
126 if (SingleConsumer) {
128 return singleConsumerTryDequeueUntil(s, item, deadline);
130 // Using hazptr_holder instead of hazptr_local because it is
131 // possible to call ~T() and it may happen to use hazard pointers.
132 folly::hazptr::hazptr_holder hptr;
133 auto s = hptr.get_protected(head_);
134 return multiConsumerTryDequeueUntil(s, item, deadline);
// Enqueue entry point: forwards arg into the queue.
// Single-producer path (missing from this listing — presumably reads tail()
// directly) needs no protection; the multi-producer path protects tail_
// with a hazard pointer before passing it to enqueueCommon.
145 size_t LgSegmentSize,
146 template <typename> class Atom>
147 template <typename Arg>
148 FOLLY_ALWAYS_INLINE void UnboundedQueue<
154 Atom>::enqueueImpl(Arg&& arg) {
157 enqueueCommon(s, std::forward<Arg>(arg));
159 // Using hazptr_holder instead of hazptr_local because it is
160 // possible that the T constructor happens to use hazard pointers.
162 folly::hazptr::hazptr_holder hptr;
163 auto s = hptr.get_protected(tail_);
164 enqueueCommon(s, std::forward<Arg>(arg));
// Core enqueue: claims a producer ticket, locates the segment that owns
// that ticket, constructs the item into the corresponding entry, and (for
// designated tickets) pre-allocates the next segment / advances the tail.
175 size_t LgSegmentSize,
176 template <typename> class Atom>
177 template <typename Arg>
178 FOLLY_ALWAYS_INLINE void UnboundedQueue<
184 Atom>::enqueueCommon(Segment* s, Arg&& arg) {
185 auto t = fetchIncrementProducerTicket();
// With multiple producers, the protected tail may already be behind the
// segment that owns ticket t; walk forward to the right segment.
186 if (!SingleProducer) {
187 s = findSegment(s, t);
// Invariant: ticket t falls inside segment s's [minTicket, minTicket+SegmentSize).
189 DCHECK_GE(t, s->minTicket());
190 DCHECK_LT(t, (s->minTicket() + SegmentSize));
191 size_t idx = index(t);
192 Entry& e = s->entry(idx);
// Constructs T in place and then (inside putItem) signals waiting consumers.
193 e.putItem(std::forward<Arg>(arg));
// Exactly one ticket per segment is responsible for allocating the next
// segment, and one for advancing the tail pointer, avoiding contention.
194 if (responsibleForAlloc(t)) {
195 allocNextSegment(s, t + SegmentSize);
197 if (responsibleForAdvance(t)) {
// Core blocking dequeue: claims a consumer ticket, locates the owning
// segment, extracts the item from its entry, and advances the head when
// this ticket is the one responsible for doing so.
209 size_t LgSegmentSize,
210 template <typename> class Atom>
211 FOLLY_ALWAYS_INLINE void UnboundedQueue<
217 Atom>::dequeueCommon(Segment* s, T& item) noexcept {
218 auto t = fetchIncrementConsumerTicket();
// With multiple consumers the protected head may lag the segment owning t.
219 if (!SingleConsumer) {
220 s = findSegment(s, t);
222 size_t idx = index(t);
223 Entry& e = s->entry(idx);
// One designated ticket per segment advances the head pointer.
225 if (responsibleForAdvance(t)) {
230 /* singleConsumerTryDequeueUntil */
// Timed dequeue for the single-consumer case. No CAS is needed on the
// consumer ticket: only this thread advances it, so a plain read followed
// by setConsumerTicket suffices.
237 size_t LgSegmentSize,
238 template <typename> class Atom>
239 template <typename Clock, typename Duration>
240 FOLLY_ALWAYS_INLINE bool UnboundedQueue<
247 singleConsumerTryDequeueUntil(
250 const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
251 auto t = consumerTicket();
// Invariant: ticket t falls inside segment s's range.
252 DCHECK_GE(t, s->minTicket());
253 DCHECK_LT(t, (s->minTicket() + SegmentSize));
254 size_t idx = index(t);
255 Entry& e = s->entry(idx);
// Wait (up to deadline) for the producer to publish the entry's item.
256 if (!e.tryWaitUntil(deadline)) {
// Ticket consumed; bump the consumer ticket (no contention possible).
259 setConsumerTicket(t + 1);
261 if (responsibleForAdvance(t)) {
267 /* multiConsumerTryDequeueUntil */
// Timed dequeue for the multi-consumer case. Reads the consumer ticket,
// catches up to the owning segment if needed (possibly giving up at the
// deadline while waiting for the next-segment pointer), waits for the
// entry to be filled, then claims the ticket with a CAS.
274 size_t LgSegmentSize,
275 template <typename> class Atom>
276 template <typename Clock, typename Duration>
277 FOLLY_ALWAYS_INLINE bool UnboundedQueue<
284 multiConsumerTryDequeueUntil(
287 const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
289 auto t = consumerTicket();
// Ticket beyond this segment: walk to the segment that owns it.
290 if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
292 // Note that the following loop will not spin indefinitely (as
293 // long as the number of concurrently waiting consumers is not
294 // greater than SegmentSize). The algorithm guarantees in such a
295 // case that the producer responsible for setting the next
296 // pointer is already running.
297 while ((next = s->nextSegment()) == nullptr) {
// Respect the caller's deadline even while spinning for the next segment.
298 if (Clock::now() > deadline) {
301 asm_volatile_pause();
304 DCHECK(s != nullptr);
307 size_t idx = index(t);
308 Entry& e = s->entry(idx);
// Wait (up to deadline) for the producer to publish the entry's item.
309 if (!e.tryWaitUntil(deadline)) {
// Claim ticket t; on failure another consumer took it — t is reloaded
// by the CAS (acquire on failure) and the caller-side loop retries.
312 if (!consumerTicket_.compare_exchange_weak(
313 t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
317 if (responsibleForAdvance(t)) {
// Walks forward from segment s to the segment whose ticket range contains
// t, spinning briefly when a next-segment pointer has not been set yet.
331 size_t LgSegmentSize,
332 template <typename> class Atom>
333 inline typename UnboundedQueue<
346 Atom>::findSegment(Segment* s, const Ticket t) const noexcept {
347 while (t >= (s->minTicket() + SegmentSize)) {
348 Segment* next = s->nextSegment();
349 // Note that the following loop will not spin indefinitely. The
350 // algorithm guarantees that the producer responsible for setting
351 // the next pointer is already running.
352 while (next == nullptr) {
353 asm_volatile_pause();
354 next = s->nextSegment();
356 DCHECK(next != nullptr);
362 /* allocNextSegment */
// Allocates the segment that will own tickets starting at t and links it
// after s. Called only by the producer whose ticket is responsibleForAlloc,
// so s->nextSegment() must still be null here.
369 size_t LgSegmentSize,
370 template <typename> class Atom>
371 inline void UnboundedQueue<
377 Atom>::allocNextSegment(Segment* s, const Ticket t) {
378 auto next = new Segment(t);
// Take an extra reference up front so hazptr reclamation of the list
// cannot free the segment while it is still reachable as tail.
380 next->acquire_ref_safe(); // hazptr
382 DEBUG_PRINT(s << " " << next);
383 DCHECK(s->nextSegment() == nullptr);
// Release-store publishes the fully-constructed segment to consumers.
384 s->setNextSegment(next);
// Advances the tail pointer from segment s to its successor. With multiple
// producers the successor may not be linked yet, so spin until the
// allocating producer publishes it.
394 size_t LgSegmentSize,
395 template <typename> class Atom>
396 inline void UnboundedQueue<
402 Atom>::advanceTail(Segment* s) noexcept {
403 Segment* next = s->nextSegment();
404 if (!SingleProducer) {
405 // Note that the following loop will not spin indefinitely. The
406 // algorithm guarantees that the producer responsible for setting
407 // the next pointer is already running.
408 while (next == nullptr) {
409 asm_volatile_pause();
410 next = s->nextSegment();
413 DCHECK(next != nullptr);
414 DEBUG_PRINT(s << " " << next);
// Advances the head pointer past segment s and retires s via hazptr.
// Must first wait for the tail to move off s (so no producer is still
// writing into it) and for s's successor to be linked.
425 size_t LgSegmentSize,
426 template <typename> class Atom>
427 inline void UnboundedQueue<
433 Atom>::advanceHead(Segment* s) noexcept {
434 // Note that the following loops will not spin indefinitely. The
435 // algorithm guarantees that the producers responsible for advancing
436 // the tail pointer and setting the next pointer are already
438 while (tail() == s) {
439 asm_volatile_pause();
441 auto next = s->nextSegment();
442 while (next == nullptr) {
443 next = s->nextSegment();
445 DEBUG_PRINT(s << " " << next);
// Defer reclamation of s until no reader holds a hazard pointer to it.
450 s->retire(); // hazptr
// Entry: one slot of a Segment. Pairs raw (uninitialized) storage for a T
// with a SaturatingSemaphore flag_ that the producer posts after
// constructing the item and consumers wait on before reading it.
463 size_t LgSegmentSize,
464 template <typename> class Atom>
465 class UnboundedQueue<
// flag_: posted by putItem (presumably — the post call is missing from
// this listing); waited on via tryWaitUntil. MayBlock selects futex
// blocking vs pure spinning.
472 folly::SaturatingSemaphore<MayBlock, Atom> flag_;
// item_: raw aligned storage; T is placement-new'ed in and explicitly
// destroyed, so Entry itself never runs T's constructor/destructor.
473 typename std::aligned_storage<sizeof(T), alignof(T)>::type item_;
476 template <typename Arg>
477 FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
478 new (&item_) T(std::forward<Arg>(arg));
// takeItem: consumer-side extraction (wait, move out, destroy) — body
// partially missing from this listing.
482 FOLLY_ALWAYS_INLINE void takeItem(T& item) noexcept {
487 template <typename Clock, typename Duration>
488 FOLLY_ALWAYS_INLINE bool tryWaitUntil(
489 const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
490 return flag_.try_wait_until(deadline);
// getItem: moves the stored T out; launder is required to legally access
// an object placement-new'ed into aligned_storage.
494 FOLLY_ALWAYS_INLINE void getItem(T& item) noexcept {
495 item = std::move(*(folly::launder(itemPtr())));
499 FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
500 return static_cast<T*>(static_cast<void*>(&item_));
// destroyItem: explicit T::~T on the stored object (body missing here).
503 FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
506 }; // UnboundedQueue::Entry
// Segment: a fixed-size array of SegmentSize entries owning the ticket
// range [min_, min_ + SegmentSize). Segments form a singly linked list and
// are reclaimed through hazptr reference counting
// (hazptr_obj_base_refcounted).
517 size_t LgSegmentSize,
518 template <typename> class Atom>
519 class UnboundedQueue<
525 Atom>::Segment : public folly::hazptr::hazptr_obj_base_refcounted<Segment> {
526 Atom<Segment*> next_;
528 bool marked_; // used for iterative deletion
// Entry array on its own cache line(s) to avoid false sharing with next_/min_.
529 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
530 Entry b_[SegmentSize];
533 explicit Segment(const Ticket t) : next_(nullptr), min_(t), marked_(false) {}
// Destructor (header missing from this listing): for non-SPSC, unmarked
// segments, walks successors dropping refcounts to delete the chain
// iteratively instead of recursing.
536 if (!SPSC && !marked_) {
537 auto next = nextSegment();
539 if (!next->release_ref()) { // hazptr
543 next = s->nextSegment();
// Acquire-load pairs with setNextSegment's release-store: a non-null
// result is a fully constructed successor.
550 Segment* nextSegment() const noexcept {
551 return next_.load(std::memory_order_acquire);
554 void setNextSegment(Segment* s) noexcept {
555 next_.store(s, std::memory_order_release);
// min_ is always a multiple of SegmentSize (segments start on ticket
// boundaries).
558 FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
559 DCHECK_EQ((min_ & (SegmentSize - 1)), 0);
563 FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
566 }; // UnboundedQueue::Segment