From 17c08956e0c46684e3a3a9d3b5e9bc733a09c0ea Mon Sep 17 00:00:00 2001 From: Maged Michael Date: Thu, 14 Dec 2017 10:33:06 -0800 Subject: [PATCH] Dynamic Bounded Queue Summary: DynamicBoundedQueue supports: - Dynamic memory usage that grows and shrink in proportion to the number of elements in the queue. - Adjustable capacity that helps throttle pathological cases of producer-consumer imbalance that may lead to excessive memory usage. - The adjustable capacity can also help prevent deadlock by allowing users to temporarily increase capacity substantially to guarantee accommodating producer requests that cannot wait. - SPSC, SPMC, MPSC, MPMC variants. - Blocking and spinning-only variants. - Inter-operable non-waiting, timed until, timed for, and waiting variants of producer and consumer operations. - Optional variable element weights. Reviewed By: djwatson Differential Revision: D6462123 fbshipit-source-id: 5c7a7d88c3fa273211702af9ff78f6302afd2276 --- folly/concurrency/DynamicBoundedQueue.h | 739 ++++++++++++ .../test/DynamicBoundedQueueTest.cpp | 1017 +++++++++++++++++ 2 files changed, 1756 insertions(+) create mode 100644 folly/concurrency/DynamicBoundedQueue.h create mode 100644 folly/concurrency/test/DynamicBoundedQueueTest.cpp diff --git a/folly/concurrency/DynamicBoundedQueue.h b/folly/concurrency/DynamicBoundedQueue.h new file mode 100644 index 00000000..8e7e839f --- /dev/null +++ b/folly/concurrency/DynamicBoundedQueue.h @@ -0,0 +1,739 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include + +#include +#include + +namespace folly { + +/// DynamicBoundedQueue supports: + +/// - Dynamic memory usage that grows and shrink in proportion to the +/// number of elements in the queue. +/// - Adjustable capacity that helps throttle pathological cases of +/// producer-consumer imbalance that may lead to excessive memory +/// usage. +/// - The adjustable capacity can also help prevent deadlock by +/// allowing users to temporarily increase capacity substantially to +/// guarantee accommodating producer requests that cannot wait. +/// - SPSC, SPMC, MPSC, MPMC variants. +/// - Blocking and spinning-only variants. +/// - Inter-operable non-waiting, timed until, timed for, and waiting +/// variants of producer and consumer operations. +/// - Optional variable element weights. +/// +/// Element Weights +/// - Queue elements may have variable weights (calculated using a +/// template parameter) that are by default 1. +/// - Element weights count towards the queue's capacity. +/// - Elements weights are not priorities and do not affect element +/// order. Queues with variable element weights follow FIFO order, +/// the same as default queues. +/// +/// When to use DynamicBoundedQueue: +/// - If a small maximum capacity may lead to deadlock or performance +/// degradation under bursty patterns and a larger capacity is +/// sufficient. +/// - If the typical queue size is expected to be much lower than the +/// maximum capacity +/// - If an unbounded queue is susceptible to growing too much. +/// - If support for variable element weights is needed. +/// +/// When not to use DynamicBoundedQueue? +/// - If dynamic memory allocation is unacceptable or if the maximum +/// capacity needs to be small, then use fixed-size MPMCQueue or (if +/// non-blocking SPSC) ProducerConsumerQueue. +/// - If there is no risk of the queue growing too much, then use +/// UnboundedQueue. +/// +/// Setting capacity +/// - The general rule is to set the capacity as high as acceptable. +/// The queue performs best when it is not near full capacity. +/// - The implementation may allow extra slack in capacity (~10%) for +/// amortizing some costly steps. Therefore, precise capacity is not +/// guaranteed and cannot be relied on for synchronization; i.e., +/// this queue cannot be used as a semaphore. +/// +/// Performance expectations: +/// - As long as the queue size is below capacity in the common case, +/// performance is comparable to MPMCQueue and better in cases of +/// higher producer demand. +/// - Performance degrades gracefully at full capacity. +/// - It is recommended to measure performance with different variants +/// when applicable, e.g., DMPMC vs DMPSC. Depending on the use +/// case, sometimes the variant with the higher sequential overhead +/// may yield better results due to, for example, more favorable +/// producer-consumer balance or favorable timing for avoiding +/// costly blocking. +/// - See DynamicBoundedQueueTest.cpp for some benchmark results. +/// +/// Template prameters: +/// - T: element type +/// - SingleProducer: true if there can be only one producer at a +/// time. +/// - SingleConsumer: true if there can be only one consumer at a +/// time. +/// - MayBlock: true if producers or consumers may block. +/// - LgSegmentSize (default 8): Log base 2 of number of elements per +/// UnboundedQueue segment. +/// - LgAlign (default 7): Log base 2 of alignment directive; can be +/// used to balance scalability (avoidance of false sharing) with +/// memory efficiency. +/// - WeightFn (DefaultWeightFn): A customizable weight computing type +/// for computing the weights of elements. The default weight is 1. +/// +/// Template Aliases: +/// DSPSCQueue +/// DMPSCQueue +/// DSPMCQueue +/// DMPMCQueue +/// +/// Functions: +/// Constructor +/// Takes a capacity value as an argument. +/// +/// Producer functions: +/// void enqueue(const T&); +/// void enqueue(T&&); +/// Adds an element to the end of the queue. Waits until +/// capacity is available if necessary. +/// bool try_enqueue(const T&); +/// bool try_enqueue(T&&); +/// Tries to add an element to the end of the queue if +/// capacity allows it. Returns true if successful. Otherwise +/// Returns false. +/// bool try_enqueue_until(const T&, time_point&); +/// bool try_enqueue_until(T&&, time_point&); +/// Tries to add an element to the end of the queue if +/// capacity allows it until the specified timepoint. Returns +/// true if successful, otherwise false. +/// bool try_enqueue_for(const T&, duration&); +/// bool try_enqueue_for(T&&, duration&); +/// Tries to add an element to the end of the queue if +/// capacity allows it until the specified timepoint. Returns +/// true if successful, otherwise false. +/// +/// Consumer functions: +/// void dequeue(T&); +/// Extracts an element from the front of the queue. Waits +/// until an element is available if necessary. +/// bool try_dequeue(T&); +/// Tries to extracts an element from the front of the queue +/// if available. Returns true if successful, otherwise false. +/// bool try_dequeue_until(T&, time_point&); +/// Tries to extracts an element from the front of the queue +/// if available until the specified time_point. Returns true +/// if successful. Otherwise Returns false. +/// bool try_dequeue_for(T&, duration&); +/// Tries to extracts an element from the front of the queue +/// if available until the expiration of the specified +/// duration. Returns true if successful. Otherwise Returns +/// false. +/// +/// Secondary functions: +/// void reset_capacity(size_t capacity); +/// Changes the capacity of the queue. Does not affect the +/// current contents of the queue. Guaranteed only to affect +/// subsequent enqueue operations. May or may not affect +/// concurrent operations. Capacity must be at least 1000. +/// Weight weight(); +/// Returns an estimate of the total weight of the elements in +/// the queue. +/// size_t size(); +/// Returns an estimate of the total number of elements. +/// bool empty(); +/// Returns true only if the queue was empty during the call. +/// Note: weight(), size(), and empty() are guaranteed to be +/// accurate only if there are no concurrent changes to the queue. +/// +/// Usage example with default weight: +/// @code +/// /* DMPSC, doesn't block, 1024 int elements per segment */ +/// DMPSCQueue q(100000); +/// ASSERT_TRUE(q.empty()); +/// ASSERT_EQ(q.size(), 0); +/// q.enqueue(1)); +/// ASSERT_TRUE(q.try_enqueue(2)); +/// ASSERT_TRUE(q.try_enqueue_until(3, deadline)); +/// ASSERT_TRUE(q.try_enqueue(4, duration)); +/// // ... enqueue more elements until capacity is full +/// // See above comments about imprecise capacity guarantees +/// ASSERT_FALSE(q.try_enqueue(100001)); // can't enqueue but can't wait +/// size_t sz = q.size(); +/// ASSERT_GE(sz, 100000); +/// q.reset_capacity(1000000000); // set huge capacity +/// ASSERT_TRUE(q.try_enqueue(100001)); // now enqueue succeeds +/// q.reset_capacity(100000); // set capacity back to 100,000 +/// ASSERT_FALSE(q.try_enqueue(100002)); +/// ASSERT_EQ(q.size(), sz + 1); +/// int v; +/// q.dequeue(v); +/// ASSERT_EQ(v, 1); +/// ASSERT_TRUE(q.try_dequeue(v)); +/// ASSERT_EQ(v, 2); +/// ASSERT_TRUE(q.try_dequeue_until(v, deadline)); +/// ASSERT_EQ(v, 3); +/// ASSERT_TRUE(q.try_dequeue_for(v, duration)); +/// ASSERT_EQ(v, 4); +/// ASSERT_EQ(q.size(), sz - 3); +/// @endcode +/// +/// Usage example with custom weights: +/// @code +/// struct CustomWeightFn { +/// uint64_t operator()(int val) { return val / 100; } +/// }; +/// DMPMCQueue q(20); +/// ASSERT_TRUE(q.empty()); +/// q.enqueue(100); +/// ASSERT_TRUE(q.try_enqueue(200)); +/// ASSERT_TRUE(q.try_enqueue_until(500, now() + seconds(1))); +/// ASSERT_EQ(q.size(), 3); +/// ASSERT_EQ(q.weight(), 8); +/// ASSERT_FALSE(q.try_enqueue_for(1700, microseconds(1))); +/// q.reset_capacity(1000000); // set capacity to 1000000 instead of 20 +/// ASSERT_TRUE(q.try_enqueue_for(1700, microseconds(1))); +/// q.reset_capacity(20); // set capacity to 20 again +/// ASSERT_FALSE(q.try_enqueue(100)); +/// ASSERT_EQ(q.size(), 4); +/// ASSERT_EQ(q.weight(), 25); +/// int v; +/// q.dequeue(v); +/// ASSERT_EQ(v, 100); +/// ASSERT_TRUE(q.try_dequeue(v)); +/// ASSERT_EQ(v, 200); +/// ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1))); +/// ASSERT_EQ(v, 500); +/// ASSERT_EQ(q.size(), 1); +/// ASSERT_EQ(q.weight(), 17); +/// @endcode +/// +/// Design: +/// - The implementation is on top of UnboundedQueue. +/// - The main FIFO functionality is in UnboundedQueue. +/// DynamicBoundedQueue manages keeping the total queue weight +/// within the specified capacity. +/// - For the sake of scalability, the data structures are designed to +/// minimize interference between producers on one side and +/// consumers on the other. +/// - Producers add to a debit variable the weight of the added +/// element and check capacity. +/// - Consumers add to a credit variable the weight of the removed +/// element. +/// - Producers, for the sake of scalability, use fetch_add to add to +/// the debit variable and subtract if it exceeded capacity, +/// rather than using compare_exchange to avoid overshooting. +/// - Consumers, infrequently, transfer credit to a transfer variable +/// and unblock any blocked producers. The transfer variable can be +/// used by producers to decrease their debit when needed. +/// - Note that a low capacity will trigger frequent credit transfer +/// by consumers that may degrade performance. Capacity should not +/// be set too low. +/// - Transfer of credit by consumers is triggered when the amount of +/// credit reaches a threshold (1/10 of capacity). +/// - The waiting of consumers is handled in UnboundedQueue. +/// The waiting of producers is handled in this template. +/// - For a producer operation, if the difference between debit and +/// capacity (plus some slack to account for the transfer threshold) +/// does not accommodate the weight of the new element, it first +/// tries to transfer credit that may have already been made +/// available by consumers. If this is insufficient and MayBlock is +/// true, then the producer uses a futex to block until new credit +/// is transferred by a consumer. +/// +/// Memory Usage: +/// - Aside from three cache lines for managing capacity, the memory +/// for queue elements is managed using UnboundedQueue and grows and +/// shrinks dynamically with the number of elements. +/// - The template parameter LgAlign can be used to reduce memory usage +/// at the cost of increased chance of false sharing. + +template +struct DefaultWeightFn { + template + uint64_t operator()(Arg&&) const noexcept { + return 1; + } +}; + +template < + typename T, + bool SingleProducer, + bool SingleConsumer, + bool MayBlock, + size_t LgSegmentSize = 8, + size_t LgAlign = 7, + typename WeightFn = DefaultWeightFn, + template class Atom = std::atomic> +class DynamicBoundedQueue { + using Weight = uint64_t; + + enum WaitingState : uint32_t { + NOTWAITING = 0, + WAITING = 1, + }; + + static constexpr bool SPSC = SingleProducer && SingleConsumer; + static constexpr size_t Align = 1u << LgAlign; + + static_assert(LgAlign < 16, "LgAlign must be < 16"); + + /// Data members + + // Read mostly by producers + alignas(Align) Atom debit_; // written frequently only by producers + Atom capacity_; // written rarely by capacity resets + + // Read mostly by consumers + alignas(Align) Atom credit_; // written frequently only by consumers + Atom threshold_; // written rarely only by capacity resets + + // Normally written and read rarely by producers and consumers + // May be read frequently by producers when capacity is full + alignas(Align) Atom transfer_; + detail::Futex waiting_; + + // Underlying unbounded queue + UnboundedQueue< + T, + SingleProducer, + SingleConsumer, + MayBlock, + LgSegmentSize, + LgAlign, + Atom> + q_; + + public: + /** constructor */ + explicit DynamicBoundedQueue(Weight capacity) + : debit_(0), + capacity_(capacity + threshold(capacity)), // capacity slack + credit_(0), + threshold_(threshold(capacity)), + transfer_(0) {} + + /** destructor */ + ~DynamicBoundedQueue() {} + + /// Enqueue functions + + /** enqueue */ + FOLLY_ALWAYS_INLINE void enqueue(const T& v) { + enqueueImpl(v); + } + + FOLLY_ALWAYS_INLINE void enqueue(T&& v) { + enqueueImpl(std::move(v)); + } + + /** try_enqueue */ + FOLLY_ALWAYS_INLINE bool try_enqueue(const T& v) { + return tryEnqueueImpl(v); + } + + FOLLY_ALWAYS_INLINE bool try_enqueue(T&& v) { + return tryEnqueueImpl(std::move(v)); + } + + /** try_enqueue_until */ + template + FOLLY_ALWAYS_INLINE bool try_enqueue_until( + const T& v, + const std::chrono::time_point& deadline) { + return tryEnqueueUntilImpl(v, deadline); + } + + template + FOLLY_ALWAYS_INLINE bool try_enqueue_until( + T&& v, + const std::chrono::time_point& deadline) { + return tryEnqueueUntilImpl(std::move(v), deadline); + } + + /** try_enqueue_for */ + template + FOLLY_ALWAYS_INLINE bool try_enqueue_for( + const T& v, + const std::chrono::duration& duration) { + return tryEnqueueForImpl(v, duration); + } + + template + FOLLY_ALWAYS_INLINE bool try_enqueue_for( + T&& v, + const std::chrono::duration& duration) { + return tryEnqueueForImpl(std::move(v), duration); + } + + /// Dequeue functions + + /** dequeue */ + FOLLY_ALWAYS_INLINE void dequeue(T& elem) { + q_.dequeue(elem); + addCredit(WeightFn()(elem)); + } + + /** try_dequeue */ + FOLLY_ALWAYS_INLINE bool try_dequeue(T& elem) { + if (q_.try_dequeue(elem)) { + addCredit(WeightFn()(elem)); + return true; + } + return false; + } + + /** try_dequeue_until */ + template + FOLLY_ALWAYS_INLINE bool try_dequeue_until( + T& elem, + const std::chrono::time_point& deadline) { + if (q_.try_dequeue_until(elem, deadline)) { + addCredit(WeightFn()(elem)); + return true; + } + return false; + } + + /** try_dequeue_for */ + template + FOLLY_ALWAYS_INLINE bool try_dequeue_for( + T& elem, + const std::chrono::duration& duration) { + if (q_.try_dequeue_for(elem, duration)) { + addCredit(WeightFn()(elem)); + return true; + } + return false; + } + + /// Secondary functions + + /** reset_capacity */ + void reset_capacity(Weight capacity) noexcept { + Weight thresh = threshold(capacity); + capacity_.store(capacity + thresh, std::memory_order_release); + threshold_.store(thresh, std::memory_order_release); + } + + /** weight */ + Weight weight() const noexcept { + auto d = getDebit(); + auto c = getCredit(); + auto t = getTransfer(); + return d > (c + t) ? d - (c + t) : 0; + } + + /** size */ + size_t size() const noexcept { + return q_.size(); + } + + /** empty */ + bool empty() const noexcept { + return q_.empty(); + } + + private: + /// Private functions /// + + // Calculation of threshold to move credits in bulk from consumers + // to producers + constexpr Weight threshold(Weight capacity) const noexcept { + return capacity / 10; + } + + // Functions called frequently by producers + + template + FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& v) { + tryEnqueueUntilImpl( + std::forward(v), std::chrono::steady_clock::time_point::max()); + } + + template + FOLLY_ALWAYS_INLINE bool tryEnqueueImpl(Arg&& v) { + return tryEnqueueUntilImpl( + std::forward(v), std::chrono::steady_clock::time_point::min()); + } + + template + FOLLY_ALWAYS_INLINE bool tryEnqueueUntilImpl( + Arg&& v, + const std::chrono::time_point& deadline) { + Weight weight = WeightFn()(std::forward(v)); + if (LIKELY(tryAddDebit(weight))) { + q_.enqueue(std::forward(v)); + return true; + } + return tryEnqueueUntilSlow(std::forward(v), deadline); + } + + template + FOLLY_ALWAYS_INLINE bool tryEnqueueForImpl( + Arg&& v, + const std::chrono::duration& duration) { + if (LIKELY(tryEnqueueImpl(std::forward(v)))) { + return true; + } + auto deadline = std::chrono::steady_clock::now() + duration; + return tryEnqueueUntilSlow(std::forward(v), deadline); + } + + FOLLY_ALWAYS_INLINE bool tryAddDebit(Weight weight) noexcept { + Weight capacity = getCapacity(); + Weight before = fetchAddDebit(weight); + if (LIKELY(before + weight <= capacity)) { + return true; + } else { + subDebit(weight); + return false; + } + } + + FOLLY_ALWAYS_INLINE Weight getCapacity() const noexcept { + return capacity_.load(std::memory_order_acquire); + } + + FOLLY_ALWAYS_INLINE Weight fetchAddDebit(Weight weight) noexcept { + Weight before; + if (SingleProducer) { + before = getDebit(); + debit_.store(before + weight, std::memory_order_relaxed); + } else { + before = debit_.fetch_add(weight, std::memory_order_acq_rel); + } + return before; + } + + FOLLY_ALWAYS_INLINE Weight getDebit() const noexcept { + return debit_.load(std::memory_order_acquire); + } + + // Functions called frequently by consumers + + FOLLY_ALWAYS_INLINE void addCredit(Weight weight) noexcept { + Weight before = fetchAddCredit(weight); + Weight thresh = getThreshold(); + if (before + weight >= thresh && before < thresh) { + transferCredit(); + } + } + + FOLLY_ALWAYS_INLINE Weight fetchAddCredit(Weight weight) noexcept { + Weight before; + if (SingleConsumer) { + before = getCredit(); + credit_.store(before + weight, std::memory_order_relaxed); + } else { + before = credit_.fetch_add(weight, std::memory_order_acq_rel); + } + return before; + } + + FOLLY_ALWAYS_INLINE Weight getCredit() const noexcept { + return credit_.load(std::memory_order_acquire); + } + + FOLLY_ALWAYS_INLINE Weight getThreshold() const noexcept { + return threshold_.load(std::memory_order_acquire); + } + + /** Functions called infrequently by producers */ + + void subDebit(Weight weight) noexcept { + Weight before; + if (SingleProducer) { + before = getDebit(); + debit_.store(before - weight, std::memory_order_relaxed); + } else { + before = debit_.fetch_sub(weight, std::memory_order_acq_rel); + } + DCHECK_GE(before, weight); + } + + template + bool tryEnqueueUntilSlow( + Arg&& v, + const std::chrono::time_point& deadline) { + Weight weight = WeightFn()(std::forward(v)); + if (canEnqueue(deadline, weight)) { + q_.enqueue(std::forward(v)); + return true; + } else { + return false; + } + } + + template + bool canEnqueue( + const std::chrono::time_point& deadline, + Weight weight) noexcept { + Weight capacity = getCapacity(); + while (true) { + tryReduceDebit(); + Weight debit = getDebit(); + if ((debit + weight <= capacity) && tryAddDebit(weight)) { + return true; + } + if (Clock::now() >= deadline) { + return false; + } + if (MayBlock) { + if (canBlock(weight, capacity)) { + waiting_.futexWaitUntil(WAITING, deadline); + } + } else { + asm_volatile_pause(); + } + } + } + + bool canBlock(Weight weight, Weight capacity) noexcept { + waiting_.store(WAITING, std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_seq_cst); + tryReduceDebit(); + Weight debit = getDebit(); + return debit + weight > capacity; + } + + bool tryReduceDebit() noexcept { + Weight w = takeTransfer(); + if (w > 0) { + subDebit(w); + } + return w > 0; + } + + Weight takeTransfer() noexcept { + Weight w = getTransfer(); + if (w > 0) { + w = transfer_.exchange(0, std::memory_order_acq_rel); + } + return w; + } + + Weight getTransfer() const noexcept { + return transfer_.load(std::memory_order_acquire); + } + + /** Functions called infrequently by consumers */ + + void transferCredit() noexcept { + Weight credit = takeCredit(); + transfer_.fetch_add(credit, std::memory_order_acq_rel); + if (MayBlock) { + std::atomic_thread_fence(std::memory_order_seq_cst); + waiting_.store(NOTWAITING, std::memory_order_relaxed); + waiting_.futexWake(); + } + } + + Weight takeCredit() noexcept { + Weight credit; + if (SingleConsumer) { + credit = credit_.load(std::memory_order_relaxed); + credit_.store(0, std::memory_order_relaxed); + } else { + credit = credit_.exchange(0, std::memory_order_acq_rel); + } + return credit; + } + +}; // DynamicBoundedQueue + +/// Aliases + +/** DSPSCQueue */ +template < + typename T, + bool MayBlock, + size_t LgSegmentSize = 8, + size_t LgAlign = 7, + typename WeightFn = DefaultWeightFn, + template class Atom = std::atomic> +using DSPSCQueue = DynamicBoundedQueue< + T, + true, + true, + MayBlock, + LgSegmentSize, + LgAlign, + WeightFn, + Atom>; + +/** DMPSCQueue */ +template < + typename T, + bool MayBlock, + size_t LgSegmentSize = 8, + size_t LgAlign = 7, + typename WeightFn = DefaultWeightFn, + template class Atom = std::atomic> +using DMPSCQueue = DynamicBoundedQueue< + T, + false, + true, + MayBlock, + LgSegmentSize, + LgAlign, + WeightFn, + Atom>; + +/** DSPMCQueue */ +template < + typename T, + bool MayBlock, + size_t LgSegmentSize = 8, + size_t LgAlign = 7, + typename WeightFn = DefaultWeightFn, + template class Atom = std::atomic> +using DSPMCQueue = DynamicBoundedQueue< + T, + true, + false, + MayBlock, + LgSegmentSize, + LgAlign, + WeightFn, + Atom>; + +/** DMPMCQueue */ +template < + typename T, + bool MayBlock, + size_t LgSegmentSize = 8, + size_t LgAlign = 7, + typename WeightFn = DefaultWeightFn, + template class Atom = std::atomic> +using DMPMCQueue = DynamicBoundedQueue< + T, + false, + false, + MayBlock, + LgSegmentSize, + LgAlign, + WeightFn, + Atom>; + +} // namespace folly diff --git a/folly/concurrency/test/DynamicBoundedQueueTest.cpp b/folly/concurrency/test/DynamicBoundedQueueTest.cpp new file mode 100644 index 00000000..bce25ec0 --- /dev/null +++ b/folly/concurrency/test/DynamicBoundedQueueTest.cpp @@ -0,0 +1,1017 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include + +#include +#include + +DEFINE_bool(bench, false, "run benchmark"); +DEFINE_int32(reps, 10, "number of reps"); +DEFINE_int32(ops, 1000000, "number of operations per rep"); +DEFINE_int64(capacity, 1000000, "capacity"); + +template +using DSPSC = folly::DSPSCQueue; + +template +using DMPSC = folly::DMPSCQueue; + +template +using DSPMC = folly::DSPMCQueue; + +template +using DMPMC = folly::DMPMCQueue; + +template