X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffibers%2Ftest%2FFibersTest.cpp;h=eeee0b2e9ed8e38778ebce07a7fc8a25925984be;hb=2c0ece4a278ae289abd4b08e01d1613bf326c1e7;hp=e962145aeaa5d24970ce4db72510257aba0693bf;hpb=235fb4c082b0a8093892fc1149b84223701de1c4;p=folly.git diff --git a/folly/fibers/test/FibersTest.cpp b/folly/fibers/test/FibersTest.cpp index e962145a..eeee0b2e 100644 --- a/folly/fibers/test/FibersTest.cpp +++ b/folly/fibers/test/FibersTest.cpp @@ -17,19 +17,20 @@ #include #include -#include - -#include #include #include +#include #include +#include #include #include #include #include +#include #include #include +#include using namespace folly::fibers; @@ -1541,104 +1542,251 @@ TEST(FiberManager, nestedFiberManagers) { outerEvb.loopForever(); } -static size_t sNumAwaits; - -void runBenchmark(size_t numAwaits, size_t toSend) { - sNumAwaits = numAwaits; - - FiberManager fiberManager(folly::make_unique()); - auto& loopController = - dynamic_cast(fiberManager.loopController()); +TEST(FiberManager, semaphore) { + constexpr size_t kTasks = 10; + constexpr size_t kIterations = 10000; + constexpr size_t kNumTokens = 10; - std::queue> pendingRequests; - static const size_t maxOutstanding = 5; + Semaphore sem(kNumTokens); + int counterA = 0; + int counterB = 0; - auto loop = [&fiberManager, &loopController, &pendingRequests, &toSend]() { - if (pendingRequests.size() == maxOutstanding || toSend == 0) { - if (pendingRequests.empty()) { - return; + auto task = [&sem, kTasks, kIterations, kNumTokens]( + int& counter, folly::fibers::Baton& baton) { + FiberManager manager(folly::make_unique()); + folly::EventBase evb; + dynamic_cast(manager.loopController()) + .attachEventBase(evb); + + { + std::shared_ptr completionCounter( + &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); }); + + for (size_t i = 0; i < kTasks; ++i) { + manager.addTask([&, completionCounter]() { + for (size_t i = 0; i < kIterations; ++i) { + sem.wait(); + ++counter; + sem.signal(); + --counter; + + EXPECT_LT(counter, kNumTokens); + EXPECT_GE(counter, 0); + } + }); } - pendingRequests.front().setValue(0); - pendingRequests.pop(); - } else { - fiberManager.addTask([&pendingRequests]() { - for (size_t i = 0; i < sNumAwaits; ++i) { - auto result = await([&pendingRequests](Promise promise) { - pendingRequests.push(std::move(promise)); - }); - DCHECK_EQ(result, 0); - } - }); - if (--toSend == 0) { - loopController.stop(); - } + baton.wait(); } + evb.loopForever(); }; - loopController.loop(std::move(loop)); -} + folly::fibers::Baton batonA; + folly::fibers::Baton batonB; + std::thread threadA([&] { task(counterA, batonA); }); + std::thread threadB([&] { task(counterB, batonB); }); + + batonA.post(); + batonB.post(); + threadA.join(); + threadB.join(); -BENCHMARK(FiberManagerBasicOneAwait, iters) { - runBenchmark(1, iters); + EXPECT_LT(counterA, kNumTokens); + EXPECT_LT(counterB, kNumTokens); + EXPECT_GE(counterA, 0); + EXPECT_GE(counterB, 0); } -BENCHMARK(FiberManagerBasicFiveAwaits, iters) { - runBenchmark(5, iters); +template +void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) { + thread_local BatchDispatcher batchDispatcher( + executor, [=](std::vector&& batch) { + EXPECT_EQ(batchSize, batch.size()); + std::vector results; + for (auto& it : batch) { + results.push_back(folly::to(it)); + } + return results; + }); + + auto indexCopy = index; + auto result = batchDispatcher.add(std::move(indexCopy)); + EXPECT_EQ(folly::to(index), result.get()); } -BENCHMARK(FiberManagerCreateDestroy, iters) { - for (size_t i = 0; i < iters; ++i) { - folly::EventBase evb; - auto& fm = folly::fibers::getFiberManager(evb); - fm.addTask([]() {}); - evb.loop(); - } +TEST(FiberManager, batchDispatchTest) { + folly::EventBase evb; + auto& executor = getFiberManager(evb); + + // Launch multiple fibers with a single id. + executor.add([&]() { + int batchSize = 10; + for (int i = 0; i < batchSize; i++) { + executor.add( + [=, &executor]() { singleBatchDispatch(executor, batchSize, i); }); + } + }); + evb.loop(); + + // Reuse the same BatchDispatcher to batch once again. + executor.add([&]() { + int batchSize = 10; + for (int i = 0; i < batchSize; i++) { + executor.add( + [=, &executor]() { singleBatchDispatch(executor, batchSize, i); }); + } + }); + evb.loop(); } -BENCHMARK(FiberManagerAllocateDeallocatePattern, iters) { - static const size_t kNumAllocations = 10000; +template +folly::Future> doubleBatchInnerDispatch( + ExecutorT& executor, + int totalNumberOfElements, + std::vector input) { + thread_local BatchDispatcher< + std::vector, + std::vector, + ExecutorT> + batchDispatcher(executor, [=](std::vector>&& batch) { + std::vector> results; + int numberOfElements = 0; + for (auto& unit : batch) { + numberOfElements += unit.size(); + std::vector result; + for (auto& element : unit) { + result.push_back(folly::to(element)); + } + results.push_back(std::move(result)); + } + EXPECT_EQ(totalNumberOfElements, numberOfElements); + return results; + }); - FiberManager::Options opts; - opts.maxFibersPoolSize = 0; + return batchDispatcher.add(std::move(input)); +} - FiberManager fiberManager(folly::make_unique(), opts); +/** + * Batch values in groups of 5, and then call inner dispatch. + */ +template +void doubleBatchOuterDispatch( + ExecutorT& executor, + int totalNumberOfElements, + int index) { + thread_local BatchDispatcher + batchDispatcher(executor, [=, &executor](std::vector&& batch) { + EXPECT_EQ(totalNumberOfElements, batch.size()); + std::vector results; + std::vector>> + innerDispatchResultFutures; + + std::vector group; + for (auto unit : batch) { + group.push_back(unit); + if (group.size() == 5) { + auto localGroup = group; + group.clear(); + + innerDispatchResultFutures.push_back(doubleBatchInnerDispatch( + executor, totalNumberOfElements, localGroup)); + } + } - for (size_t iter = 0; iter < iters; ++iter) { - EXPECT_EQ(0, fiberManager.fibersPoolSize()); + folly::collectAll( + innerDispatchResultFutures.begin(), innerDispatchResultFutures.end()) + .then([&]( + std::vector>> innerDispatchResults) { + for (auto& unit : innerDispatchResults) { + for (auto& element : unit.value()) { + results.push_back(element); + } + } + }) + .get(); + return results; + }); - size_t fibersRun = 0; + auto indexCopy = index; + auto result = batchDispatcher.add(std::move(indexCopy)); + EXPECT_EQ(folly::to(index), result.get()); +} - for (size_t i = 0; i < kNumAllocations; ++i) { - fiberManager.addTask([&fibersRun] { ++fibersRun; }); - fiberManager.loopUntilNoReady(); +TEST(FiberManager, doubleBatchDispatchTest) { + folly::EventBase evb; + auto& executor = getFiberManager(evb); + + // Launch multiple fibers with a single id. + executor.add([&]() { + int totalNumberOfElements = 20; + for (int i = 0; i < totalNumberOfElements; i++) { + executor.add([=, &executor]() { + doubleBatchOuterDispatch(executor, totalNumberOfElements, i); + }); } - - EXPECT_EQ(10000, fibersRun); - EXPECT_EQ(0, fiberManager.fibersPoolSize()); - } + }); + evb.loop(); } -BENCHMARK(FiberManagerAllocateLargeChunk, iters) { - static const size_t kNumAllocations = 10000; - - FiberManager::Options opts; - opts.maxFibersPoolSize = 0; +template +void batchDispatchExceptionHandling(ExecutorT& executor, int i) { + thread_local BatchDispatcher batchDispatcher( + executor, [=, &executor](std::vector &&) -> std::vector { + throw std::runtime_error("Surprise!!"); + }); - FiberManager fiberManager(folly::make_unique(), opts); + EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error); +} - for (size_t iter = 0; iter < iters; ++iter) { - EXPECT_EQ(0, fiberManager.fibersPoolSize()); +TEST(FiberManager, batchDispatchExceptionHandlingTest) { + folly::EventBase evb; + auto& executor = getFiberManager(evb); + + // Launch multiple fibers with a single id. + executor.add([&]() { + int totalNumberOfElements = 5; + for (int i = 0; i < totalNumberOfElements; i++) { + executor.add( + [=, &executor]() { batchDispatchExceptionHandling(executor, i); }); + } + }); + evb.loop(); +} - size_t fibersRun = 0; +/** + * Test that we can properly track fiber stack usage. + * + * This functionality can only be enabled when ASAN is disabled, so avoid + * running this test with ASAN. + */ +#ifndef FOLLY_SANITIZE_ADDRESS +TEST(FiberManager, recordStack) { + std::thread([] { + folly::fibers::FiberManager::Options opts; + opts.recordStackEvery = 1; + + FiberManager fm(folly::make_unique(), opts); + auto& loopController = + dynamic_cast(fm.loopController()); + + constexpr size_t n = 1000; + int s = 0; + fm.addTask([&]() { + int b[n] = {0}; + for (size_t i = 0; i < n; ++i) { + b[i] = i; + } + for (size_t i = 0; i + 1 < n; ++i) { + s += b[i] * b[i + 1]; + } + }); - for (size_t i = 0; i < kNumAllocations; ++i) { - fiberManager.addTask([&fibersRun] { ++fibersRun; }); - } + (void)s; - fiberManager.loopUntilNoReady(); + loopController.loop([&]() { loopController.stop(); }); - EXPECT_EQ(10000, fibersRun); - EXPECT_EQ(0, fiberManager.fibersPoolSize()); - } + // Check that we properly accounted fiber stack usage. + EXPECT_LT(n * sizeof(int), fm.stackHighWatermark()); + }).join(); } +#endif