#include <thread>
#include <vector>
-#include <gtest/gtest.h>
-
#include <folly/Memory.h>
#include <folly/futures/Future.h>
+#include <folly/Conv.h>
#include <folly/fibers/AddTasks.h>
+#include <folly/fibers/BatchDispatcher.h>
#include <folly/fibers/EventBaseLoopController.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerMap.h>
#include <folly/fibers/GenericBaton.h>
+#include <folly/fibers/Semaphore.h>
#include <folly/fibers/SimpleLoopController.h>
#include <folly/fibers/WhenN.h>
+#include <folly/portability/GTest.h>
using namespace folly::fibers;
outerEvb.loopForever();
}
+
+TEST(FiberManager, semaphore) {
+ constexpr size_t kTasks = 10;
+ constexpr size_t kIterations = 10000;
+ constexpr size_t kNumTokens = 10;
+
+ Semaphore sem(kNumTokens);
+ int counterA = 0;
+ int counterB = 0;
+
+ auto task = [&sem, kTasks, kIterations, kNumTokens](
+ int& counter, folly::fibers::Baton& baton) {
+ FiberManager manager(folly::make_unique<EventBaseLoopController>());
+ folly::EventBase evb;
+ dynamic_cast<EventBaseLoopController&>(manager.loopController())
+ .attachEventBase(evb);
+
+ {
+ std::shared_ptr<folly::EventBase> completionCounter(
+ &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
+
+ for (size_t i = 0; i < kTasks; ++i) {
+ manager.addTask([&, completionCounter]() {
+ for (size_t i = 0; i < kIterations; ++i) {
+ sem.wait();
+ ++counter;
+ sem.signal();
+ --counter;
+
+ EXPECT_LT(counter, kNumTokens);
+ EXPECT_GE(counter, 0);
+ }
+ });
+ }
+
+ baton.wait();
+ }
+ evb.loopForever();
+ };
+
+ folly::fibers::Baton batonA;
+ folly::fibers::Baton batonB;
+ std::thread threadA([&] { task(counterA, batonA); });
+ std::thread threadB([&] { task(counterB, batonB); });
+
+ batonA.post();
+ batonB.post();
+ threadA.join();
+ threadB.join();
+
+ EXPECT_LT(counterA, kNumTokens);
+ EXPECT_LT(counterB, kNumTokens);
+ EXPECT_GE(counterA, 0);
+ EXPECT_GE(counterB, 0);
+}
+
+template <typename ExecutorT>
+void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
+ thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
+ executor, [=](std::vector<int>&& batch) {
+ EXPECT_EQ(batchSize, batch.size());
+ std::vector<std::string> results;
+ for (auto& it : batch) {
+ results.push_back(folly::to<std::string>(it));
+ }
+ return results;
+ });
+
+ auto indexCopy = index;
+ auto result = batchDispatcher.add(std::move(indexCopy));
+ EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
+
+TEST(FiberManager, batchDispatchTest) {
+ folly::EventBase evb;
+ auto& executor = getFiberManager(evb);
+
+ // Launch multiple fibers with a single id.
+ executor.add([&]() {
+ int batchSize = 10;
+ for (int i = 0; i < batchSize; i++) {
+ executor.add(
+ [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+ }
+ });
+ evb.loop();
+
+ // Reuse the same BatchDispatcher to batch once again.
+ executor.add([&]() {
+ int batchSize = 10;
+ for (int i = 0; i < batchSize; i++) {
+ executor.add(
+ [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+ }
+ });
+ evb.loop();
+}
+
+template <typename ExecutorT>
+folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
+ ExecutorT& executor,
+ int totalNumberOfElements,
+ std::vector<int> input) {
+ thread_local BatchDispatcher<
+ std::vector<int>,
+ std::vector<std::string>,
+ ExecutorT>
+ batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
+ std::vector<std::vector<std::string>> results;
+ int numberOfElements = 0;
+ for (auto& unit : batch) {
+ numberOfElements += unit.size();
+ std::vector<std::string> result;
+ for (auto& element : unit) {
+ result.push_back(folly::to<std::string>(element));
+ }
+ results.push_back(std::move(result));
+ }
+ EXPECT_EQ(totalNumberOfElements, numberOfElements);
+ return results;
+ });
+
+ return batchDispatcher.add(std::move(input));
+}
+
+/**
+ * Batch values in groups of 5, and then call inner dispatch.
+ */
+template <typename ExecutorT>
+void doubleBatchOuterDispatch(
+ ExecutorT& executor,
+ int totalNumberOfElements,
+ int index) {
+ thread_local BatchDispatcher<int, std::string, ExecutorT>
+ batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
+ EXPECT_EQ(totalNumberOfElements, batch.size());
+ std::vector<std::string> results;
+ std::vector<folly::Future<std::vector<std::string>>>
+ innerDispatchResultFutures;
+
+ std::vector<int> group;
+ for (auto unit : batch) {
+ group.push_back(unit);
+ if (group.size() == 5) {
+ auto localGroup = group;
+ group.clear();
+
+ innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
+ executor, totalNumberOfElements, localGroup));
+ }
+ }
+
+ folly::collectAll(
+ innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
+ .then([&](
+ std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
+ for (auto& unit : innerDispatchResults) {
+ for (auto& element : unit.value()) {
+ results.push_back(element);
+ }
+ }
+ })
+ .get();
+ return results;
+ });
+
+ auto indexCopy = index;
+ auto result = batchDispatcher.add(std::move(indexCopy));
+ EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
+
+TEST(FiberManager, doubleBatchDispatchTest) {
+ folly::EventBase evb;
+ auto& executor = getFiberManager(evb);
+
+ // Launch multiple fibers with a single id.
+ executor.add([&]() {
+ int totalNumberOfElements = 20;
+ for (int i = 0; i < totalNumberOfElements; i++) {
+ executor.add([=, &executor]() {
+ doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
+ });
+ }
+ });
+ evb.loop();
+}
+
+template <typename ExecutorT>
+void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
+ thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
+ executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
+ throw std::runtime_error("Surprise!!");
+ });
+
+ EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
+}
+
+TEST(FiberManager, batchDispatchExceptionHandlingTest) {
+ folly::EventBase evb;
+ auto& executor = getFiberManager(evb);
+
+ // Launch multiple fibers with a single id.
+ executor.add([&]() {
+ int totalNumberOfElements = 5;
+ for (int i = 0; i < totalNumberOfElements; i++) {
+ executor.add(
+ [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
+ }
+ });
+ evb.loop();
+}
+
+/**
+ * Test that we can properly track fiber stack usage.
+ *
+ * This functionality can only be enabled when ASAN is disabled, so avoid
+ * running this test with ASAN.
+ */
+#ifndef FOLLY_SANITIZE_ADDRESS
+TEST(FiberManager, recordStack) {
+ std::thread([] {
+ folly::fibers::FiberManager::Options opts;
+ opts.recordStackEvery = 1;
+
+ FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
+ auto& loopController =
+ dynamic_cast<SimpleLoopController&>(fm.loopController());
+
+ constexpr size_t n = 1000;
+ int s = 0;
+ fm.addTask([&]() {
+ int b[n] = {0};
+ for (size_t i = 0; i < n; ++i) {
+ b[i] = i;
+ }
+ for (size_t i = 0; i + 1 < n; ++i) {
+ s += b[i] * b[i + 1];
+ }
+ });
+
+ (void)s;
+
+ loopController.loop([&]() { loopController.stop(); });
+
+ // Check that we properly accounted fiber stack usage.
+ EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
+ }).join();
+}
+#endif