BatchDispatcher exception handling
[folly.git] / folly / fibers / test / FibersTest.cpp
index dcdd5d790f6cd2ee08680bdde2cf9e6cfd2adcaa..eeee0b2e9ed8e38778ebce07a7fc8a25925984be 100644 (file)
 #include <thread>
 #include <vector>
 
-#include <gtest/gtest.h>
-
 #include <folly/Memory.h>
 #include <folly/futures/Future.h>
 
+#include <folly/Conv.h>
 #include <folly/fibers/AddTasks.h>
+#include <folly/fibers/BatchDispatcher.h>
 #include <folly/fibers/EventBaseLoopController.h>
 #include <folly/fibers/FiberManager.h>
 #include <folly/fibers/FiberManagerMap.h>
 #include <folly/fibers/GenericBaton.h>
+#include <folly/fibers/Semaphore.h>
 #include <folly/fibers/SimpleLoopController.h>
 #include <folly/fibers/WhenN.h>
+#include <folly/portability/GTest.h>
 
 using namespace folly::fibers;
 
@@ -1539,3 +1541,252 @@ TEST(FiberManager, nestedFiberManagers) {
 
   outerEvb.loopForever();
 }
+
+TEST(FiberManager, semaphore) {
+  constexpr size_t kTasks = 10;
+  constexpr size_t kIterations = 10000;
+  constexpr size_t kNumTokens = 10;
+
+  Semaphore sem(kNumTokens);
+  int counterA = 0;
+  int counterB = 0;
+
+  auto task = [&sem, kTasks, kIterations, kNumTokens](
+      int& counter, folly::fibers::Baton& baton) {
+    FiberManager manager(folly::make_unique<EventBaseLoopController>());
+    folly::EventBase evb;
+    dynamic_cast<EventBaseLoopController&>(manager.loopController())
+        .attachEventBase(evb);
+
+    {
+      std::shared_ptr<folly::EventBase> completionCounter(
+          &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
+
+      for (size_t i = 0; i < kTasks; ++i) {
+        manager.addTask([&, completionCounter]() {
+          for (size_t i = 0; i < kIterations; ++i) {
+            sem.wait();
+            ++counter;
+            sem.signal();
+            --counter;
+
+            EXPECT_LT(counter, kNumTokens);
+            EXPECT_GE(counter, 0);
+          }
+        });
+      }
+
+      baton.wait();
+    }
+    evb.loopForever();
+  };
+
+  folly::fibers::Baton batonA;
+  folly::fibers::Baton batonB;
+  std::thread threadA([&] { task(counterA, batonA); });
+  std::thread threadB([&] { task(counterB, batonB); });
+
+  batonA.post();
+  batonB.post();
+  threadA.join();
+  threadB.join();
+
+  EXPECT_LT(counterA, kNumTokens);
+  EXPECT_LT(counterB, kNumTokens);
+  EXPECT_GE(counterA, 0);
+  EXPECT_GE(counterB, 0);
+}
+
+template <typename ExecutorT>
+void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
+  thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
+      executor, [=](std::vector<int>&& batch) {
+        EXPECT_EQ(batchSize, batch.size());
+        std::vector<std::string> results;
+        for (auto& it : batch) {
+          results.push_back(folly::to<std::string>(it));
+        }
+        return results;
+      });
+
+  auto indexCopy = index;
+  auto result = batchDispatcher.add(std::move(indexCopy));
+  EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
+
+TEST(FiberManager, batchDispatchTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int batchSize = 10;
+    for (int i = 0; i < batchSize; i++) {
+      executor.add(
+          [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+    }
+  });
+  evb.loop();
+
+  // Reuse the same BatchDispatcher to batch once again.
+  executor.add([&]() {
+    int batchSize = 10;
+    for (int i = 0; i < batchSize; i++) {
+      executor.add(
+          [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+    }
+  });
+  evb.loop();
+}
+
+template <typename ExecutorT>
+folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
+    ExecutorT& executor,
+    int totalNumberOfElements,
+    std::vector<int> input) {
+  thread_local BatchDispatcher<
+      std::vector<int>,
+      std::vector<std::string>,
+      ExecutorT>
+  batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
+    std::vector<std::vector<std::string>> results;
+    int numberOfElements = 0;
+    for (auto& unit : batch) {
+      numberOfElements += unit.size();
+      std::vector<std::string> result;
+      for (auto& element : unit) {
+        result.push_back(folly::to<std::string>(element));
+      }
+      results.push_back(std::move(result));
+    }
+    EXPECT_EQ(totalNumberOfElements, numberOfElements);
+    return results;
+  });
+
+  return batchDispatcher.add(std::move(input));
+}
+
+/**
+ * Batch values in groups of 5, and then call inner dispatch.
+ */
+template <typename ExecutorT>
+void doubleBatchOuterDispatch(
+    ExecutorT& executor,
+    int totalNumberOfElements,
+    int index) {
+  thread_local BatchDispatcher<int, std::string, ExecutorT>
+  batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
+    EXPECT_EQ(totalNumberOfElements, batch.size());
+    std::vector<std::string> results;
+    std::vector<folly::Future<std::vector<std::string>>>
+        innerDispatchResultFutures;
+
+    std::vector<int> group;
+    for (auto unit : batch) {
+      group.push_back(unit);
+      if (group.size() == 5) {
+        auto localGroup = group;
+        group.clear();
+
+        innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
+            executor, totalNumberOfElements, localGroup));
+      }
+    }
+
+    folly::collectAll(
+        innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
+        .then([&](
+            std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
+          for (auto& unit : innerDispatchResults) {
+            for (auto& element : unit.value()) {
+              results.push_back(element);
+            }
+          }
+        })
+        .get();
+    return results;
+  });
+
+  auto indexCopy = index;
+  auto result = batchDispatcher.add(std::move(indexCopy));
+  EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
+
+TEST(FiberManager, doubleBatchDispatchTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int totalNumberOfElements = 20;
+    for (int i = 0; i < totalNumberOfElements; i++) {
+      executor.add([=, &executor]() {
+        doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
+      });
+    }
+  });
+  evb.loop();
+}
+
+template <typename ExecutorT>
+void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
+  thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
+      executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
+        throw std::runtime_error("Surprise!!");
+      });
+
+  EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
+}
+
+TEST(FiberManager, batchDispatchExceptionHandlingTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int totalNumberOfElements = 5;
+    for (int i = 0; i < totalNumberOfElements; i++) {
+      executor.add(
+          [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
+    }
+  });
+  evb.loop();
+}
+
+/**
+ * Test that we can properly track fiber stack usage.
+ *
+ * This functionality can only be enabled when ASAN is disabled, so avoid
+ * running this test with ASAN.
+ */
+#ifndef FOLLY_SANITIZE_ADDRESS
+TEST(FiberManager, recordStack) {
+  std::thread([] {
+    folly::fibers::FiberManager::Options opts;
+    opts.recordStackEvery = 1;
+
+    FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
+    auto& loopController =
+        dynamic_cast<SimpleLoopController&>(fm.loopController());
+
+    constexpr size_t n = 1000;
+    int s = 0;
+    fm.addTask([&]() {
+      int b[n] = {0};
+      for (size_t i = 0; i < n; ++i) {
+        b[i] = i;
+      }
+      for (size_t i = 0; i + 1 < n; ++i) {
+        s += b[i] * b[i + 1];
+      }
+    });
+
+    (void)s;
+
+    loopController.loop([&]() { loopController.stop(); });
+
+    // Check that we properly accounted fiber stack usage.
+    EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
+  }).join();
+}
+#endif