Consistently use the namespace closing comment
[folly.git] / folly / test / ThreadCachedIntTest.cpp
index a84a82fd0052d8fc6f179267234f1fd38e04ad33..f4a77d3fc249551e0cb241d9df8d05070fdc5e71 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-#include "folly/ThreadCachedInt.h"
-#include "folly/Hash.h"
+#include <folly/ThreadCachedInt.h>
 
 #include <atomic>
+#include <condition_variable>
+#include <memory>
 #include <thread>
-#include <gtest/gtest.h>
-#include <gflags/gflags.h>
+
 #include <glog/logging.h>
-#include "folly/Benchmark.h"
+
+#include <folly/Benchmark.h>
+#include <folly/Hash.h>
+#include <folly/portability/GFlags.h>
+#include <folly/portability/GTest.h>
+#include <folly/system/ThreadId.h>
 
 using namespace folly;
 
+using std::unique_ptr;
+using std::vector;
+
+using Counter = ThreadCachedInt<int64_t>;
+
+// Test fixture.  GetDeadThreadsTotal() reads the counter via readFast(),
+// which (given the large cacheSize used below) reflects only increments
+// already flushed out of thread-local caches -- i.e. contributions from
+// threads that have died.  NOTE(review): this interpretation relies on
+// cacheSize == UINT32_MAX preventing flushes while threads are alive.
+class ThreadCachedIntTest : public testing::Test {
+ public:
+  // Returns the portion of `counter` contributed by exited threads.
+  uint32_t GetDeadThreadsTotal(const Counter& counter) {
+    return counter.readFast();
+  }
+};
+
+// Multithreaded tests.  Creates a specified number of threads each of
+// which iterates a different amount and dies.
+
+namespace {
+// Set cacheSize to be large so cached data moves to target_ only when
+// a thread dies; this is what lets GetDeadThreadsTotal() distinguish
+// flushed ("dead") contributions from live thread-local caches.
+Counter g_counter_for_mt_slow(0, UINT32_MAX);
+Counter g_counter_for_mt_fast(0, UINT32_MAX);
+
+// Used to sync between threads.  The value of this variable is the
+// maximum iteration index up to which Runner() is allowed to go.  It
+// is written by the test thread under cv_m and broadcast via cv.
+uint32_t g_sync_for_mt(0);
+std::condition_variable cv;
+std::mutex cv_m;
+
+// Performs the specified number of iterations.  Within each
+// iteration, it increments counter 10 times.  At the beginning of
+// each iteration it blocks on cv until g_sync_for_mt permits
+// iteration i.  Note: cv_m remains locked across the inner increment
+// loop, so the increments of concurrent Runner threads are serialized.
+void Runner(Counter* counter, uint32_t iterations) {
+  for (uint32_t i = 0; i < iterations; ++i) {
+    std::unique_lock<std::mutex> lk(cv_m);
+    // Globals need no capture; only `i` is captured (by value).
+    cv.wait(lk, [i] { return i < g_sync_for_mt; });
+    for (uint32_t j = 0; j < 10; ++j) {
+      counter->increment(1);
+    }
+  }
+}
+} // namespace
+
+// Slow test with fewer threads where there are more busy waits and
+// many calls to readFull().  This attempts to test as many of the
+// code paths in Counter as possible to ensure that counter values are
+// properly passed from thread local state, both at calls to
+// readFull() and at thread death.
+TEST_F(ThreadCachedIntTest, MultithreadedSlow) {
+  static constexpr uint32_t kNumThreads = 20;
+  g_sync_for_mt = 0;
+  vector<unique_ptr<std::thread>> threads(kNumThreads);
+  // Creates kNumThreads threads.  Each thread performs a different
+  // number of iterations in Runner() - threads[0] performs 1
+  // iteration, threads[1] performs 2 iterations, threads[2] performs
+  // 3 iterations, and so on.
+  for (uint32_t i = 0; i < kNumThreads; ++i) {
+    threads[i] =
+        std::make_unique<std::thread>(Runner, &g_counter_for_mt_slow, i + 1);
+  }
+  // Variable to grab current counter value.
+  int32_t counter_value;
+  // The expected value of the counter.
+  int32_t total = 0;
+  // The expected value of GetDeadThreadsTotal().
+  int32_t dead_total = 0;
+  // Each iteration of the following loop allows one additional
+  // iteration of the threads.  Given that the threads perform
+  // different numbers of iterations from 1 through kNumThreads, one
+  // thread will complete in each iteration of the loop below.
+  for (uint32_t i = 0; i < kNumThreads; ++i) {
+    // Allow up to iteration i on all threads.
+    {
+      std::lock_guard<std::mutex> lk(cv_m);
+      g_sync_for_mt = i + 1;
+    }
+    cv.notify_all();
+    // (kNumThreads - i) threads are still running in this round, and
+    // each performs 10 increments per iteration.
+    total += (kNumThreads - i) * 10;
+    // Loop until the counter reaches its expected value.
+    do {
+      counter_value = g_counter_for_mt_slow.readFull();
+    } while (counter_value < total);
+    // All threads have done what they can until iteration i, now make
+    // sure they don't go further by checking 10 more times in the
+    // following loop.
+    for (uint32_t j = 0; j < 10; ++j) {
+      counter_value = g_counter_for_mt_slow.readFull();
+      EXPECT_EQ(total, counter_value);
+    }
+    // Thread i (which runs i + 1 iterations) can now finish; once it
+    // dies its 10 * (i + 1) increments flush to the target.  GE (not
+    // EQ) because the flush may not have happened yet.
+    dead_total += (i + 1) * 10;
+    EXPECT_GE(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
+  }
+  // All threads are done.
+  for (uint32_t i = 0; i < kNumThreads; ++i) {
+    threads[i]->join();
+  }
+  // After all joins every cache has been flushed, so readFull(),
+  // the running expected total, and the dead-thread total all agree.
+  counter_value = g_counter_for_mt_slow.readFull();
+  EXPECT_EQ(total, counter_value);
+  EXPECT_EQ(total, dead_total);
+  EXPECT_EQ(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
+}
+
+// Fast test with lots of threads and only one call to readFull()
+// at the end.
+TEST_F(ThreadCachedIntTest, MultithreadedFast) {
+  static constexpr uint32_t kNumThreads = 1000;
+  g_sync_for_mt = 0;
+  vector<unique_ptr<std::thread>> threads(kNumThreads);
+  // Creates kNumThreads threads.  Each thread performs a different
+  // number of iterations in Runner() - threads[0] performs 1
+  // iteration, threads[1] performs 2 iterations, threads[2] performs
+  // 3 iterations, and so on.
+  for (uint32_t i = 0; i < kNumThreads; ++i) {
+    threads[i] =
+        std::make_unique<std::thread>(Runner, &g_counter_for_mt_fast, i + 1);
+  }
+  // Let the threads run to completion: raising g_sync_for_mt to
+  // kNumThreads unblocks every iteration of every thread at once.
+  {
+    std::lock_guard<std::mutex> lk(cv_m);
+    g_sync_for_mt = kNumThreads;
+  }
+  cv.notify_all();
+  // The expected value of the counter: thread i contributes
+  // (i + 1) * 10 increments.
+  uint32_t total = 0;
+  for (uint32_t i = 0; i < kNumThreads; ++i) {
+    total += (kNumThreads - i) * 10;
+  }
+  // Wait for all threads to complete.
+  for (uint32_t i = 0; i < kNumThreads; ++i) {
+    threads[i]->join();
+  }
+  // After joining, all thread-local caches have flushed, so a single
+  // readFull() (and the dead-thread total) must match the expectation.
+  int32_t counter_value = g_counter_for_mt_fast.readFull();
+  EXPECT_EQ(total, counter_value);
+  EXPECT_EQ(total, GetDeadThreadsTotal(g_counter_for_mt_fast));
+}
+
 TEST(ThreadCachedInt, SingleThreadedNotCached) {
   ThreadCachedInt<int64_t> val(0, 0);
   EXPECT_EQ(0, val.readFast());
@@ -138,7 +278,7 @@ MAKE_MT_CACHE_SIZE_BM(32);
     std::vector<std::thread> threads;                           \
     for (int i = 0; i < FLAGS_numThreads; ++i) {                \
       threads.push_back(std::thread([&]() {                     \
-            for (int i = 0; i < iterPerThread; ++i) {           \
+            for (int j = 0; j < iterPerThread; ++j) {           \
               inc_stmt;                                         \
             }                                                   \
           }));                                                  \
@@ -162,9 +302,8 @@ struct ShardedAtomicInt {
   std::atomic<int64_t> ints_[kBuckets_];
 
   inline void inc(int64_t val = 1) {
-    int bucket = hash::twang_mix64(
-      uint64_t(pthread_self())) & (kBuckets_ - 1);
-    std::atomic_fetch_add(&ints_[bucket], val);
+    int buck = hash::twang_mix64(folly::getCurrentThreadID()) & (kBuckets_ - 1);
+    std::atomic_fetch_add(&ints_[buck], val);
   }
 
   // read the first few and extrapolate
@@ -239,9 +378,9 @@ BENCHMARK_DRAW_LINE();
 
 int main(int argc, char** argv) {
   testing::InitGoogleTest(&argc, argv);
-  google::ParseCommandLineFlags(&argc, &argv, true);
-  google::SetCommandLineOptionWithMode(
-    "bm_min_usec", "10000", google::SET_FLAG_IF_DEFAULT
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  gflags::SetCommandLineOptionWithMode(
+    "bm_min_usec", "10000", gflags::SET_FLAG_IF_DEFAULT
   );
   if (FLAGS_benchmark) {
     folly::runBenchmarks();