#include <thread>
#include <vector>
-#include <gtest/gtest.h>
-
-#include <folly/Benchmark.h>
#include <folly/Memory.h>
#include <folly/futures/Future.h>
+#include <folly/Conv.h>
#include <folly/fibers/AddTasks.h>
+#include <folly/fibers/BatchDispatcher.h>
#include <folly/fibers/EventBaseLoopController.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerMap.h>
#include <folly/fibers/GenericBaton.h>
+#include <folly/fibers/Semaphore.h>
#include <folly/fibers/SimpleLoopController.h>
#include <folly/fibers/WhenN.h>
+#include <folly/portability/GTest.h>
using namespace folly::fibers;
outerEvb.loopForever();
}
-static size_t sNumAwaits;
-
-void runBenchmark(size_t numAwaits, size_t toSend) {
- sNumAwaits = numAwaits;
-
- FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
- auto& loopController =
- dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
+TEST(FiberManager, semaphore) {
+ constexpr size_t kTasks = 10;
+ constexpr size_t kIterations = 10000;
+ constexpr size_t kNumTokens = 10;
- std::queue<Promise<int>> pendingRequests;
- static const size_t maxOutstanding = 5;
+ Semaphore sem(kNumTokens);
+ int counterA = 0;
+ int counterB = 0;
- auto loop = [&fiberManager, &loopController, &pendingRequests, &toSend]() {
- if (pendingRequests.size() == maxOutstanding || toSend == 0) {
- if (pendingRequests.empty()) {
- return;
+ auto task = [&sem, kTasks, kIterations, kNumTokens](
+ int& counter, folly::fibers::Baton& baton) {
+ FiberManager manager(folly::make_unique<EventBaseLoopController>());
+ folly::EventBase evb;
+ dynamic_cast<EventBaseLoopController&>(manager.loopController())
+ .attachEventBase(evb);
+
+ {
+ std::shared_ptr<folly::EventBase> completionCounter(
+ &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
+
+ for (size_t i = 0; i < kTasks; ++i) {
+ manager.addTask([&, completionCounter]() {
+ for (size_t i = 0; i < kIterations; ++i) {
+ sem.wait();
+ ++counter;
+ sem.signal();
+ --counter;
+
+ EXPECT_LT(counter, kNumTokens);
+ EXPECT_GE(counter, 0);
+ }
+ });
}
- pendingRequests.front().setValue(0);
- pendingRequests.pop();
- } else {
- fiberManager.addTask([&pendingRequests]() {
- for (size_t i = 0; i < sNumAwaits; ++i) {
- auto result = await([&pendingRequests](Promise<int> promise) {
- pendingRequests.push(std::move(promise));
- });
- DCHECK_EQ(result, 0);
- }
- });
- if (--toSend == 0) {
- loopController.stop();
- }
+ baton.wait();
}
+ evb.loopForever();
};
- loopController.loop(std::move(loop));
-}
+ folly::fibers::Baton batonA;
+ folly::fibers::Baton batonB;
+ std::thread threadA([&] { task(counterA, batonA); });
+ std::thread threadB([&] { task(counterB, batonB); });
+
+ batonA.post();
+ batonB.post();
+ threadA.join();
+ threadB.join();
-BENCHMARK(FiberManagerBasicOneAwait, iters) {
- runBenchmark(1, iters);
+ EXPECT_LT(counterA, kNumTokens);
+ EXPECT_LT(counterB, kNumTokens);
+ EXPECT_GE(counterA, 0);
+ EXPECT_GE(counterB, 0);
}
-BENCHMARK(FiberManagerBasicFiveAwaits, iters) {
- runBenchmark(5, iters);
+template <typename ExecutorT>
+void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
+ thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
+ executor, [=](std::vector<int>&& batch) {
+ EXPECT_EQ(batchSize, batch.size());
+ std::vector<std::string> results;
+ for (auto& it : batch) {
+ results.push_back(folly::to<std::string>(it));
+ }
+ return results;
+ });
+
+ auto indexCopy = index;
+ auto result = batchDispatcher.add(std::move(indexCopy));
+ EXPECT_EQ(folly::to<std::string>(index), result.get());
}
-BENCHMARK(FiberManagerCreateDestroy, iters) {
- for (size_t i = 0; i < iters; ++i) {
- folly::EventBase evb;
- auto& fm = folly::fibers::getFiberManager(evb);
- fm.addTask([]() {});
- evb.loop();
- }
+TEST(FiberManager, batchDispatchTest) {
+ folly::EventBase evb;
+ auto& executor = getFiberManager(evb);
+
+ // Launch multiple fibers with a single id.
+ executor.add([&]() {
+ int batchSize = 10;
+ for (int i = 0; i < batchSize; i++) {
+ executor.add(
+ [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+ }
+ });
+ evb.loop();
+
+ // Reuse the same BatchDispatcher to batch once again.
+ executor.add([&]() {
+ int batchSize = 10;
+ for (int i = 0; i < batchSize; i++) {
+ executor.add(
+ [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+ }
+ });
+ evb.loop();
}
-BENCHMARK(FiberManagerAllocateDeallocatePattern, iters) {
- static const size_t kNumAllocations = 10000;
+template <typename ExecutorT>
+folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
+ ExecutorT& executor,
+ int totalNumberOfElements,
+ std::vector<int> input) {
+ thread_local BatchDispatcher<
+ std::vector<int>,
+ std::vector<std::string>,
+ ExecutorT>
+ batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
+ std::vector<std::vector<std::string>> results;
+ int numberOfElements = 0;
+ for (auto& unit : batch) {
+ numberOfElements += unit.size();
+ std::vector<std::string> result;
+ for (auto& element : unit) {
+ result.push_back(folly::to<std::string>(element));
+ }
+ results.push_back(std::move(result));
+ }
+ EXPECT_EQ(totalNumberOfElements, numberOfElements);
+ return results;
+ });
- FiberManager::Options opts;
- opts.maxFibersPoolSize = 0;
+ return batchDispatcher.add(std::move(input));
+}
- FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
+/**
+ * Batch values in groups of 5, and then call inner dispatch.
+ */
+template <typename ExecutorT>
+void doubleBatchOuterDispatch(
+ ExecutorT& executor,
+ int totalNumberOfElements,
+ int index) {
+ thread_local BatchDispatcher<int, std::string, ExecutorT>
+ batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
+ EXPECT_EQ(totalNumberOfElements, batch.size());
+ std::vector<std::string> results;
+ std::vector<folly::Future<std::vector<std::string>>>
+ innerDispatchResultFutures;
+
+ std::vector<int> group;
+ for (auto unit : batch) {
+ group.push_back(unit);
+ if (group.size() == 5) {
+ auto localGroup = group;
+ group.clear();
+
+ innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
+ executor, totalNumberOfElements, localGroup));
+ }
+ }
- for (size_t iter = 0; iter < iters; ++iter) {
- EXPECT_EQ(0, fiberManager.fibersPoolSize());
+ folly::collectAll(
+ innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
+ .then([&](
+ std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
+ for (auto& unit : innerDispatchResults) {
+ for (auto& element : unit.value()) {
+ results.push_back(element);
+ }
+ }
+ })
+ .get();
+ return results;
+ });
- size_t fibersRun = 0;
+ auto indexCopy = index;
+ auto result = batchDispatcher.add(std::move(indexCopy));
+ EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
- for (size_t i = 0; i < kNumAllocations; ++i) {
- fiberManager.addTask([&fibersRun] { ++fibersRun; });
- fiberManager.loopUntilNoReady();
+TEST(FiberManager, doubleBatchDispatchTest) {
+ folly::EventBase evb;
+ auto& executor = getFiberManager(evb);
+
+ // Launch multiple fibers with a single id.
+ executor.add([&]() {
+ int totalNumberOfElements = 20;
+ for (int i = 0; i < totalNumberOfElements; i++) {
+ executor.add([=, &executor]() {
+ doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
+ });
}
-
- EXPECT_EQ(10000, fibersRun);
- EXPECT_EQ(0, fiberManager.fibersPoolSize());
- }
+ });
+ evb.loop();
}
-BENCHMARK(FiberManagerAllocateLargeChunk, iters) {
- static const size_t kNumAllocations = 10000;
-
- FiberManager::Options opts;
- opts.maxFibersPoolSize = 0;
+template <typename ExecutorT>
+void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
+ thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
+ executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
+ throw std::runtime_error("Surprise!!");
+ });
- FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
+ EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
+}
- for (size_t iter = 0; iter < iters; ++iter) {
- EXPECT_EQ(0, fiberManager.fibersPoolSize());
+TEST(FiberManager, batchDispatchExceptionHandlingTest) {
+ folly::EventBase evb;
+ auto& executor = getFiberManager(evb);
+
+ // Launch multiple fibers with a single id.
+ executor.add([&]() {
+ int totalNumberOfElements = 5;
+ for (int i = 0; i < totalNumberOfElements; i++) {
+ executor.add(
+ [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
+ }
+ });
+ evb.loop();
+}
- size_t fibersRun = 0;
+/**
+ * Test that we can properly track fiber stack usage.
+ *
+ * This functionality can only be enabled when ASAN is disabled, so avoid
+ * running this test with ASAN.
+ */
+#ifndef FOLLY_SANITIZE_ADDRESS
+TEST(FiberManager, recordStack) {
+ std::thread([] {
+ folly::fibers::FiberManager::Options opts;
+ opts.recordStackEvery = 1;
+
+ FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(fm.loopController());
+
+ constexpr size_t n = 1000;
+ int s = 0;
+ fm.addTask([&]() {
+ int b[n] = {0};
+ for (size_t i = 0; i < n; ++i) {
+ b[i] = i;
+ }
+ for (size_t i = 0; i + 1 < n; ++i) {
+ s += b[i] * b[i + 1];
+ }
+ });
- for (size_t i = 0; i < kNumAllocations; ++i) {
- fiberManager.addTask([&fibersRun] { ++fibersRun; });
- }
+ (void)s;
- fiberManager.loopUntilNoReady();
+ loopController.loop([&]() { loopController.stop(); });
- EXPECT_EQ(10000, fibersRun);
- EXPECT_EQ(0, fiberManager.fibersPoolSize());
- }
+ // Check that we properly accounted fiber stack usage.
+ EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
+ }).join();
}
+#endif