BatchDispatcher exception handling
[folly.git] / folly / fibers / test / FibersTest.cpp
index e962145aeaa5d24970ce4db72510257aba0693bf..eeee0b2e9ed8e38778ebce07a7fc8a25925984be 100644 (file)
 #include <thread>
 #include <vector>
 
-#include <gtest/gtest.h>
-
-#include <folly/Benchmark.h>
 #include <folly/Memory.h>
 #include <folly/futures/Future.h>
 
+#include <folly/Conv.h>
 #include <folly/fibers/AddTasks.h>
+#include <folly/fibers/BatchDispatcher.h>
 #include <folly/fibers/EventBaseLoopController.h>
 #include <folly/fibers/FiberManager.h>
 #include <folly/fibers/FiberManagerMap.h>
 #include <folly/fibers/GenericBaton.h>
+#include <folly/fibers/Semaphore.h>
 #include <folly/fibers/SimpleLoopController.h>
 #include <folly/fibers/WhenN.h>
+#include <folly/portability/GTest.h>
 
 using namespace folly::fibers;
 
@@ -1541,104 +1542,251 @@ TEST(FiberManager, nestedFiberManagers) {
   outerEvb.loopForever();
 }
 
-static size_t sNumAwaits;
-
-void runBenchmark(size_t numAwaits, size_t toSend) {
-  sNumAwaits = numAwaits;
-
-  FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
-  auto& loopController =
-      dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
+TEST(FiberManager, semaphore) {
+  constexpr size_t kTasks = 10;
+  constexpr size_t kIterations = 10000;
+  constexpr size_t kNumTokens = 10;
 
-  std::queue<Promise<int>> pendingRequests;
-  static const size_t maxOutstanding = 5;
+  Semaphore sem(kNumTokens);
+  int counterA = 0;
+  int counterB = 0;
 
-  auto loop = [&fiberManager, &loopController, &pendingRequests, &toSend]() {
-    if (pendingRequests.size() == maxOutstanding || toSend == 0) {
-      if (pendingRequests.empty()) {
-        return;
+  auto task = [&sem, kTasks, kIterations, kNumTokens](
+      int& counter, folly::fibers::Baton& baton) {
+    FiberManager manager(folly::make_unique<EventBaseLoopController>());
+    folly::EventBase evb;
+    dynamic_cast<EventBaseLoopController&>(manager.loopController())
+        .attachEventBase(evb);
+
+    {
+      std::shared_ptr<folly::EventBase> completionCounter(
+          &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
+
+      for (size_t i = 0; i < kTasks; ++i) {
+        manager.addTask([&, completionCounter]() {
+          for (size_t i = 0; i < kIterations; ++i) {
+            sem.wait();
+            ++counter;
+            sem.signal();
+            --counter;
+
+            EXPECT_LT(counter, kNumTokens);
+            EXPECT_GE(counter, 0);
+          }
+        });
       }
-      pendingRequests.front().setValue(0);
-      pendingRequests.pop();
-    } else {
-      fiberManager.addTask([&pendingRequests]() {
-        for (size_t i = 0; i < sNumAwaits; ++i) {
-          auto result = await([&pendingRequests](Promise<int> promise) {
-            pendingRequests.push(std::move(promise));
-          });
-          DCHECK_EQ(result, 0);
-        }
-      });
 
-      if (--toSend == 0) {
-        loopController.stop();
-      }
+      baton.wait();
     }
+    evb.loopForever();
   };
 
-  loopController.loop(std::move(loop));
-}
+  folly::fibers::Baton batonA;
+  folly::fibers::Baton batonB;
+  std::thread threadA([&] { task(counterA, batonA); });
+  std::thread threadB([&] { task(counterB, batonB); });
+
+  batonA.post();
+  batonB.post();
+  threadA.join();
+  threadB.join();
 
-BENCHMARK(FiberManagerBasicOneAwait, iters) {
-  runBenchmark(1, iters);
+  EXPECT_LT(counterA, kNumTokens);
+  EXPECT_LT(counterB, kNumTokens);
+  EXPECT_GE(counterA, 0);
+  EXPECT_GE(counterB, 0);
 }
 
-BENCHMARK(FiberManagerBasicFiveAwaits, iters) {
-  runBenchmark(5, iters);
+template <typename ExecutorT>
+void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
+  thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
+      executor, [=](std::vector<int>&& batch) {
+        EXPECT_EQ(batchSize, batch.size());
+        std::vector<std::string> results;
+        for (auto& it : batch) {
+          results.push_back(folly::to<std::string>(it));
+        }
+        return results;
+      });
+
+  auto indexCopy = index;
+  auto result = batchDispatcher.add(std::move(indexCopy));
+  EXPECT_EQ(folly::to<std::string>(index), result.get());
 }
 
-BENCHMARK(FiberManagerCreateDestroy, iters) {
-  for (size_t i = 0; i < iters; ++i) {
-    folly::EventBase evb;
-    auto& fm = folly::fibers::getFiberManager(evb);
-    fm.addTask([]() {});
-    evb.loop();
-  }
+TEST(FiberManager, batchDispatchTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int batchSize = 10;
+    for (int i = 0; i < batchSize; i++) {
+      executor.add(
+          [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+    }
+  });
+  evb.loop();
+
+  // Reuse the same BatchDispatcher to batch once again.
+  executor.add([&]() {
+    int batchSize = 10;
+    for (int i = 0; i < batchSize; i++) {
+      executor.add(
+          [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+    }
+  });
+  evb.loop();
 }
 
-BENCHMARK(FiberManagerAllocateDeallocatePattern, iters) {
-  static const size_t kNumAllocations = 10000;
+template <typename ExecutorT>
+folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
+    ExecutorT& executor,
+    int totalNumberOfElements,
+    std::vector<int> input) {
+  thread_local BatchDispatcher<
+      std::vector<int>,
+      std::vector<std::string>,
+      ExecutorT>
+  batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
+    std::vector<std::vector<std::string>> results;
+    int numberOfElements = 0;
+    for (auto& unit : batch) {
+      numberOfElements += unit.size();
+      std::vector<std::string> result;
+      for (auto& element : unit) {
+        result.push_back(folly::to<std::string>(element));
+      }
+      results.push_back(std::move(result));
+    }
+    EXPECT_EQ(totalNumberOfElements, numberOfElements);
+    return results;
+  });
 
-  FiberManager::Options opts;
-  opts.maxFibersPoolSize = 0;
+  return batchDispatcher.add(std::move(input));
+}
 
-  FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
+/**
+ * Batch values in groups of 5, and then call inner dispatch.
+ */
+template <typename ExecutorT>
+void doubleBatchOuterDispatch(
+    ExecutorT& executor,
+    int totalNumberOfElements,
+    int index) {
+  thread_local BatchDispatcher<int, std::string, ExecutorT>
+  batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
+    EXPECT_EQ(totalNumberOfElements, batch.size());
+    std::vector<std::string> results;
+    std::vector<folly::Future<std::vector<std::string>>>
+        innerDispatchResultFutures;
+
+    std::vector<int> group;
+    for (auto unit : batch) {
+      group.push_back(unit);
+      if (group.size() == 5) {
+        auto localGroup = group;
+        group.clear();
+
+        innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
+            executor, totalNumberOfElements, localGroup));
+      }
+    }
 
-  for (size_t iter = 0; iter < iters; ++iter) {
-    EXPECT_EQ(0, fiberManager.fibersPoolSize());
+    folly::collectAll(
+        innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
+        .then([&](
+            std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
+          for (auto& unit : innerDispatchResults) {
+            for (auto& element : unit.value()) {
+              results.push_back(element);
+            }
+          }
+        })
+        .get();
+    return results;
+  });
 
-    size_t fibersRun = 0;
+  auto indexCopy = index;
+  auto result = batchDispatcher.add(std::move(indexCopy));
+  EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
 
-    for (size_t i = 0; i < kNumAllocations; ++i) {
-      fiberManager.addTask([&fibersRun] { ++fibersRun; });
-      fiberManager.loopUntilNoReady();
+TEST(FiberManager, doubleBatchDispatchTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int totalNumberOfElements = 20;
+    for (int i = 0; i < totalNumberOfElements; i++) {
+      executor.add([=, &executor]() {
+        doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
+      });
     }
-
-    EXPECT_EQ(10000, fibersRun);
-    EXPECT_EQ(0, fiberManager.fibersPoolSize());
-  }
+  });
+  evb.loop();
 }
 
-BENCHMARK(FiberManagerAllocateLargeChunk, iters) {
-  static const size_t kNumAllocations = 10000;
-
-  FiberManager::Options opts;
-  opts.maxFibersPoolSize = 0;
+template <typename ExecutorT>
+void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
+  thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
+      executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
+        throw std::runtime_error("Surprise!!");
+      });
 
-  FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
+  EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
+}
 
-  for (size_t iter = 0; iter < iters; ++iter) {
-    EXPECT_EQ(0, fiberManager.fibersPoolSize());
+TEST(FiberManager, batchDispatchExceptionHandlingTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int totalNumberOfElements = 5;
+    for (int i = 0; i < totalNumberOfElements; i++) {
+      executor.add(
+          [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
+    }
+  });
+  evb.loop();
+}
 
-    size_t fibersRun = 0;
+/**
+ * Test that we can properly track fiber stack usage.
+ *
+ * This functionality can only be enabled when ASAN is disabled, so avoid
+ * running this test with ASAN.
+ */
+#ifndef FOLLY_SANITIZE_ADDRESS
+TEST(FiberManager, recordStack) {
+  std::thread([] {
+    folly::fibers::FiberManager::Options opts;
+    opts.recordStackEvery = 1;
+
+    FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
+    auto& loopController =
+        dynamic_cast<SimpleLoopController&>(fm.loopController());
+
+    constexpr size_t n = 1000;
+    int s = 0;
+    fm.addTask([&]() {
+      int b[n] = {0};
+      for (size_t i = 0; i < n; ++i) {
+        b[i] = i;
+      }
+      for (size_t i = 0; i + 1 < n; ++i) {
+        s += b[i] * b[i + 1];
+      }
+    });
 
-    for (size_t i = 0; i < kNumAllocations; ++i) {
-      fiberManager.addTask([&fibersRun] { ++fibersRun; });
-    }
+    (void)s;
 
-    fiberManager.loopUntilNoReady();
+    loopController.loop([&]() { loopController.stop(); });
 
-    EXPECT_EQ(10000, fibersRun);
-    EXPECT_EQ(0, fiberManager.fibersPoolSize());
-  }
+    // Check that we properly accounted fiber stack usage.
+    EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
+  }).join();
 }
+#endif