Add thread-safe priority queue with arbitrary priorities using flat combining
authorMaged Michael <magedmichael@fb.com>
Thu, 4 May 2017 02:57:21 +0000 (19:57 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Thu, 4 May 2017 03:08:57 +0000 (20:08 -0700)
Summary: This template uses flat combining and takes any sequential priority queue implementation that supports the `std::priority_queue` interface (`empty()`, `size()`, `push()`, `top()`, `pop()`) and any lock that meets the standard //Lockable// requirements to implement a thread-safe priority queue that supports arbitrary priorities. The template supports both unbounded and bounded size, and blocking, non-blocking, and timed variants of push, pop, and peek operations.

Reviewed By: djwatson

Differential Revision: D4873602

fbshipit-source-id: 96e1548b4f7427ecd2ee2ead7a19993df4441b33

folly/experimental/FlatCombiningPriorityQueue.h [new file with mode: 0644]
folly/experimental/test/FlatCombiningPriorityQueueTest.cpp [new file with mode: 0644]

diff --git a/folly/experimental/FlatCombiningPriorityQueue.h b/folly/experimental/FlatCombiningPriorityQueue.h
new file mode 100644 (file)
index 0000000..c8d1c16
--- /dev/null
@@ -0,0 +1,337 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <mutex>
+#include <queue>
+
+#include <folly/detail/Futex.h>
+#include <folly/experimental/flat_combining/FlatCombining.h>
+#include <glog/logging.h>
+
+namespace folly {
+
+/// Thread-safe priority queue based on flat combining. If the
+/// constructor parameter maxSize is greater than 0 (default = 0),
+/// then the queue is bounded. This template provides blocking,
+/// non-blocking, and timed variants of each of push(), pop(), and
+/// peek() operations. The empty() and size() functions are inherently
+/// non-blocking.
+///
+/// PriorityQueue must support the interface of std::priority_queue,
+/// specifically empty(), size(), push(), top(), and pop().  Mutex
+/// must meet the standard Lockable requirements.
+///
+/// By default FlatCombining uses a dedicated combiner thread, which
+/// yields better latency and throughput under high contention but
+/// higher overheads under low contention. If the constructor
+/// parameter dedicated is false, then there will be no dedicated
+/// combiner thread and any requester may do combining of operations
+/// requested by other threads. For more details see the comments for
+/// FlatCombining.
+///
+/// Usage examples:
+/// @code
+///   FlatCombiningPriorityQueue<int> pq(1);
+///   CHECK(pq.empty());
+///   CHECK(pq.size() == 0);
+///   int v;
+///   CHECK(!tryPop(v));
+///   CHECK(!tryPop(v, now() + seconds(1)));
+///   CHECK(!tryPeek(v));
+///   CHECK(!tryPeek(v, now() + seconds(1)));
+///   pq.push(10);
+///   CHECK(!pq.empty());
+///   CHECK(pq.size() == 1);
+///   CHECK(!pq.tryPush(20));
+///   CHECK(!pq.tryPush(20), now() + seconds(1)));
+///   peek(v);
+///   CHECK_EQ(v, 10);
+///   CHECK(pq.size() == 1);
+///   pop(v);
+///   CHECK_EQ(v, 10);
+///   CHECK(pq.empty());
+/// @encode
+
+template <
+    typename T,
+    typename PriorityQueue = std::priority_queue<T>,
+    typename Mutex = std::mutex,
+    template <typename> class Atom = std::atomic>
+class FlatCombiningPriorityQueue
+    : public folly::FlatCombining<
+          FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>,
+          Mutex,
+          Atom> {
+  using FCPQ = FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>;
+  using FC = folly::FlatCombining<FCPQ, Mutex, Atom>;
+
+ public:
+  template <
+      typename... PQArgs,
+      typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
+  explicit FlatCombiningPriorityQueue(
+      // Concurrent priority queue parameter
+      const size_t maxSize = 0,
+      // Flat combining parameters
+      const bool dedicated = true,
+      const uint32_t numRecs = 0,
+      const uint32_t maxOps = 0,
+      // (Sequential) PriorityQueue Parameters
+      PQArgs... args)
+      : FC(dedicated, numRecs, maxOps),
+        maxSize_(maxSize),
+        pq_(std::forward<PQArgs>(args)...) {}
+
+  /// Returns true iff the priority queue is empty
+  bool empty() const {
+    bool res;
+    auto fn = [&] { res = pq_.empty(); };
+    const_cast<FCPQ*>(this)->requestFC(fn);
+    return res;
+  }
+
+  /// Returns the number of items in the priority queue
+  size_t size() const {
+    size_t res;
+    auto fn = [&] { res = pq_.size(); };
+    const_cast<FCPQ*>(this)->requestFC(fn);
+    return res;
+  }
+
+  /// Non-blocking push. Succeeds if there is space in the priority
+  /// queue to insert the new item. Tries once if no time point is
+  /// provided or until the provided time_point is reached. If
+  /// successful, inserts the provided item in the priority queue
+  /// according to its priority.
+  template <class Clock = std::chrono::steady_clock>
+  bool tryPush(
+      const T& val,
+      const std::chrono::time_point<Clock>& when =
+          std::chrono::time_point<Clock>::min());
+
+  /// Non-blocking pop. Succeeds if the priority queue is
+  /// nonempty. Tries once if no time point is provided or until the
+  /// provided time_point is reached.  If successful, copies the
+  /// highest priority item and removes it from the priority queue.
+  template <class Clock = std::chrono::steady_clock>
+  bool tryPop(
+      T& val,
+      const std::chrono::time_point<Clock>& when =
+          std::chrono::time_point<Clock>::min());
+
+  /// Non-blocking peek. Succeeds if the priority queue is
+  /// nonempty. Tries once if no time point is provided or until the
+  /// provided time_point is reached.  If successful, copies the
+  /// highest priority item without removing it.
+  template <class Clock = std::chrono::steady_clock>
+  bool tryPeek(
+      T& val,
+      const std::chrono::time_point<Clock>& when =
+          std::chrono::time_point<Clock>::min());
+
+  /// Blocking push. Inserts the provided item in the priority
+  /// queue. If it is full, this function blocks until there is space
+  /// for the new item.
+  void push(const T& val) {
+    tryPush(val, std::chrono::time_point<std::chrono::steady_clock>::max());
+  }
+
+  /// Blocking pop. Copies the highest priority item and removes
+  /// it. If the priority queue is empty, this function blocks until
+  /// it is nonempty.
+  void pop(T& val) {
+    tryPop(val, std::chrono::time_point<std::chrono::steady_clock>::max());
+  }
+
+  /// Blocking peek. Copies the highest priority item without
+  /// removing it. If the priority queue is empty, this function
+  /// blocks until it is nonempty.
+  void peek(T& val) {
+    tryPeek(val, std::chrono::time_point<std::chrono::steady_clock>::max());
+  }
+
+ private:
+  size_t maxSize_;
+  PriorityQueue pq_;
+  detail::Futex<Atom> empty_;
+  detail::Futex<Atom> full_;
+
+  bool isTrue(detail::Futex<Atom>& futex) {
+    return futex.load(std::memory_order_relaxed) != 0;
+  }
+
+  void setFutex(detail::Futex<Atom>& futex, uint32_t val) {
+    futex.store(val, std::memory_order_relaxed);
+  }
+
+  bool futexSignal(detail::Futex<Atom>& futex) {
+    if (isTrue(futex)) {
+      setFutex(futex, 0);
+      return true;
+    } else {
+      return false;
+    }
+  }
+};
+
+/// Implementation
+
+template <
+    typename T,
+    typename PriorityQueue,
+    typename Mutex,
+    template <typename> class Atom>
+template <class Clock>
+inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPush(
+    const T& val,
+    const std::chrono::time_point<Clock>& when) {
+  while (true) {
+    bool res;
+    bool wake;
+
+    auto fn = [&] {
+      if (maxSize_ > 0 && pq_.size() == maxSize_) {
+        setFutex(full_, 1);
+        res = false;
+        return;
+      }
+      DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
+      try {
+        pq_.push(val);
+        wake = futexSignal(empty_);
+        res = true;
+        return;
+      } catch (const std::bad_alloc&) {
+        setFutex(full_, 1);
+        res = false;
+        return;
+      }
+    };
+    this->requestFC(fn);
+
+    if (res) {
+      if (wake) {
+        empty_.futexWake();
+      }
+      return true;
+    }
+    if (when == std::chrono::time_point<Clock>::min()) {
+      return false;
+    }
+    while (isTrue(full_)) {
+      if (when == std::chrono::time_point<Clock>::max()) {
+        full_.futexWait(1);
+      } else {
+        if (Clock::now() > when) {
+          return false;
+        } else {
+          full_.futexWaitUntil(1, when);
+        }
+      }
+    } // inner while loop
+  } // outer while loop
+}
+
+template <
+    typename T,
+    typename PriorityQueue,
+    typename Mutex,
+    template <typename> class Atom>
+template <class Clock>
+inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPop(
+    T& val,
+    const std::chrono::time_point<Clock>& when) {
+  while (true) {
+    bool res;
+    bool wake;
+
+    auto fn = [&] {
+      res = !pq_.empty();
+      if (res) {
+        val = pq_.top();
+        pq_.pop();
+        wake = futexSignal(full_);
+      } else {
+        setFutex(empty_, 1);
+      }
+    };
+    this->requestFC(fn);
+
+    if (res) {
+      if (wake) {
+        full_.futexWake();
+      }
+      return true;
+    }
+    while (isTrue(empty_)) {
+      if (when == std::chrono::time_point<Clock>::max()) {
+        empty_.futexWait(1);
+      } else {
+        if (Clock::now() > when) {
+          return false;
+        } else {
+          empty_.futexWaitUntil(1, when);
+        }
+      }
+    } // inner while loop
+  } // outer while loop
+}
+
+template <
+    typename T,
+    typename PriorityQueue,
+    typename Mutex,
+    template <typename> class Atom>
+template <class Clock>
+inline bool FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::tryPeek(
+    T& val,
+    const std::chrono::time_point<Clock>& when) {
+  while (true) {
+    bool res;
+
+    auto fn = [&] {
+      res = !pq_.empty();
+      if (res) {
+        val = pq_.top();
+      } else {
+        setFutex(empty_, 1);
+      }
+    };
+    this->requestFC(fn);
+
+    if (res) {
+      return true;
+    }
+    while (isTrue(empty_)) {
+      if (when == std::chrono::time_point<Clock>::max()) {
+        empty_.futexWait(1);
+      } else {
+        if (Clock::now() > when) {
+          return false;
+        } else {
+          empty_.futexWaitUntil(1, when);
+        }
+      }
+    } // inner while loop
+  } // outer while loop
+}
+
+} // namespace folly {
diff --git a/folly/experimental/test/FlatCombiningPriorityQueueTest.cpp b/folly/experimental/test/FlatCombiningPriorityQueueTest.cpp
new file mode 100644 (file)
index 0000000..0085f88
--- /dev/null
@@ -0,0 +1,526 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/FlatCombiningPriorityQueue.h>
+#include <folly/Benchmark.h>
+#include <folly/portability/GTest.h>
+#include <glog/logging.h>
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+
+DEFINE_bool(bench, false, "run benchmark");
+DEFINE_int32(reps, 10, "number of reps");
+DEFINE_int32(ops, 100000, "number of operations per rep");
+DEFINE_int32(size, 64, "initial size of the priority queue");
+DEFINE_int32(work, 1000, "amount of unrelated work per operation");
+
+void doWork(int work) {
+  uint64_t a = 0;
+  for (int i = work; i > 0; --i) {
+    a += i;
+  }
+  folly::doNotOptimizeAway(a);
+}
+
+/// Baseline implementation represents a conventional single-lock
+/// implementation that supports cond var blocking.
+template <
+    typename T,
+    typename PriorityQueue = std::priority_queue<T>,
+    typename Mutex = std::mutex>
+class BaselinePQ {
+ public:
+  template <
+      typename... PQArgs,
+      typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
+  explicit BaselinePQ(size_t maxSize = 0, PQArgs... args)
+      : maxSize_(maxSize), pq_(std::forward<PQArgs>(args)...) {}
+
+  bool empty() const {
+    std::lock_guard<Mutex> g(m_);
+    return pq_.empty();
+  }
+
+  size_t size() const {
+    std::lock_guard<Mutex> g(m_);
+    return pq_.size();
+  }
+
+  bool tryPush(const T& val) {
+    std::lock_guard<Mutex> g(m_);
+    if (maxSize_ > 0 && pq_.size() == maxSize_) {
+      return false;
+    }
+    DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
+    try {
+      pq_.push(val);
+      notempty_.notify_one();
+      return true;
+    } catch (const std::bad_alloc&) {
+      return false;
+    }
+  }
+
+  bool tryPop(T& val) {
+    std::lock_guard<Mutex> g(m_);
+    if (!pq_.empty()) {
+      val = pq_.top();
+      pq_.pop();
+      notfull_.notify_one();
+      return true;
+    }
+    return false;
+  }
+
+  bool tryPeek(T& val) {
+    std::lock_guard<Mutex> g(m_);
+    if (!pq_.empty()) {
+      val = pq_.top();
+      return true;
+    }
+    return false;
+  }
+
+ private:
+  Mutex m_;
+  size_t maxSize_;
+  PriorityQueue pq_;
+  std::condition_variable notempty_;
+  std::condition_variable notfull_;
+};
+
+using FCPQ = folly::FlatCombiningPriorityQueue<int>;
+using Baseline = BaselinePQ<int>;
+
+#if FOLLY_SANITIZE_THREAD
+static std::vector<int> nthr = {1, 2, 3, 4, 6, 8, 12, 16};
+#else
+static std::vector<int> nthr = {1, 2, 3, 4, 6, 8, 12, 16, 24, 32, 48, 64};
+#endif
+static uint32_t nthreads;
+
+template <typename PriorityQueue, typename Func>
+static uint64_t run_once(PriorityQueue& pq, const Func& fn) {
+  int ops = FLAGS_ops;
+  int size = FLAGS_size;
+  std::atomic<bool> start{false};
+  std::atomic<uint32_t> started{0};
+
+  for (int i = 0; i < size; ++i) {
+    CHECK(pq.tryPush(i * (ops / size)));
+  }
+
+  std::vector<std::thread> threads(nthreads);
+  for (uint32_t tid = 0; tid < nthreads; ++tid) {
+    threads[tid] = std::thread([&, tid] {
+      started.fetch_add(1);
+      while (!start.load())
+        /* nothing */;
+      fn(tid);
+    });
+  }
+
+  while (started.load() < nthreads)
+    /* nothing */;
+  auto tbegin = std::chrono::steady_clock::now();
+
+  // begin time measurement
+  start.store(true);
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  // end time measurement
+  uint64_t duration = 0;
+  auto tend = std::chrono::steady_clock::now();
+  duration = std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
+                 .count();
+  return duration;
+}
+
+TEST(FCPriQueue, basic) {
+  FCPQ pq;
+  CHECK(pq.empty());
+  CHECK_EQ(pq.size(), 0);
+  int v;
+  CHECK(!pq.tryPop(v));
+
+  CHECK(pq.tryPush(1));
+  CHECK(pq.tryPush(2));
+  CHECK(!pq.empty());
+  CHECK_EQ(pq.size(), 2);
+
+  pq.peek(v);
+  CHECK_EQ(v, 2); // higher value has higher priority
+  CHECK(pq.tryPeek(v));
+  CHECK_EQ(v, 2);
+  CHECK(!pq.empty());
+  CHECK_EQ(pq.size(), 2);
+
+  CHECK(pq.tryPop(v));
+  CHECK_EQ(v, 2);
+  CHECK(!pq.empty());
+  CHECK_EQ(pq.size(), 1);
+
+  CHECK(pq.tryPop(v));
+  CHECK_EQ(v, 1);
+  CHECK(pq.empty());
+  CHECK_EQ(pq.size(), 0);
+}
+
+TEST(FCPriQueue, bounded) {
+  FCPQ pq(1);
+  CHECK(pq.tryPush(1));
+  CHECK(!pq.tryPush(1));
+  CHECK_EQ(pq.size(), 1);
+  CHECK(!pq.empty());
+  int v;
+  CHECK(pq.tryPop(v));
+  CHECK_EQ(v, 1);
+  CHECK_EQ(pq.size(), 0);
+  CHECK(pq.empty());
+}
+
+TEST(FCPriQueue, timeout) {
+  FCPQ pq(1);
+  int v;
+  CHECK(!pq.tryPeek(
+      v,
+      std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
+  CHECK(!pq.tryPop(
+      v,
+      std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
+  pq.push(10);
+  CHECK(!pq.tryPush(
+      20,
+      std::chrono::steady_clock::now() + std::chrono::microseconds(1000)));
+}
+
+TEST(FCPriQueue, push_pop) {
+  int ops = 1000;
+  int work = 0;
+  std::chrono::steady_clock::time_point when =
+      std::chrono::steady_clock::now() + std::chrono::hours(24);
+  for (auto n : nthr) {
+    nthreads = n;
+    FCPQ pq(10000);
+    auto fn = [&](uint32_t tid) {
+      for (int i = tid; i < ops; i += nthreads) {
+        CHECK(pq.tryPush(i));
+        CHECK(pq.tryPush(i, when));
+        pq.push(i);
+        doWork(work);
+        int v;
+        CHECK(pq.tryPop(v));
+        CHECK(pq.tryPop(v, when));
+        pq.pop(v);
+        doWork(work);
+      }
+    };
+    run_once(pq, fn);
+  }
+}
+
+enum Exp {
+  NoFC,
+  FCNonBlock,
+  FCBlock,
+  FCTimed
+};
+
+static uint64_t test(std::string name, Exp exp, uint64_t base) {
+  int ops = FLAGS_ops;
+  int work = FLAGS_work;
+
+  uint64_t min = UINTMAX_MAX;
+  uint64_t max = 0;
+  uint64_t sum = 0;
+
+  for (int r = 0; r < FLAGS_reps; ++r) {
+    uint64_t dur;
+    switch(exp) {
+      case NoFC: {
+        Baseline pq;
+        auto fn = [&](uint32_t tid) {
+          for (int i = tid; i < ops; i += nthreads) {
+            CHECK(pq.tryPush(i));
+            doWork(work);
+            int v;
+            CHECK(pq.tryPop(v));
+            doWork(work);
+          }
+        };
+        dur = run_once(pq, fn);
+      } break;
+      case FCNonBlock: {
+        FCPQ pq;
+        auto fn = [&](uint32_t tid) {
+          for (int i = tid; i < ops; i += nthreads) {
+            CHECK(pq.tryPush(i));
+            doWork(work);
+            int v;
+            CHECK(pq.tryPop(v));
+            doWork(work);
+          }
+        };
+        dur = run_once(pq, fn);
+      } break;
+      case FCBlock: {
+        FCPQ pq;
+        auto fn = [&](uint32_t tid) {
+          for (int i = tid; i < ops; i += nthreads) {
+            pq.push(i);
+            doWork(work);
+            int v;
+            pq.pop(v);
+            doWork(work);
+          }
+        };
+        dur = run_once(pq, fn);
+      } break;
+      case FCTimed: {
+        FCPQ pq;
+        auto fn = [&](uint32_t tid) {
+          std::chrono::steady_clock::time_point when =
+              std::chrono::steady_clock::now() + std::chrono::hours(24);
+          for (int i = tid; i < ops; i += nthreads) {
+            CHECK(pq.tryPush(i, when));
+            doWork(work);
+            int v;
+            CHECK(pq.tryPop(v, when));
+            doWork(work);
+          }
+        };
+        dur = run_once(pq, fn);
+      } break;
+      default:
+        CHECK(false);
+    }
+
+    sum += dur;
+    min = std::min(min, dur);
+    max = std::max(max, dur);
+  }
+
+  uint64_t avg = sum / FLAGS_reps;
+  uint64_t res = min;
+  std::cout << name;
+  std::cout << "   " << std::setw(4) << max / FLAGS_ops << " ns";
+  std::cout << "   " << std::setw(4) << avg / FLAGS_ops << " ns";
+  std::cout << "   " << std::setw(4) << res / FLAGS_ops << " ns";
+  if (base) {
+    std::cout << " " << std::setw(3) << 100 * base / res << "%";
+  }
+  std::cout << std::endl;
+  return res;
+}
+
+TEST(FCPriQueue, bench) {
+  if (!FLAGS_bench) {
+    return;
+  }
+
+  std::cout << "Test_name, Max time, Avg time, Min time, % base min / min"
+            << std::endl;
+  for (int i : nthr) {
+    nthreads = i;
+    std::cout << "\n------------------------------------ Number of threads = "
+              << i
+              << std::endl;
+    uint64_t base =
+    test("baseline                    ", NoFC, 0);
+    test("baseline - dup              ", NoFC, base);
+    std::cout << "---- fc -------------------------------" << std::endl;
+    test("fc non-blocking             ", FCNonBlock, base);
+    test("fc non-blocking - dup       ", FCNonBlock, base);
+    test("fc timed                    ", FCTimed, base);
+    test("fc timed - dup              ", FCTimed, base);
+    test("fc blocking                 ", FCBlock, base);
+    test("fc blocking - dup           ", FCBlock, base);
+  }
+}
+
+/*
+$ numactl -N 1 folly/experimental/test/fc_pri_queue_test --bench
+
+[ RUN      ] FCPriQueue.bench
+Test_name, Max time, Avg time, Min time, % base min / min
+
+------------------------------------ Number of threads = 1
+baseline                        815 ns    793 ns    789 ns
+baseline - dup                  886 ns    827 ns    789 ns  99%
+---- fc -------------------------------
+fc non-blocking                 881 ns    819 ns    789 ns  99%
+fc non-blocking - dup           833 ns    801 ns    786 ns 100%
+fc timed                        863 ns    801 ns    781 ns 100%
+fc timed - dup                  830 ns    793 ns    782 ns 100%
+fc blocking                    1043 ns    820 ns    789 ns  99%
+fc blocking - dup               801 ns    793 ns    789 ns 100%
+
+------------------------------------ Number of threads = 2
+baseline                        579 ns    557 ns    540 ns
+baseline - dup                  905 ns    621 ns    538 ns 100%
+---- fc -------------------------------
+fc non-blocking                 824 ns    642 ns    568 ns  95%
+fc non-blocking - dup           737 ns    645 ns    591 ns  91%
+fc timed                        654 ns    590 ns    542 ns  99%
+fc timed - dup                  666 ns    586 ns    534 ns 101%
+fc blocking                     622 ns    599 ns    575 ns  93%
+fc blocking - dup               677 ns    618 ns    570 ns  94%
+
+------------------------------------ Number of threads = 3
+baseline                        740 ns    717 ns    699 ns
+baseline - dup                  742 ns    716 ns    697 ns 100%
+---- fc -------------------------------
+fc non-blocking                 730 ns    689 ns    645 ns 108%
+fc non-blocking - dup           719 ns    695 ns    639 ns 109%
+fc timed                        695 ns    650 ns    597 ns 117%
+fc timed - dup                  694 ns    654 ns    624 ns 112%
+fc blocking                     711 ns    687 ns    669 ns 104%
+fc blocking - dup               716 ns    695 ns    624 ns 112%
+
+------------------------------------ Number of threads = 4
+baseline                        777 ns    766 ns    750 ns
+baseline - dup                  778 ns    752 ns    731 ns 102%
+---- fc -------------------------------
+fc non-blocking                 653 ns    615 ns    589 ns 127%
+fc non-blocking - dup           611 ns    593 ns    563 ns 133%
+fc timed                        597 ns    577 ns    569 ns 131%
+fc timed - dup                  618 ns    575 ns    546 ns 137%
+fc blocking                     603 ns    590 ns    552 ns 135%
+fc blocking - dup               614 ns    590 ns    556 ns 134%
+
+------------------------------------ Number of threads = 6
+baseline                        925 ns    900 ns    869 ns
+baseline - dup                  930 ns    895 ns    866 ns 100%
+---- fc -------------------------------
+fc non-blocking                 568 ns    530 ns    481 ns 180%
+fc non-blocking - dup           557 ns    521 ns    488 ns 177%
+fc timed                        516 ns    496 ns    463 ns 187%
+fc timed - dup                  517 ns    500 ns    474 ns 183%
+fc blocking                     559 ns    513 ns    450 ns 193%
+fc blocking - dup               564 ns    528 ns    466 ns 186%
+
+------------------------------------ Number of threads = 8
+baseline                        999 ns    981 ns    962 ns
+baseline - dup                  998 ns    984 ns    965 ns  99%
+---- fc -------------------------------
+fc non-blocking                 491 ns    386 ns    317 ns 303%
+fc non-blocking - dup           433 ns    344 ns    298 ns 322%
+fc timed                        445 ns    348 ns    294 ns 327%
+fc timed - dup                  446 ns    357 ns    292 ns 328%
+fc blocking                     505 ns    389 ns    318 ns 302%
+fc blocking - dup               416 ns    333 ns    293 ns 328%
+
+------------------------------------ Number of threads = 12
+baseline                       1092 ns   1080 ns   1072 ns
+baseline - dup                 1085 ns   1074 ns   1065 ns 100%
+---- fc -------------------------------
+fc non-blocking                 360 ns    283 ns    258 ns 415%
+fc non-blocking - dup           340 ns    278 ns    250 ns 427%
+fc timed                        271 ns    260 ns    249 ns 429%
+fc timed - dup                  397 ns    283 ns    253 ns 423%
+fc blocking                     331 ns    279 ns    258 ns 415%
+fc blocking - dup               358 ns    280 ns    259 ns 412%
+
+------------------------------------ Number of threads = 16
+baseline                       1120 ns   1115 ns   1103 ns
+baseline - dup                 1122 ns   1118 ns   1114 ns  99%
+---- fc -------------------------------
+fc non-blocking                 339 ns    297 ns    246 ns 448%
+fc non-blocking - dup           353 ns    301 ns    264 ns 417%
+fc timed                        326 ns    287 ns    247 ns 445%
+fc timed - dup                  338 ns    294 ns    259 ns 425%
+fc blocking                     329 ns    288 ns    247 ns 445%
+fc blocking - dup               375 ns    308 ns    265 ns 415%
+
+------------------------------------ Number of threads = 24
+baseline                       1073 ns   1068 ns   1064 ns
+baseline - dup                 1075 ns   1071 ns   1069 ns  99%
+---- fc -------------------------------
+fc non-blocking                 439 ns    342 ns    278 ns 382%
+fc non-blocking - dup           389 ns    318 ns    291 ns 364%
+fc timed                        368 ns    324 ns    266 ns 398%
+fc timed - dup                  412 ns    328 ns    302 ns 352%
+fc blocking                     425 ns    345 ns    275 ns 386%
+fc blocking - dup               429 ns    340 ns    269 ns 395%
+
+------------------------------------ Number of threads = 32
+baseline                       1001 ns    990 ns    981 ns
+baseline - dup                 1002 ns    992 ns    983 ns  99%
+---- fc -------------------------------
+fc non-blocking                 404 ns    342 ns    273 ns 359%
+fc non-blocking - dup           395 ns    316 ns    259 ns 378%
+fc timed                        379 ns    330 ns    258 ns 380%
+fc timed - dup                  392 ns    335 ns    274 ns 357%
+fc blocking                     423 ns    340 ns    277 ns 353%
+fc blocking - dup               445 ns    359 ns    275 ns 356%
+
+------------------------------------ Number of threads = 48
+baseline                        978 ns    975 ns    971 ns
+baseline - dup                  977 ns    974 ns    972 ns  99%
+---- fc -------------------------------
+fc non-blocking                 424 ns    327 ns    258 ns 375%
+fc non-blocking - dup           378 ns    317 ns    256 ns 379%
+fc timed                        368 ns    311 ns    277 ns 350%
+fc timed - dup                  385 ns    310 ns    251 ns 385%
+fc blocking                     422 ns    313 ns    255 ns 380%
+fc blocking - dup               406 ns    314 ns    258 ns 376%
+
+------------------------------------ Number of threads = 64
+baseline                        993 ns    981 ns    974 ns
+baseline - dup                  984 ns    979 ns    975 ns  99%
+---- fc -------------------------------
+fc non-blocking                 353 ns    301 ns    266 ns 365%
+fc non-blocking - dup           339 ns    301 ns    271 ns 358%
+fc timed                        399 ns    321 ns    259 ns 375%
+fc timed - dup                  381 ns    300 ns    263 ns 369%
+fc blocking                     390 ns    301 ns    251 ns 387%
+fc blocking - dup               345 ns    289 ns    259 ns 374%
+[       OK ] FCPriQueue.bench (112424 ms)
+
+$ lscpu
+Architecture:          x86_64
+CPU op-mode(s):        32-bit, 64-bit
+Byte Order:            Little Endian
+CPU(s):                32
+On-line CPU(s) list:   0-31
+Thread(s) per core:    2
+Core(s) per socket:    8
+Socket(s):             2
+NUMA node(s):          2
+Vendor ID:             GenuineIntel
+CPU family:            6
+Model:                 45
+Model name:            Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz
+Stepping:              6
+CPU MHz:               2200.000
+CPU max MHz:           2200.0000
+CPU min MHz:           1200.0000
+BogoMIPS:              4399.87
+Virtualization:        VT-x
+L1d cache:             32K
+L1i cache:             32K
+L2 cache:              256K
+L3 cache:              20480K
+NUMA node0 CPU(s):     0-7,16-23
+NUMA node1 CPU(s):     8-15,24-31
+Flags:                 fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 cx16 xtpr pdcm pcid dca sse4_1 sse4_2 x2apic popcnt tsc_deadline_timer aes xsave avx lahf_lm epb tpr_shadow vnmi flexpriority ept vpid xsaveopt dtherm arat pln pts
+
+ */