aggregator: fix `doEagerInitVia`
[folly.git] / folly / test / MPMCQueueTest.cpp
index ea0fbd123d81dd10548f5684e378f0898036abb7..e606a24e9872b16883ecbe500d11052182c96108 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -45,7 +45,7 @@ void run_mt_sequencer_thread(
     int numOps,
     uint32_t init,
     TurnSequencer<Atom>& seq,
-    Atom<int>& spinThreshold,
+    Atom<uint32_t>& spinThreshold,
     int& prev,
     int i) {
   for (int op = i; op < numOps; op += numThreads) {
@@ -59,7 +59,7 @@ void run_mt_sequencer_thread(
 template <template<typename> class Atom>
 void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
   TurnSequencer<Atom> seq(init);
-  Atom<int> spinThreshold(0);
+  Atom<uint32_t> spinThreshold(0);
 
   int prev = -1;
   std::vector<std::thread> threads(numThreads);
@@ -82,6 +82,12 @@ TEST(MPMCQueue, sequencer) {
   run_mt_sequencer_test<std::atomic>(100, 10000, -100);
 }
 
+TEST(MPMCQueue, sequencer_emulated_futex) {
+  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
+  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
+  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
+}
+
 TEST(MPMCQueue, sequencer_deterministic) {
   DSched sched(DSched::uniform(0));
   run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
@@ -100,17 +106,27 @@ void runElementTypeTest(T&& src) {
 }
 
 struct RefCounted {
+  static __thread int active_instances;
+
   mutable std::atomic<int> rc;
 
-  RefCounted() : rc(0) {}
+  RefCounted() : rc(0) {
+    ++active_instances;
+  }
+
+  ~RefCounted() {
+    --active_instances;
+  }
 };
+__thread int RefCounted::active_instances;
+
 
 void intrusive_ptr_add_ref(RefCounted const* p) {
   p->rc++;
 }
 
 void intrusive_ptr_release(RefCounted const* p) {
-  if (--(p->rc)) {
+  if (--(p->rc) == 0) {
     delete p;
   }
 }
@@ -123,6 +139,7 @@ TEST(MPMCQueue, lots_of_element_types) {
   runElementTypeTest(std::make_shared<char>('a'));
   runElementTypeTest(folly::make_unique<char>('a'));
   runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
+  EXPECT_EQ(RefCounted::active_instances, 0);
 }
 
 TEST(MPMCQueue, single_thread_enqdeq) {
@@ -158,7 +175,7 @@ TEST(MPMCQueue, single_thread_enqdeq) {
 TEST(MPMCQueue, tryenq_capacity_test) {
   for (size_t cap = 1; cap < 100; ++cap) {
     MPMCQueue<int> cq(cap);
-    for (int i = 0; i < cap; ++i) {
+    for (size_t i = 0; i < cap; ++i) {
       EXPECT_TRUE(cq.write(i));
     }
     EXPECT_FALSE(cq.write(100));
@@ -243,6 +260,15 @@ TEST(MPMCQueue, mt_try_enq_deq) {
   }
 }
 
+TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
+  }
+}
+
 TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
   int nts[] = { 3, 10 };
 
@@ -362,6 +388,20 @@ TEST(MPMCQueue, mt_prod_cons) {
   LOG(INFO) << PC_BENCH(MPMCQueue<int>(100000), 32, 100, n);
 }
 
+TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
+  int n = 100000;
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 1, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 1, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 1, 10, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10)), 10, 10, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 1, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 1, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 1, 10, n);
+  LOG(INFO) << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(10000)), 10, 10, n);
+  LOG(INFO)
+    << PC_BENCH((MPMCQueue<int,EmulatedFutexAtomic>(100000)), 32, 100, n);
+}
+
 template <template<typename> class Atom>
 void runNeverFailThread(
     int numThreads,
@@ -416,6 +456,17 @@ TEST(MPMCQueue, mt_never_fail) {
   }
 }
 
+TEST(MPMCQueue, mt_never_fail_emulated_futex) {
+  int nts[] = { 1, 3, 100 };
+
+  int n = 100000;
+  for (int nt : nts) {
+    uint64_t elapsed = runNeverFailTest<EmulatedFutexAtomic>(nt, n);
+    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with "
+              << nt << " threads";
+  }
+}
+
 TEST(MPMCQueue, mt_never_fail_deterministic) {
   int nts[] = { 3, 10 };
 
@@ -660,6 +711,11 @@ TEST(MPMCQueue, queue_moving) {
   LIFECYCLE_STEP(DESTRUCTOR);
 }
 
+TEST(MPMCQueue, explicit_zero_capacity_fail) {
+  ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);
+}
+
+
 int main(int argc, char ** argv) {
   testing::InitGoogleTest(&argc, argv);
   gflags::ParseCommandLineFlags(&argc, &argv, true);