/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <folly/ThreadCachedInt.h>
#include <atomic>
+#include <condition_variable>
#include <thread>
#include <glog/logging.h>
-#include <gtest/gtest.h>
#include <folly/Benchmark.h>
#include <folly/Hash.h>
+#include <folly/ThreadId.h>
#include <folly/portability/GFlags.h>
+#include <folly/portability/GTest.h>
using namespace folly;
+using std::unique_ptr;
+using std::vector;
+
+using Counter = ThreadCachedInt<int64_t>;
+
+class ThreadCachedIntTest : public testing::Test {
+ public:
+ uint32_t GetDeadThreadsTotal(const Counter& counter) {
+ return counter.readFast();
+ }
+};
+
+// Multithreaded tests. Creates a specified number of threads each of
+// which iterates a different amount and dies.
+
+namespace {
+// Set cacheSize to be large so cached data moves to target_ only when
+// thread dies.
+Counter g_counter_for_mt_slow(0, UINT32_MAX);
+Counter g_counter_for_mt_fast(0, UINT32_MAX);
+
+// Used to sync between threads. The value of this variable is the
+// maximum iteration index upto which Runner() is allowed to go.
+uint32_t g_sync_for_mt(0);
+std::condition_variable cv;
+std::mutex cv_m;
+
+// Performs the specified number of iterations. Within each
+// iteration, it increments counter 10 times. At the beginning of
+// each iteration it checks g_sync_for_mt to see if it can proceed,
+// otherwise goes into a loop sleeping and rechecking.
+void Runner(Counter* counter, uint32_t iterations) {
+ for (uint32_t i = 0; i < iterations; ++i) {
+ std::unique_lock<std::mutex> lk(cv_m);
+ cv.wait(lk, [i] { return i < g_sync_for_mt; });
+ for (uint32_t j = 0; j < 10; ++j) {
+ counter->increment(1);
+ }
+ }
+}
+}
+
+// Slow test with fewer threads where there are more busy waits and
+// many calls to readFull(). This attempts to test as many of the
+// code paths in Counter as possible to ensure that counter values are
+// properly passed from thread local state, both at calls to
+// readFull() and at thread death.
+TEST_F(ThreadCachedIntTest, MultithreadedSlow) {
+ static constexpr uint32_t kNumThreads = 20;
+ g_sync_for_mt = 0;
+ vector<unique_ptr<std::thread>> threads(kNumThreads);
+ // Creates kNumThreads threads. Each thread performs a different
+ // number of iterations in Runner() - threads[0] performs 1
+ // iteration, threads[1] performs 2 iterations, threads[2] performs
+ // 3 iterations, and so on.
+ for (uint32_t i = 0; i < kNumThreads; ++i) {
+ threads[i].reset(new std::thread(Runner, &g_counter_for_mt_slow, i + 1));
+ }
+ // Variable to grab current counter value.
+ int32_t counter_value;
+ // The expected value of the counter.
+ int32_t total = 0;
+ // The expected value of GetDeadThreadsTotal().
+ int32_t dead_total = 0;
+ // Each iteration of the following thread allows one additional
+ // iteration of the threads. Given that the threads perform
+ // different number of iterations from 1 through kNumThreads, one
+ // thread will complete in each of the iterations of the loop below.
+ for (uint32_t i = 0; i < kNumThreads; ++i) {
+ // Allow upto iteration i on all threads.
+ {
+ std::lock_guard<std::mutex> lk(cv_m);
+ g_sync_for_mt = i + 1;
+ }
+ cv.notify_all();
+ total += (kNumThreads - i) * 10;
+ // Loop until the counter reaches its expected value.
+ do {
+ counter_value = g_counter_for_mt_slow.readFull();
+ } while (counter_value < total);
+ // All threads have done what they can until iteration i, now make
+ // sure they don't go further by checking 10 more times in the
+ // following loop.
+ for (uint32_t j = 0; j < 10; ++j) {
+ counter_value = g_counter_for_mt_slow.readFull();
+ EXPECT_EQ(total, counter_value);
+ }
+ dead_total += (i + 1) * 10;
+ EXPECT_GE(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
+ }
+ // All threads are done.
+ for (uint32_t i = 0; i < kNumThreads; ++i) {
+ threads[i]->join();
+ }
+ counter_value = g_counter_for_mt_slow.readFull();
+ EXPECT_EQ(total, counter_value);
+ EXPECT_EQ(total, dead_total);
+ EXPECT_EQ(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
+}
+
+// Fast test with lots of threads and only one call to readFull()
+// at the end.
+TEST_F(ThreadCachedIntTest, MultithreadedFast) {
+ static constexpr uint32_t kNumThreads = 1000;
+ g_sync_for_mt = 0;
+ vector<unique_ptr<std::thread>> threads(kNumThreads);
+ // Creates kNumThreads threads. Each thread performs a different
+ // number of iterations in Runner() - threads[0] performs 1
+ // iteration, threads[1] performs 2 iterations, threads[2] performs
+ // 3 iterations, and so on.
+ for (uint32_t i = 0; i < kNumThreads; ++i) {
+ threads[i].reset(new std::thread(Runner, &g_counter_for_mt_fast, i + 1));
+ }
+ // Let the threads run to completion.
+ {
+ std::lock_guard<std::mutex> lk(cv_m);
+ g_sync_for_mt = kNumThreads;
+ }
+ cv.notify_all();
+ // The expected value of the counter.
+ uint32_t total = 0;
+ for (uint32_t i = 0; i < kNumThreads; ++i) {
+ total += (kNumThreads - i) * 10;
+ }
+ // Wait for all threads to complete.
+ for (uint32_t i = 0; i < kNumThreads; ++i) {
+ threads[i]->join();
+ }
+ int32_t counter_value = g_counter_for_mt_fast.readFull();
+ EXPECT_EQ(total, counter_value);
+ EXPECT_EQ(total, GetDeadThreadsTotal(g_counter_for_mt_fast));
+}
+
TEST(ThreadCachedInt, SingleThreadedNotCached) {
ThreadCachedInt<int64_t> val(0, 0);
EXPECT_EQ(0, val.readFast());
std::vector<std::thread> threads; \
for (int i = 0; i < FLAGS_numThreads; ++i) { \
threads.push_back(std::thread([&]() { \
- for (int i = 0; i < iterPerThread; ++i) { \
+ for (int j = 0; j < iterPerThread; ++j) { \
inc_stmt; \
} \
})); \
std::atomic<int64_t> ints_[kBuckets_];
inline void inc(int64_t val = 1) {
- int bucket = hash::twang_mix64(
- uint64_t(pthread_self())) & (kBuckets_ - 1);
- std::atomic_fetch_add(&ints_[bucket], val);
+ int buck = hash::twang_mix64(folly::getCurrentThreadID()) & (kBuckets_ - 1);
+ std::atomic_fetch_add(&ints_[buck], val);
}
// read the first few and extrapolate