Codemod folly::make_unique to std::make_unique
[folly.git] / folly / test / MPMCQueueTest.cpp
index 25b85a92a4ebff668c6743a5a942155b683f40ce..f556ebbf326d9442822abd153d52e23be0d5da97 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-#include <folly/MPMCQueue.h>
 #include <folly/Format.h>
+#include <folly/MPMCQueue.h>
 #include <folly/Memory.h>
+#include <folly/portability/GTest.h>
 #include <folly/portability/SysResource.h>
 #include <folly/portability/SysTime.h>
 #include <folly/portability/Unistd.h>
+#include <folly/stop_watch.h>
 #include <folly/test/DeterministicSchedule.h>
 
 #include <boost/intrusive_ptr.hpp>
-#include <memory>
+#include <boost/thread/barrier.hpp>
 #include <functional>
+#include <memory>
 #include <thread>
 #include <utility>
 
-#include <gtest/gtest.h>
-
 FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr);
 
 using namespace folly;
@@ -101,9 +102,9 @@ TEST(MPMCQueue, sequencer_deterministic) {
   run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
 }
 
-template <typename T>
+template <bool Dynamic = false, typename T>
 void runElementTypeTest(T&& src) {
-  MPMCQueue<T> cq(10);
+  MPMCQueue<T, std::atomic, Dynamic> cq(10);
   cq.blockingWrite(std::forward<T>(src));
   T dest;
   cq.blockingRead(dest);
@@ -118,7 +119,7 @@ void runElementTypeTest(T&& src) {
 }
 
 struct RefCounted {
-  static __thread int active_instances;
+  static FOLLY_TLS int active_instances;
 
   mutable std::atomic<int> rc;
 
@@ -130,8 +131,7 @@ struct RefCounted {
     --active_instances;
   }
 };
-__thread int RefCounted::active_instances;
-
+FOLLY_TLS int RefCounted::active_instances;
 
 void intrusive_ptr_add_ref(RefCounted const* p) {
   p->rc++;
@@ -149,12 +149,26 @@ TEST(MPMCQueue, lots_of_element_types) {
   runElementTypeTest(std::make_pair(10, string("def")));
   runElementTypeTest(vector<string>{{"abc"}});
   runElementTypeTest(std::make_shared<char>('a'));
-  runElementTypeTest(folly::make_unique<char>('a'));
+  runElementTypeTest(std::make_unique<char>('a'));
   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
   EXPECT_EQ(RefCounted::active_instances, 0);
 }
 
+TEST(MPMCQueue, lots_of_element_types_dynamic) {
+  runElementTypeTest<true>(10);
+  runElementTypeTest<true>(string("abc"));
+  runElementTypeTest<true>(std::make_pair(10, string("def")));
+  runElementTypeTest<true>(vector<string>{{"abc"}});
+  runElementTypeTest<true>(std::make_shared<char>('a'));
+  runElementTypeTest<true>(std::make_unique<char>('a'));
+  runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
+  EXPECT_EQ(RefCounted::active_instances, 0);
+}
+
 TEST(MPMCQueue, single_thread_enqdeq) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   MPMCQueue<int> cq(10);
 
   for (int pass = 0; pass < 10; ++pass) {
@@ -185,6 +199,9 @@ TEST(MPMCQueue, single_thread_enqdeq) {
 }
 
 TEST(MPMCQueue, tryenq_capacity_test) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   for (size_t cap = 1; cap < 100; ++cap) {
     MPMCQueue<int> cq(cap);
     for (size_t i = 0; i < cap; ++i) {
@@ -195,6 +212,9 @@ TEST(MPMCQueue, tryenq_capacity_test) {
 }
 
 TEST(MPMCQueue, enq_capacity_test) {
+  // Non-dynamic version only.
+  // False positive for dynamic version. Capacity can be temporarily
+  // higher than specified.
   for (auto cap : { 1, 100, 10000 }) {
     MPMCQueue<int> cq(cap);
     for (int i = 0; i < cap; ++i) {
@@ -215,11 +235,11 @@ TEST(MPMCQueue, enq_capacity_test) {
   }
 }
 
-template <template<typename> class Atom>
+template <template<typename> class Atom, bool Dynamic = false>
 void runTryEnqDeqThread(
     int numThreads,
     int n, /*numOps*/
-    MPMCQueue<int, Atom>& cq,
+    MPMCQueue<int, Atom, Dynamic>& cq,
     std::atomic<uint64_t>& sum,
     int t) {
   uint64_t threadSum = 0;
@@ -242,18 +262,18 @@ void runTryEnqDeqThread(
   sum += threadSum;
 }
 
-template <template<typename> class Atom>
+template <template<typename> class Atom, bool Dynamic = false>
 void runTryEnqDeqTest(int numThreads, int numOps) {
   // write and read aren't linearizable, so we don't have
   // hard guarantees on their individual behavior.  We can still test
   // correctness in aggregate
-  MPMCQueue<int,Atom> cq(numThreads);
+  MPMCQueue<int,Atom, Dynamic> cq(numThreads);
 
   uint64_t n = numOps;
   vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
-    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom>,
+    threads[t] = DSched::thread(std::bind(runTryEnqDeqThread<Atom, Dynamic>,
           numThreads, n, std::ref(cq), std::ref(sum), t));
   }
   for (auto& t : threads) {
@@ -272,6 +292,15 @@ TEST(MPMCQueue, mt_try_enq_deq) {
   }
 }
 
+TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
+  }
+}
+
 TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
   int nts[] = { 1, 3, 100 };
 
@@ -281,6 +310,15 @@ TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
   }
 }
 
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
+  }
+}
+
 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
   int nts[] = { 3, 10 };
 
@@ -297,6 +335,14 @@ TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
       DSched sched(DSched::uniformSubset(seed, 2));
       runTryEnqDeqTest<DeterministicAtomic>(nt, n);
     }
+    {
+      DSched sched(DSched::uniform(seed));
+      runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
+    }
+    {
+      DSched sched(DSched::uniformSubset(seed, 2));
+      runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
+    }
   }
 }
 
@@ -415,10 +461,11 @@ string producerConsumerBench(Q&& queue,
   long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
       (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
   uint64_t failures = failed;
+  size_t allocated = q.allocatedCapacity();
 
   return folly::sformat(
       "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
-      "handoff, {} failures",
+      "handoff, {} failures, {} allocated",
       qName,
       numProducers,
       writer.methodName(),
@@ -426,134 +473,254 @@ string producerConsumerBench(Q&& queue,
       nanosPer,
       csw,
       n,
-      failures);
+      failures,
+      allocated);
 }
 
-TEST(MPMCQueue, mt_prod_cons_deterministic) {
+template <bool Dynamic = false>
+void runMtProdConsDeterministic(long seed) {
   // we use the Bench method, but perf results are meaningless under DSched
-  DSched sched(DSched::uniform(0));
+  DSched sched(DSched::uniform(seed));
+
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
+                                                Dynamic>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+                       DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+                       DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+                       DeterministicAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       DeterministicAtomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       DeterministicAtomic, Dynamic>>>(seconds(2)));
+  size_t cap;
 
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic>>>>
-      callers;
-  callers.emplace_back(
-      make_unique<BlockingWriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteIfNotFullCaller<MPMCQueue<int, DeterministicAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteCaller<MPMCQueue<int, DeterministicAtomic>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
-          milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, DeterministicAtomic>>>(
-          seconds(2)));
+  for (const auto& caller : callers) {
+    cap = 10;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        1,
+        1,
+        1000,
+        *caller);
+    cap = 100;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+    cap = 10;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        1,
+        1,
+        1000,
+        *caller);
+    cap = 100;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+    cap = 1;
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
+        "MPMCQueue<int, DeterministicAtomic, Dynamic>("
+          + folly::to<std::string>(cap)+")",
+        10,
+        10,
+        1000,
+        *caller);
+  }
+}
+
+void runMtProdConsDeterministicDynamic(
+  long seed,
+  uint32_t prods,
+  uint32_t cons,
+  uint32_t numOps,
+  size_t cap,
+  size_t minCap,
+  size_t mult
+) {
+  // we use the Bench method, but perf results are meaningless under DSched
+  DSched sched(DSched::uniform(seed));
+
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, DeterministicAtomic,
+                                                true>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+                       DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+                       DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+                       DeterministicAtomic, true>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       DeterministicAtomic, true>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       DeterministicAtomic, true>>>(seconds(2)));
 
   for (const auto& caller : callers) {
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
-                                 "MPMCQueue<int, DeterministicAtomic>(10)",
-                                 1,
-                                 1,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
-                                 "MPMCQueue<int, DeterministicAtomic>(100)",
-                                 10,
-                                 10,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(10),
-                                 "MPMCQueue<int, DeterministicAtomic>(10)",
-                                 1,
-                                 1,
-                                 1000,
-                                 *caller);
-    LOG(INFO)
-        << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(100),
-                                 "MPMCQueue<int, DeterministicAtomic>(100)",
-                                 10,
-                                 10,
-                                 1000,
-                                 *caller);
-    LOG(INFO) << producerConsumerBench(MPMCQueue<int, DeterministicAtomic>(1),
-                                       "MPMCQueue<int, DeterministicAtomic>(1)",
-                                       10,
-                                       10,
-                                       1000,
-                                       *caller);
+    LOG(INFO) <<
+      producerConsumerBench(
+        MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
+        "MPMCQueue<int, DeterministicAtomic, true>("
+          + folly::to<std::string>(cap) + ", "
+          + folly::to<std::string>(minCap) + ", "
+          + folly::to<std::string>(mult)+")",
+        prods,
+        cons,
+        numOps,
+        *caller);
   }
 }
 
+TEST(MPMCQueue, mt_prod_cons_deterministic) {
+  runMtProdConsDeterministic(0);
+}
+
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
+  runMtProdConsDeterministic<true>(0);
+}
+
+template <typename T>
+void setFromEnv(T& var, const char* envvar) {
+  char* str = std::getenv(envvar);
+  if (str) { var = atoi(str); }
+}
+
+TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
+  long seed = 0;
+  uint32_t prods = 10;
+  uint32_t cons = 10;
+  uint32_t numOps = 1000;
+  size_t cap = 10000;
+  size_t minCap = 9;
+  size_t mult = 3;
+  setFromEnv(seed, "SEED");
+  setFromEnv(prods, "PRODS");
+  setFromEnv(cons, "CONS");
+  setFromEnv(numOps, "NUM_OPS");
+  setFromEnv(cap, "CAP");
+  setFromEnv(minCap, "MIN_CAP");
+  setFromEnv(mult, "MULT");
+  runMtProdConsDeterministicDynamic(
+    seed, prods, cons, numOps, cap, minCap, mult);
+}
+
 #define PC_BENCH(q, np, nc, ...) \
     producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)
 
-TEST(MPMCQueue, mt_prod_cons) {
+template <bool Dynamic = false>
+void runMtProdCons() {
   int n = 100000;
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int>>>> callers;
-  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int>>>());
-  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int>>>());
-  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int>>>(seconds(2)));
+  setFromEnv(n, "NUM_OPS");
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, std::atomic, Dynamic>>>>
+    callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+                       std::atomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+                       std::atomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int, std::atomic,
+                       Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       std::atomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       std::atomic, Dynamic>>>(seconds(2)));
   for (const auto& caller : callers) {
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 1, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 1, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(10000), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10)),
+                          10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          1, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          10, 1, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(10000)),
+                          10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, std::atomic, Dynamic>(100000)),
+                          32, 100, n, *caller);
   }
 }
 
-TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+TEST(MPMCQueue, mt_prod_cons) {
+  runMtProdCons();
+}
+
+TEST(MPMCQueue, mt_prod_cons_dynamic) {
+  runMtProdCons</* Dynamic = */ true>();
+}
+
+template <bool Dynamic = false>
+void runMtProdConsEmulatedFutex() {
   int n = 100000;
-  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic>>>>
-      callers;
-  callers.emplace_back(
-      make_unique<BlockingWriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteIfNotFullCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<WriteCaller<MPMCQueue<int, EmulatedFutexAtomic>>>());
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
-          milliseconds(1)));
-  callers.emplace_back(
-      make_unique<TryWriteUntilCaller<MPMCQueue<int, EmulatedFutexAtomic>>>(
-          seconds(2)));
+  vector<unique_ptr<WriteMethodCaller<MPMCQueue<int, EmulatedFutexAtomic,
+                                                Dynamic>>>> callers;
+  callers.emplace_back(make_unique<BlockingWriteCaller<MPMCQueue<int,
+                       EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteIfNotFullCaller<MPMCQueue<int,
+                       EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<WriteCaller<MPMCQueue<int,
+                       EmulatedFutexAtomic, Dynamic>>>());
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       EmulatedFutexAtomic, Dynamic>>>(milliseconds(1)));
+  callers.emplace_back(make_unique<TryWriteUntilCaller<MPMCQueue<int,
+                       EmulatedFutexAtomic, Dynamic>>>(seconds(2)));
   for (const auto& caller : callers) {
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 1, n, *caller);
-    LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 1, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 1, 10, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10)), 10, 10, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 1, 10, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 1, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10)), 10, 10, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 1, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 1, 10, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 10, 1, n, *caller);
     LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(10000)), 10, 10, n, *caller);
-    LOG(INFO) << PC_BENCH(
-        (MPMCQueue<int, EmulatedFutexAtomic>(100000)), 32, 100, n, *caller);
+      (MPMCQueue<int, EmulatedFutexAtomic, Dynamic>(10000)), 1, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
+                           (10000)), 10, 10, n, *caller);
+    LOG(INFO) << PC_BENCH((MPMCQueue<int, EmulatedFutexAtomic, Dynamic>
+                           (100000)), 32, 100, n, *caller);
   }
 }
 
-template <template <typename> class Atom>
+TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+  runMtProdConsEmulatedFutex();
+}
+
+TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
+  runMtProdConsEmulatedFutex</* Dynamic = */ true>();
+}
+
+template <template <typename> class Atom, bool Dynamic = false>
 void runNeverFailThread(int numThreads,
                         int n, /*numOps*/
-                        MPMCQueue<int, Atom>& cq,
+                        MPMCQueue<int, Atom, Dynamic>& cq,
                         std::atomic<uint64_t>& sum,
                         int t) {
   uint64_t threadSum = 0;
@@ -569,10 +736,10 @@ void runNeverFailThread(int numThreads,
   sum += threadSum;
 }
 
-template <template <typename> class Atom>
+template <template <typename> class Atom, bool Dynamic = false>
 uint64_t runNeverFailTest(int numThreads, int numOps) {
   // always #enq >= #deq
-  MPMCQueue<int, Atom> cq(numThreads);
+  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
 
   uint64_t n = numOps;
   auto beginMicro = nowMicro();
@@ -580,7 +747,7 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
   vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
-    threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom>,
+    threads[t] = DSched::thread(std::bind(runNeverFailThread<Atom, Dynamic>,
                                           numThreads,
                                           n,
                                           std::ref(cq),
@@ -596,51 +763,58 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
   return nowMicro() - beginMicro;
 }
 
-TEST(MPMCQueue, mt_never_fail) {
-  int nts[] = {1, 3, 100};
-
-  int n = 100000;
+template <template<typename> class Atom, bool Dynamic = false>
+void runMtNeverFail(std::vector<int>& nts, int n) {
   for (int nt : nts) {
-    uint64_t elapsed = runNeverFailTest<std::atomic>(nt, n);
+    uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
               << " threads";
   }
 }
 
-TEST(MPMCQueue, mt_never_fail_emulated_futex) {
-  int nts[] = {1, 3, 100};
+// All the never_fail tests are for the non-dynamic version only.
+// False positive for dynamic version. Some writeIfNotFull() and
+// tryWriteUntil() operations may fail in transient conditions related
+// to expansion.
 
+TEST(MPMCQueue, mt_never_fail) {
+  std::vector<int> nts {1, 3, 100};
   int n = 100000;
-  for (int nt : nts) {
-    uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
-    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
-              << " threads";
-  }
+  runMtNeverFail<std::atomic>(nts, n);
 }
 
-TEST(MPMCQueue, mt_never_fail_deterministic) {
-  int nts[] = {3, 10};
+TEST(MPMCQueue, mt_never_fail_emulated_futex) {
+  std::vector<int> nts {1, 3, 100};
+  int n = 100000;
+  runMtNeverFail<EmulatedFutexAtomic>(nts, n);
+}
 
-  long seed = 0; // nowMicro() % 10000;
+template<bool Dynamic = false>
+void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
   LOG(INFO) << "using seed " << seed;
-
-  int n = 1000;
   for (int nt : nts) {
     {
       DSched sched(DSched::uniform(seed));
-      runNeverFailTest<DeterministicAtomic>(nt, n);
+      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
     }
     {
       DSched sched(DSched::uniformSubset(seed, 2));
-      runNeverFailTest<DeterministicAtomic>(nt, n);
+      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
     }
   }
 }
 
-template <class Clock, template <typename> class Atom>
+TEST(MPMCQueue, mt_never_fail_deterministic) {
+  std::vector<int> nts {3, 10};
+  long seed = 0; // nowMicro() % 10000;
+  int n = 1000;
+  runMtNeverFailDeterministic(nts, n, seed);
+}
+
+template <class Clock, template <typename> class Atom, bool Dynamic>
 void runNeverFailUntilThread(int numThreads,
                              int n, /*numOps*/
-                             MPMCQueue<int, Atom>& cq,
+                             MPMCQueue<int, Atom, Dynamic>& cq,
                              std::atomic<uint64_t>& sum,
                              int t) {
   uint64_t threadSum = 0;
@@ -657,10 +831,10 @@ void runNeverFailUntilThread(int numThreads,
   sum += threadSum;
 }
 
-template <class Clock, template <typename> class Atom>
+template <class Clock, template <typename> class Atom, bool Dynamic = false>
 uint64_t runNeverFailTest(int numThreads, int numOps) {
   // always #enq >= #deq
-  MPMCQueue<int, Atom> cq(numThreads);
+  MPMCQueue<int, Atom, Dynamic> cq(numThreads);
 
   uint64_t n = numOps;
   auto beginMicro = nowMicro();
@@ -668,12 +842,13 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
   vector<std::thread> threads(numThreads);
   std::atomic<uint64_t> sum(0);
   for (int t = 0; t < numThreads; ++t) {
-    threads[t] = DSched::thread(std::bind(runNeverFailUntilThread<Clock, Atom>,
-                                          numThreads,
-                                          n,
-                                          std::ref(cq),
-                                          std::ref(sum),
-                                          t));
+    threads[t] = DSched::thread(std::bind(
+                                  runNeverFailUntilThread<Clock, Atom, Dynamic>,
+                                  numThreads,
+                                  n,
+                                  std::ref(cq),
+                                  std::ref(sum),
+                                  t));
   }
   for (auto& t : threads) {
     DSched::join(t);
@@ -684,30 +859,38 @@ uint64_t runNeverFailTest(int numThreads, int numOps) {
   return nowMicro() - beginMicro;
 }
 
-TEST(MPMCQueue, mt_never_fail_until_system) {
-  int nts[] = {1, 3, 100};
-
-  int n = 100000;
+template <bool Dynamic = false>
+void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
   for (int nt : nts) {
     uint64_t elapsed =
-        runNeverFailTest<std::chrono::system_clock, std::atomic>(nt, n);
+      runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(nt, n);
     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
               << " threads";
   }
 }
 
-TEST(MPMCQueue, mt_never_fail_until_steady) {
-  int nts[] = {1, 3, 100};
-
+TEST(MPMCQueue, mt_never_fail_until_system) {
+  std::vector<int> nts {1, 3, 100};
   int n = 100000;
+  runMtNeverFailUntilSystem(nts, n);
+}
+
+template <bool Dynamic = false>
+void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
   for (int nt : nts) {
     uint64_t elapsed =
-        runNeverFailTest<std::chrono::steady_clock, std::atomic>(nt, n);
+      runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(nt, n);
     LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
               << " threads";
   }
 }
 
+TEST(MPMCQueue, mt_never_fail_until_steady) {
+  std::vector<int> nts {1, 3, 100};
+  int n = 100000;
+  runMtNeverFailUntilSteady(nts, n);
+}
+
 enum LifecycleEvent {
   NOTHING = -1,
   DEFAULT_CONSTRUCTOR,
@@ -795,7 +978,8 @@ void runPerfectForwardingTest() {
   EXPECT_EQ(lc_outstanding(), 0);
 
   {
-    MPMCQueue<Lifecycle<R>> queue(50);
+    // Non-dynamic only. False positive for dynamic.
+    MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
     LIFECYCLE_STEP(NOTHING);
 
     for (int pass = 0; pass < 10; ++pass) {
@@ -871,19 +1055,21 @@ TEST(MPMCQueue, perfect_forwarding_relocatable) {
   runPerfectForwardingTest<std::true_type>();
 }
 
-TEST(MPMCQueue, queue_moving) {
+template <bool Dynamic = false>
+void run_queue_moving() {
   lc_snap();
   EXPECT_EQ(lc_outstanding(), 0);
 
   {
-    MPMCQueue<Lifecycle<std::false_type>> a(50);
+    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
     LIFECYCLE_STEP(NOTHING);
 
     a.blockingWrite();
     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
 
     // move constructor
-    MPMCQueue<Lifecycle<std::false_type>> b = std::move(a);
+    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b
+      = std::move(a);
     LIFECYCLE_STEP(NOTHING);
     EXPECT_EQ(a.capacity(), 0);
     EXPECT_EQ(a.size(), 0);
@@ -894,7 +1080,7 @@ TEST(MPMCQueue, queue_moving) {
     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
 
     // move operator
-    MPMCQueue<Lifecycle<std::false_type>> c;
+    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
     LIFECYCLE_STEP(NOTHING);
     c = std::move(b);
     LIFECYCLE_STEP(NOTHING);
@@ -909,7 +1095,7 @@ TEST(MPMCQueue, queue_moving) {
 
       {
         // swap
-        MPMCQueue<Lifecycle<std::false_type>> d(10);
+        MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
         LIFECYCLE_STEP(NOTHING);
         std::swap(c, d);
         LIFECYCLE_STEP(NOTHING);
@@ -934,6 +1120,124 @@ TEST(MPMCQueue, queue_moving) {
   LIFECYCLE_STEP(DESTRUCTOR);
 }
 
+TEST(MPMCQueue, queue_moving) {
+  run_queue_moving();
+}
+
+TEST(MPMCQueue, queue_moving_dynamic) {
+  run_queue_moving<true>();
+}
+
 TEST(MPMCQueue, explicit_zero_capacity_fail) {
   ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
+
+  using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
+  ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
+}
+
+template <bool Dynamic>
+void testTryReadUntil() {
+  MPMCQueue<int, std::atomic, Dynamic> q{1};
+
+  const auto wait = std::chrono::milliseconds(100);
+  stop_watch<> watch;
+  bool rets[2];
+  int vals[2];
+  std::vector<std::thread> threads;
+  boost::barrier b{3};
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&, i] {
+      b.wait();
+      rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
+    });
+  }
+
+  b.wait();
+  EXPECT_TRUE(q.write(42));
+
+  for (int i = 0; i < 2; i++) {
+    threads[i].join();
+  }
+
+  for (int i = 0; i < 2; i++) {
+    int other = (i + 1) % 2;
+    if (rets[i]) {
+      EXPECT_EQ(42, vals[i]);
+      EXPECT_FALSE(rets[other]);
+    }
+  }
+
+  EXPECT_TRUE(watch.elapsed(wait));
+}
+
+template <bool Dynamic>
+void testTryWriteUntil() {
+  MPMCQueue<int, std::atomic, Dynamic> q{1};
+  EXPECT_TRUE(q.write(42));
+
+  const auto wait = std::chrono::milliseconds(100);
+  stop_watch<> watch;
+  bool rets[2];
+  std::vector<std::thread> threads;
+  boost::barrier b{3};
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&, i] {
+      b.wait();
+      rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
+    });
+  }
+
+  b.wait();
+  int x;
+  EXPECT_TRUE(q.read(x));
+  EXPECT_EQ(42, x);
+
+  for (int i = 0; i < 2; i++) {
+    threads[i].join();
+  }
+  EXPECT_TRUE(q.read(x));
+
+  for (int i = 0; i < 2; i++) {
+    int other = (i + 1) % 2;
+    if (rets[i]) {
+      EXPECT_EQ(i, x);
+      EXPECT_FALSE(rets[other]);
+    }
+  }
+
+  EXPECT_TRUE(watch.elapsed(wait));
+}
+
+TEST(MPMCQueue, try_read_until) {
+  testTryReadUntil<false>();
+}
+
+TEST(MPMCQueue, try_read_until_dynamic) {
+  testTryReadUntil<true>();
+}
+
+TEST(MPMCQueue, try_write_until) {
+  testTryWriteUntil<false>();
+}
+
+TEST(MPMCQueue, try_write_until_dynamic) {
+  testTryWriteUntil<true>();
+}
+
+template <bool Dynamic>
+void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
+  CHECK(q.write(1));
+  /* The following must not block forever */
+  q.tryWriteUntil(
+      std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
+}
+
+TEST(MPMCQueue, try_write_until_timeout) {
+  folly::MPMCQueue<int, std::atomic, false> queue(1);
+  testTimeout<false>(queue);
+}
+
+TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
+  folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
+  testTimeout<true>(queue);
 }