X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Ftest%2FSynchronizedTestLib-inl.h;h=9c380bcedb59418871bdbb6bc9e94f7e0bdc7849;hp=c289ab4df58f4a783eac9a6e56808660c6659c95;hb=6b66499c48ba83883c6922326cd5d618e7e3a05d;hpb=a2b94586d9000f0b87b9653d237fb5e5960a25be diff --git a/folly/test/SynchronizedTestLib-inl.h b/folly/test/SynchronizedTestLib-inl.h index c289ab4d..9c380bce 100644 --- a/folly/test/SynchronizedTestLib-inl.h +++ b/folly/test/SynchronizedTestLib-inl.h @@ -23,22 +23,93 @@ #include #include #include +#include #include #include #include #include #include +namespace folly { +namespace sync_tests { + inline std::mt19937& getRNG() { static const auto seed = folly::randomNumberSeed(); static std::mt19937 rng(seed); return rng; } -template -Integral2 random(Integral1 low, Integral2 up) { - std::uniform_int_distribution<> range(low, up); - return range(getRNG()); +void randomSleep(std::chrono::milliseconds min, std::chrono::milliseconds max) { + std::uniform_int_distribution<> range(min.count(), max.count()); + std::chrono::milliseconds duration(range(getRNG())); + std::this_thread::sleep_for(duration); +} + +/* + * Run a functon simultaneously in a number of different threads. + * + * The function will be passed the index number of the thread it is running in. + * This function makes an attempt to synchronize the start of the threads as + * best as possible. It waits for all threads to be allocated and started + * before invoking the function. + */ +template +void runParallel(size_t numThreads, const Function& function) { + std::vector threads; + threads.reserve(numThreads); + + // Variables used to synchronize all threads to try and start them + // as close to the same time as possible + // + // TODO: At the moment Synchronized doesn't work with condition variables. + // Update this to use Synchronized once the condition_variable support lands. + std::mutex threadsReadyMutex; + size_t threadsReady = 0; + std::condition_variable readyCV; + std::mutex goMutex; + bool go = false; + std::condition_variable goCV; + + auto worker = [&](size_t threadIndex) { + // Signal that we are ready + { + std::lock_guard lock(threadsReadyMutex); + ++threadsReady; + } + readyCV.notify_one(); + + // Wait until we are given the signal to start + // The purpose of this is to try and make sure all threads start + // as close to the same time as possible. + { + std::unique_lock lock(goMutex); + goCV.wait(lock, [&] { return go; }); + } + + function(threadIndex); + }; + + // Start all of the threads + for (size_t threadIndex = 0; threadIndex < numThreads; ++threadIndex) { + threads.emplace_back([threadIndex, &worker]() { worker(threadIndex); }); + } + + // Wait for all threads to become ready + { + std::unique_lock lock(threadsReadyMutex); + readyCV.wait(lock, [&] { return threadsReady == numThreads; }); + } + { + std::lock_guard lock(goMutex); + go = true; + } + // Now signal the threads that they can go + goCV.notify_all(); + + // Wait for all threads to finish + for (auto& thread : threads) { + thread.join(); + } } template @@ -48,23 +119,23 @@ void testBasic() { obj->resize(1000); auto obj2 = obj; - EXPECT_EQ(obj2->size(), 1000); + EXPECT_EQ(1000, obj2->size()); SYNCHRONIZED (obj) { obj.push_back(10); - EXPECT_EQ(obj.size(), 1001); - EXPECT_EQ(obj.back(), 10); - EXPECT_EQ(obj2->size(), 1000); + EXPECT_EQ(1001, obj.size()); + EXPECT_EQ(10, obj.back()); + EXPECT_EQ(1000, obj2->size()); UNSYNCHRONIZED(obj) { - EXPECT_EQ(obj->size(), 1001); + EXPECT_EQ(1001, obj->size()); } } SYNCHRONIZED_CONST (obj) { - EXPECT_EQ(obj.size(), 1001); + EXPECT_EQ(1001, obj.size()); UNSYNCHRONIZED(obj) { - EXPECT_EQ(obj->size(), 1001); + EXPECT_EQ(1001, obj->size()); } } @@ -72,9 +143,9 @@ void testBasic() { lockedObj.front() = 2; } - EXPECT_EQ(obj->size(), 1001); - EXPECT_EQ(obj->back(), 10); - EXPECT_EQ(obj2->size(), 1000); + EXPECT_EQ(1001, obj->size()); + EXPECT_EQ(10, obj->back()); + EXPECT_EQ(1000, obj2->size()); EXPECT_EQ(FB_ARG_2_OR_1(1, 2), 2); EXPECT_EQ(FB_ARG_2_OR_1(1), 1); @@ -82,43 +153,30 @@ void testBasic() { template void testConcurrency() { folly::Synchronized, Mutex> v; - - struct Local { - static bool threadMain(int i, - folly::Synchronized, Mutex>& pv) { - usleep(::random(100 * 1000, 1000 * 1000)); - - // Test operator-> - pv->push_back(2 * i); - - // Aaand test the SYNCHRONIZED macro - SYNCHRONIZED (v, pv) { - v.push_back(2 * i + 1); - } - - return true; + static const size_t numThreads = 100; + // Note: I initially tried using itersPerThread = 1000, + // which works fine for most lock types, but std::shared_timed_mutex + // appears to be extraordinarily slow. It could take around 30 seconds + // to run this test with 1000 iterations per thread using shared_timed_mutex. + static const size_t itersPerThread = 100; + + auto pushNumbers = [&](size_t threadIdx) { + // Test lock() + for (size_t n = 0; n < itersPerThread; ++n) { + v->push_back((itersPerThread * threadIdx) + n); + sched_yield(); } }; - - std::vector results; - - static const size_t threads = 100; - FOR_EACH_RANGE (i, 0, threads) { - results.push_back(std::thread([&, i]() { Local::threadMain(i, v); })); - } - - FOR_EACH (i, results) { - i->join(); - } + runParallel(numThreads, pushNumbers); std::vector result; v.swap(result); - EXPECT_EQ(result.size(), 2 * threads); + EXPECT_EQ(numThreads * itersPerThread, result.size()); sort(result.begin(), result.end()); - FOR_EACH_RANGE (i, 0, 2 * threads) { - EXPECT_EQ(result[i], i); + for (size_t i = 0; i < itersPerThread * numThreads; ++i) { + EXPECT_EQ(i, result[i]); } } @@ -126,50 +184,30 @@ template void testDualLocking() { folly::Synchronized, Mutex> v; folly::Synchronized, Mutex> m; - struct Local { - static bool threadMain( - int i, - folly::Synchronized, Mutex>& pv, - folly::Synchronized, Mutex>& pm) { - - usleep(::random(100 * 1000, 1000 * 1000)); - - if (i & 1) { - SYNCHRONIZED_DUAL (v, pv, m, pm) { - v.push_back(i); - m[i] = i + 1; - } - } else { - SYNCHRONIZED_DUAL (m, pm, v, pv) { - v.push_back(i); - m[i] = i + 1; - } + auto dualLockWorker = [&](size_t threadIdx) { + if (threadIdx & 1) { + SYNCHRONIZED_DUAL(lv, v, lm, m) { + lv.push_back(threadIdx); + lm[threadIdx] = threadIdx + 1; + } + } else { + SYNCHRONIZED_DUAL(lm, m, lv, v) { + lv.push_back(threadIdx); + lm[threadIdx] = threadIdx + 1; } - - return true; } }; - - std::vector results; - - static const size_t threads = 100; - FOR_EACH_RANGE (i, 0, threads) { - results.push_back( - std::thread([&, i]() { Local::threadMain(i, v, m); })); - } - - FOR_EACH (i, results) { - i->join(); - } + static const size_t numThreads = 100; + runParallel(numThreads, dualLockWorker); std::vector result; v.swap(result); - EXPECT_EQ(result.size(), threads); + EXPECT_EQ(numThreads, result.size()); sort(result.begin(), result.end()); - FOR_EACH_RANGE (i, 0, threads) { - EXPECT_EQ(result[i], i); + for (size_t i = 0; i < numThreads; ++i) { + EXPECT_EQ(i, result[i]); } } @@ -177,152 +215,139 @@ template void testDualLockingWithConst() { folly::Synchronized, Mutex> v; folly::Synchronized, Mutex> m; - struct Local { - static bool threadMain( - int i, - folly::Synchronized, Mutex>& pv, - const folly::Synchronized, Mutex>& pm) { - - usleep(::random(100 * 1000, 1000 * 1000)); - - if (i & 1) { - SYNCHRONIZED_DUAL (v, pv, m, pm) { - (void)m.size(); - v.push_back(i); - } - } else { - SYNCHRONIZED_DUAL (m, pm, v, pv) { - (void)m.size(); - v.push_back(i); - } + auto dualLockWorker = [&](size_t threadIdx) { + const auto& cm = m; + if (threadIdx & 1) { + SYNCHRONIZED_DUAL(lv, v, lm, cm) { + (void)lm.size(); + lv.push_back(threadIdx); + } + } else { + SYNCHRONIZED_DUAL(lm, cm, lv, v) { + (void)lm.size(); + lv.push_back(threadIdx); } - - return true; } }; - - std::vector results; - - static const size_t threads = 100; - FOR_EACH_RANGE (i, 0, threads) { - results.push_back( - std::thread([&, i]() { Local::threadMain(i, v, m); })); - } - - FOR_EACH (i, results) { - i->join(); - } + static const size_t numThreads = 100; + runParallel(numThreads, dualLockWorker); std::vector result; v.swap(result); - EXPECT_EQ(result.size(), threads); + EXPECT_EQ(numThreads, result.size()); sort(result.begin(), result.end()); - FOR_EACH_RANGE (i, 0, threads) { - EXPECT_EQ(result[i], i); + for (size_t i = 0; i < numThreads; ++i) { + EXPECT_EQ(i, result[i]); } } template void testTimedSynchronized() { folly::Synchronized, Mutex> v; - - struct Local { - static bool threadMain(int i, - folly::Synchronized, Mutex>& pv) { - usleep(::random(100 * 1000, 1000 * 1000)); - - // Test operator-> - pv->push_back(2 * i); - - // Aaand test the TIMED_SYNCHRONIZED macro - for (;;) - TIMED_SYNCHRONIZED (10, v, pv) { - if (v) { - usleep(::random(15 * 1000, 150 * 1000)); - v->push_back(2 * i + 1); - return true; - } - else { - // do nothing - usleep(::random(10 * 1000, 100 * 1000)); - } + folly::Synchronized numTimeouts; + + auto worker = [&](size_t threadIdx) { + // Test operator-> + v->push_back(2 * threadIdx); + + // Aaand test the TIMED_SYNCHRONIZED macro + for (;;) + TIMED_SYNCHRONIZED(5, lv, v) { + if (lv) { + // Sleep for a random time to ensure we trigger timeouts + // in other threads + randomSleep( + std::chrono::milliseconds(5), std::chrono::milliseconds(15)); + lv->push_back(2 * threadIdx + 1); + return; } - return true; - } + SYNCHRONIZED(numTimeouts) { + ++numTimeouts; + } + } }; - std::vector results; - - static const size_t threads = 100; - FOR_EACH_RANGE (i, 0, threads) { - results.push_back(std::thread([&, i]() { Local::threadMain(i, v); })); - } - - FOR_EACH (i, results) { - i->join(); - } + static const size_t numThreads = 100; + runParallel(numThreads, worker); std::vector result; v.swap(result); - EXPECT_EQ(result.size(), 2 * threads); + EXPECT_EQ(2 * numThreads, result.size()); sort(result.begin(), result.end()); - FOR_EACH_RANGE (i, 0, 2 * threads) { - EXPECT_EQ(result[i], i); + for (size_t i = 0; i < 2 * numThreads; ++i) { + EXPECT_EQ(i, result[i]); + } + // We generally expect a large number of number timeouts here. + // I'm not adding a check for it since it's theoretically possible that + // we might get 0 timeouts depending on the CPU scheduling if our threads + // don't get to run very often. + uint64_t finalNumTimeouts = 0; + SYNCHRONIZED(numTimeouts) { + finalNumTimeouts = numTimeouts; } + LOG(INFO) << "testTimedSynchronized: " << finalNumTimeouts << " timeouts"; } template void testTimedSynchronizedWithConst() { folly::Synchronized, Mutex> v; - - struct Local { - static bool threadMain(int i, - folly::Synchronized, Mutex>& pv) { - usleep(::random(100 * 1000, 1000 * 1000)); - - // Test operator-> - pv->push_back(i); - - usleep(::random(5 * 1000, 1000 * 1000)); - // Test TIMED_SYNCHRONIZED_CONST - for (;;) { - TIMED_SYNCHRONIZED_CONST (10, v, pv) { - if (v) { - auto found = std::find(v->begin(), v->end(), i); - CHECK(found != v->end()); - return true; - } else { - // do nothing - usleep(::random(10 * 1000, 100 * 1000)); + folly::Synchronized numTimeouts; + + auto worker = [&](size_t threadIdx) { + // Test operator-> + v->push_back(threadIdx); + + // Test TIMED_SYNCHRONIZED_CONST + for (;;) { + TIMED_SYNCHRONIZED_CONST(10, lv, v) { + if (lv) { + // Sleep while holding the lock. + // + // This will block other threads from acquiring the write lock to add + // their thread index to v, but it won't block threads that have + // entered the for loop and are trying to acquire a read lock. + // + // For lock types that give preference to readers rather than writers, + // this will tend to serialize all threads on the wlock() above. + randomSleep( + std::chrono::milliseconds(5), std::chrono::milliseconds(15)); + auto found = std::find(lv->begin(), lv->end(), threadIdx); + CHECK(found != lv->end()); + return; + } else { + SYNCHRONIZED(numTimeouts) { + ++numTimeouts; } } } } }; - std::vector results; - - static const size_t threads = 100; - FOR_EACH_RANGE (i, 0, threads) { - results.push_back(std::thread([&, i]() { Local::threadMain(i, v); })); - } - - FOR_EACH (i, results) { - i->join(); - } + static const size_t numThreads = 100; + runParallel(numThreads, worker); std::vector result; v.swap(result); - EXPECT_EQ(result.size(), threads); + EXPECT_EQ(numThreads, result.size()); sort(result.begin(), result.end()); - FOR_EACH_RANGE (i, 0, threads) { - EXPECT_EQ(result[i], i); + for (size_t i = 0; i < numThreads; ++i) { + EXPECT_EQ(i, result[i]); + } + // We generally expect a small number of timeouts here. + // For locks that give readers preference over writers this should usually + // be 0. With locks that give writers preference we do see a small-ish + // number of read timeouts. + uint64_t finalNumTimeouts = 0; + SYNCHRONIZED(numTimeouts) { + finalNumTimeouts = numTimeouts; } + LOG(INFO) << "testTimedSynchronizedWithConst: " << finalNumTimeouts + << " timeouts"; } template void testConstCopy() { @@ -332,10 +357,10 @@ template void testConstCopy() { std::vector result; v.copy(&result); - EXPECT_EQ(result, input); + EXPECT_EQ(input, result); result = v.copy(); - EXPECT_EQ(result, input); + EXPECT_EQ(input, result); } struct NotCopiableNotMovable { @@ -352,3 +377,5 @@ template void testInPlaceConstruction() { folly::construct_in_place, 5, "a" ); } +} +}