Apply clang-format to folly/fibers/
[folly.git] / folly / fibers / test / FibersTest.cpp
index e0c5424b337dd6376901bf9d3171a55c52df2964..9717ddaab7671fd7abcf5dc6d04aaa57fa96f0c0 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include <vector>
 
 #include <folly/Memory.h>
+#include <folly/Random.h>
 #include <folly/futures/Future.h>
 
+#include <folly/Conv.h>
 #include <folly/fibers/AddTasks.h>
+#include <folly/fibers/AtomicBatchDispatcher.h>
+#include <folly/fibers/BatchDispatcher.h>
 #include <folly/fibers/EventBaseLoopController.h>
 #include <folly/fibers/FiberManager.h>
 #include <folly/fibers/FiberManagerMap.h>
 #include <folly/fibers/GenericBaton.h>
 #include <folly/fibers/Semaphore.h>
 #include <folly/fibers/SimpleLoopController.h>
+#include <folly/fibers/TimedMutex.h>
 #include <folly/fibers/WhenN.h>
+#include <folly/io/async/ScopedEventBaseThread.h>
 #include <folly/portability/GTest.h>
 
 using namespace folly::fibers;
@@ -38,7 +44,7 @@ TEST(FiberManager, batonTimedWaitTimeout) {
   bool taskAdded = false;
   size_t iterations = 0;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -79,7 +85,7 @@ TEST(FiberManager, batonTimedWaitPost) {
   size_t iterations = 0;
   Baton* baton_ptr;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -114,7 +120,7 @@ TEST(FiberManager, batonTimedWaitTimeoutEvb) {
 
   folly::EventBase evb;
 
-  FiberManager manager(folly::make_unique<EventBaseLoopController>());
+  FiberManager manager(std::make_unique<EventBaseLoopController>());
   dynamic_cast<EventBaseLoopController&>(manager.loopController())
       .attachEventBase(evb);
 
@@ -153,7 +159,7 @@ TEST(FiberManager, batonTimedWaitPostEvb) {
 
   folly::EventBase evb;
 
-  FiberManager manager(folly::make_unique<EventBaseLoopController>());
+  FiberManager manager(std::make_unique<EventBaseLoopController>());
   dynamic_cast<EventBaseLoopController&>(manager.loopController())
       .attachEventBase(evb);
 
@@ -186,7 +192,7 @@ TEST(FiberManager, batonTimedWaitPostEvb) {
 }
 
 TEST(FiberManager, batonTryWait) {
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
 
   // Check if try_wait and post work as expected
   Baton b;
@@ -219,7 +225,7 @@ TEST(FiberManager, batonTryWait) {
 }
 
 TEST(FiberManager, genericBatonFiberWait) {
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
 
   GenericBaton b;
   bool fiberRunning = false;
@@ -248,7 +254,7 @@ TEST(FiberManager, genericBatonFiberWait) {
 }
 
 TEST(FiberManager, genericBatonThreadWait) {
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   GenericBaton b;
   std::atomic<bool> threadWaiting(false);
 
@@ -278,7 +284,7 @@ TEST(FiberManager, addTasksNoncopyable) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -286,12 +292,12 @@ TEST(FiberManager, addTasksNoncopyable) {
     if (!taskAdded) {
       manager.addTask([&]() {
         std::vector<std::function<std::unique_ptr<int>()>> funcs;
-        for (size_t i = 0; i < 3; ++i) {
+        for (int i = 0; i < 3; ++i) {
           funcs.push_back([i, &pendingFibers]() {
             await([&pendingFibers](Promise<int> promise) {
               pendingFibers.push_back(std::move(promise));
             });
-            return folly::make_unique<int>(i * 2 + 1);
+            return std::make_unique<int>(i * 2 + 1);
           });
         }
 
@@ -324,21 +330,20 @@ TEST(FiberManager, awaitThrow) {
   getFiberManager(evb)
       .addTaskFuture([&] {
         EXPECT_THROW(
-          await([](Promise<int> p) {
+            await([](Promise<int> p) {
               p.setValue(42);
               throw ExpectedException();
             }),
-          ExpectedException
-        );
+            ExpectedException);
 
         EXPECT_THROW(
-          await([&](Promise<int> p) {
+            await([&](Promise<int> p) {
               evb.runInEventBaseThread([p = std::move(p)]() mutable {
-                  p.setValue(42);
-                });
+                p.setValue(42);
+              });
               throw ExpectedException();
             }),
-          ExpectedException);
+            ExpectedException);
       })
       .waitVia(&evb);
 }
@@ -347,7 +352,7 @@ TEST(FiberManager, addTasksThrow) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -399,7 +404,7 @@ TEST(FiberManager, addTasksVoid) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -408,7 +413,7 @@ TEST(FiberManager, addTasksVoid) {
       manager.addTask([&]() {
         std::vector<std::function<void()>> funcs;
         for (size_t i = 0; i < 3; ++i) {
-          funcs.push_back([i, &pendingFibers]() {
+          funcs.push_back([&pendingFibers]() {
             await([&pendingFibers](Promise<int> promise) {
               pendingFibers.push_back(std::move(promise));
             });
@@ -441,7 +446,7 @@ TEST(FiberManager, addTasksVoidThrow) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -491,7 +496,7 @@ TEST(FiberManager, addTasksReserve) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -581,7 +586,7 @@ TEST(FiberManager, forEach) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -624,7 +629,7 @@ TEST(FiberManager, collectN) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -664,7 +669,7 @@ TEST(FiberManager, collectNThrow) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -673,12 +678,11 @@ TEST(FiberManager, collectNThrow) {
       manager.addTask([&]() {
         std::vector<std::function<int()>> funcs;
         for (size_t i = 0; i < 3; ++i) {
-          funcs.push_back([i, &pendingFibers]() {
+          funcs.push_back([&pendingFibers]() -> size_t {
             await([&pendingFibers](Promise<int> promise) {
               pendingFibers.push_back(std::move(promise));
             });
             throw std::runtime_error("Runtime");
-            return i * 2 + 1;
           });
         }
 
@@ -704,7 +708,7 @@ TEST(FiberManager, collectNVoid) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -713,7 +717,7 @@ TEST(FiberManager, collectNVoid) {
       manager.addTask([&]() {
         std::vector<std::function<void()>> funcs;
         for (size_t i = 0; i < 3; ++i) {
-          funcs.push_back([i, &pendingFibers]() {
+          funcs.push_back([&pendingFibers]() {
             await([&pendingFibers](Promise<int> promise) {
               pendingFibers.push_back(std::move(promise));
             });
@@ -740,7 +744,7 @@ TEST(FiberManager, collectNVoidThrow) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -749,7 +753,7 @@ TEST(FiberManager, collectNVoidThrow) {
       manager.addTask([&]() {
         std::vector<std::function<void()>> funcs;
         for (size_t i = 0; i < 3; ++i) {
-          funcs.push_back([i, &pendingFibers]() {
+          funcs.push_back([&pendingFibers]() {
             await([&pendingFibers](Promise<int> promise) {
               pendingFibers.push_back(std::move(promise));
             });
@@ -779,7 +783,7 @@ TEST(FiberManager, collectAll) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -818,7 +822,7 @@ TEST(FiberManager, collectAllVoid) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -827,7 +831,7 @@ TEST(FiberManager, collectAllVoid) {
       manager.addTask([&]() {
         std::vector<std::function<void()>> funcs;
         for (size_t i = 0; i < 3; ++i) {
-          funcs.push_back([i, &pendingFibers]() {
+          funcs.push_back([&pendingFibers]() {
             await([&pendingFibers](Promise<int> promise) {
               pendingFibers.push_back(std::move(promise));
             });
@@ -853,7 +857,7 @@ TEST(FiberManager, collectAny) {
   std::vector<Promise<int>> pendingFibers;
   bool taskAdded = false;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -913,7 +917,7 @@ void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
 }
 
 TEST(FiberManager, runInMainContext) {
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -949,7 +953,7 @@ TEST(FiberManager, runInMainContext) {
 }
 
 TEST(FiberManager, addTaskFinally) {
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -976,7 +980,7 @@ TEST(FiberManager, fibersPoolWithinLimit) {
   FiberManager::Options opts;
   opts.maxFibersPoolSize = 5;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
+  FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -1005,7 +1009,7 @@ TEST(FiberManager, fibersPoolOverLimit) {
   FiberManager::Options opts;
   opts.maxFibersPoolSize = 5;
 
-  FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
+  FiberManager manager(std::make_unique<SimpleLoopController>(), opts);
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -1027,7 +1031,7 @@ TEST(FiberManager, fibersPoolOverLimit) {
 }
 
 TEST(FiberManager, remoteFiberBasic) {
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -1065,7 +1069,7 @@ TEST(FiberManager, remoteFiberBasic) {
 }
 
 TEST(FiberManager, addTaskRemoteBasic) {
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
 
   int result[2];
   result[0] = result[1] = 0;
@@ -1106,7 +1110,7 @@ TEST(FiberManager, addTaskRemoteBasic) {
 
 TEST(FiberManager, remoteHasTasks) {
   size_t counter = 0;
-  FiberManager fm(folly::make_unique<SimpleLoopController>());
+  FiberManager fm(std::make_unique<SimpleLoopController>());
   std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
 
   remote.join();
@@ -1122,7 +1126,7 @@ TEST(FiberManager, remoteHasTasks) {
 TEST(FiberManager, remoteHasReadyTasks) {
   int result = 0;
   folly::Optional<Promise<int>> savedPromise;
-  FiberManager fm(folly::make_unique<SimpleLoopController>());
+  FiberManager fm(std::make_unique<SimpleLoopController>());
   std::thread remote([&]() {
     fm.addTaskRemote([&]() {
       result = await(
@@ -1149,8 +1153,7 @@ TEST(FiberManager, remoteHasReadyTasks) {
 
 template <typename Data>
 void testFiberLocal() {
-  FiberManager fm(
-      LocalType<Data>(), folly::make_unique<SimpleLoopController>());
+  FiberManager fm(LocalType<Data>(), std::make_unique<SimpleLoopController>());
 
   fm.addTask([]() {
     EXPECT_EQ(42, local<Data>().value);
@@ -1225,7 +1228,7 @@ TEST(FiberManager, fiberLocalDestructor) {
   };
 
   FiberManager fm(
-      LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
+      LocalType<CrazyData>(), std::make_unique<SimpleLoopController>());
 
   fm.addTask([]() { local<CrazyData>().data = 41; });
 
@@ -1234,7 +1237,7 @@ TEST(FiberManager, fiberLocalDestructor) {
 }
 
 TEST(FiberManager, yieldTest) {
-  FiberManager manager(folly::make_unique<SimpleLoopController>());
+  FiberManager manager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(manager.loopController());
 
@@ -1255,7 +1258,7 @@ TEST(FiberManager, yieldTest) {
 }
 
 TEST(FiberManager, RequestContext) {
-  FiberManager fm(folly::make_unique<SimpleLoopController>());
+  FiberManager fm(std::make_unique<SimpleLoopController>());
 
   bool checkRun1 = false;
   bool checkRun2 = false;
@@ -1353,7 +1356,7 @@ TEST(FiberManager, resizePeriodically) {
   opts.fibersPoolResizePeriodMs = 300;
   opts.maxFibersPoolSize = 5;
 
-  FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
+  FiberManager manager(std::make_unique<EventBaseLoopController>(), opts);
 
   folly::EventBase evb;
   dynamic_cast<EventBaseLoopController&>(manager.loopController())
@@ -1411,7 +1414,7 @@ TEST(FiberManager, resizePeriodically) {
 }
 
 TEST(FiberManager, batonWaitTimeoutHandler) {
-  FiberManager manager(folly::make_unique<EventBaseLoopController>());
+  FiberManager manager(std::make_unique<EventBaseLoopController>());
 
   folly::EventBase evb;
   dynamic_cast<EventBaseLoopController&>(manager.loopController())
@@ -1443,7 +1446,7 @@ TEST(FiberManager, batonWaitTimeoutHandler) {
 }
 
 TEST(FiberManager, batonWaitTimeoutMany) {
-  FiberManager manager(folly::make_unique<EventBaseLoopController>());
+  FiberManager manager(std::make_unique<EventBaseLoopController>());
 
   folly::EventBase evb;
   dynamic_cast<EventBaseLoopController&>(manager.loopController())
@@ -1473,7 +1476,7 @@ TEST(FiberManager, batonWaitTimeoutMany) {
 }
 
 TEST(FiberManager, remoteFutureTest) {
-  FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
+  FiberManager fiberManager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
 
@@ -1491,7 +1494,7 @@ TEST(FiberManager, remoteFutureTest) {
 
 // Test that a void function produes a Future<Unit>.
 TEST(FiberManager, remoteFutureVoidUnitTest) {
-  FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
+  FiberManager fiberManager(std::make_unique<SimpleLoopController>());
   auto& loopController =
       dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
 
@@ -1541,17 +1544,16 @@ TEST(FiberManager, nestedFiberManagers) {
 }
 
 TEST(FiberManager, semaphore) {
-  constexpr size_t kTasks = 10;
-  constexpr size_t kIterations = 10000;
-  constexpr size_t kNumTokens = 10;
+  static constexpr size_t kTasks = 10;
+  static constexpr size_t kIterations = 10000;
+  static constexpr size_t kNumTokens = 10;
 
   Semaphore sem(kNumTokens);
   int counterA = 0;
   int counterB = 0;
 
-  auto task = [&sem, kTasks, kIterations, kNumTokens](
-      int& counter, folly::fibers::Baton& baton) {
-    FiberManager manager(folly::make_unique<EventBaseLoopController>());
+  auto task = [&sem](int& counter, folly::fibers::Baton& baton) {
+    FiberManager manager(std::make_unique<EventBaseLoopController>());
     folly::EventBase evb;
     dynamic_cast<EventBaseLoopController&>(manager.loopController())
         .attachEventBase(evb);
@@ -1562,7 +1564,7 @@ TEST(FiberManager, semaphore) {
 
       for (size_t i = 0; i < kTasks; ++i) {
         manager.addTask([&, completionCounter]() {
-          for (size_t i = 0; i < kIterations; ++i) {
+          for (size_t j = 0; j < kIterations; ++j) {
             sem.wait();
             ++counter;
             sem.signal();
@@ -1595,6 +1597,540 @@ TEST(FiberManager, semaphore) {
   EXPECT_GE(counterB, 0);
 }
 
+template <typename ExecutorT>
+void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
+  thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
+      executor, [=](std::vector<int>&& batch) {
+        EXPECT_EQ(batchSize, batch.size());
+        std::vector<std::string> results;
+        for (auto& it : batch) {
+          results.push_back(folly::to<std::string>(it));
+        }
+        return results;
+      });
+
+  auto indexCopy = index;
+  auto result = batchDispatcher.add(std::move(indexCopy));
+  EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
+
+TEST(FiberManager, batchDispatchTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int batchSize = 10;
+    for (int i = 0; i < batchSize; i++) {
+      executor.add(
+          [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+    }
+  });
+  evb.loop();
+
+  // Reuse the same BatchDispatcher to batch once again.
+  executor.add([&]() {
+    int batchSize = 10;
+    for (int i = 0; i < batchSize; i++) {
+      executor.add(
+          [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
+    }
+  });
+  evb.loop();
+}
+
+template <typename ExecutorT>
+folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
+    ExecutorT& executor,
+    int totalNumberOfElements,
+    std::vector<int> input) {
+  thread_local BatchDispatcher<
+      std::vector<int>,
+      std::vector<std::string>,
+      ExecutorT>
+      batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
+        std::vector<std::vector<std::string>> results;
+        int numberOfElements = 0;
+        for (auto& unit : batch) {
+          numberOfElements += unit.size();
+          std::vector<std::string> result;
+          for (auto& element : unit) {
+            result.push_back(folly::to<std::string>(element));
+          }
+          results.push_back(std::move(result));
+        }
+        EXPECT_EQ(totalNumberOfElements, numberOfElements);
+        return results;
+      });
+
+  return batchDispatcher.add(std::move(input));
+}
+
+/**
+ * Batch values in groups of 5, and then call inner dispatch.
+ */
+template <typename ExecutorT>
+void doubleBatchOuterDispatch(
+    ExecutorT& executor,
+    int totalNumberOfElements,
+    int index) {
+  thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
+      executor, [=, &executor](std::vector<int>&& batch) {
+        EXPECT_EQ(totalNumberOfElements, batch.size());
+        std::vector<std::string> results;
+        std::vector<folly::Future<std::vector<std::string>>>
+            innerDispatchResultFutures;
+
+        std::vector<int> group;
+        for (auto unit : batch) {
+          group.push_back(unit);
+          if (group.size() == 5) {
+            auto localGroup = group;
+            group.clear();
+
+            innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
+                executor, totalNumberOfElements, localGroup));
+          }
+        }
+
+        folly::collectAll(
+            innerDispatchResultFutures.begin(),
+            innerDispatchResultFutures.end())
+            .then([&](std::vector<Try<std::vector<std::string>>>
+                          innerDispatchResults) {
+              for (auto& unit : innerDispatchResults) {
+                for (auto& element : unit.value()) {
+                  results.push_back(element);
+                }
+              }
+            })
+            .get();
+        return results;
+      });
+
+  auto indexCopy = index;
+  auto result = batchDispatcher.add(std::move(indexCopy));
+  EXPECT_EQ(folly::to<std::string>(index), result.get());
+}
+
+TEST(FiberManager, doubleBatchDispatchTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int totalNumberOfElements = 20;
+    for (int i = 0; i < totalNumberOfElements; i++) {
+      executor.add([=, &executor]() {
+        doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
+      });
+    }
+  });
+  evb.loop();
+}
+
+template <typename ExecutorT>
+void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
+  thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
+      executor, [](std::vector<int> &&) -> std::vector<int> {
+        throw std::runtime_error("Surprise!!");
+      });
+
+  EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
+}
+
+TEST(FiberManager, batchDispatchExceptionHandlingTest) {
+  folly::EventBase evb;
+  auto& executor = getFiberManager(evb);
+
+  // Launch multiple fibers with a single id.
+  executor.add([&]() {
+    int totalNumberOfElements = 5;
+    for (int i = 0; i < totalNumberOfElements; i++) {
+      executor.add(
+          [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
+    }
+  });
+  evb.loop();
+}
+
+namespace AtomicBatchDispatcherTesting {
+
+using ValueT = size_t;
+using ResultT = std::string;
+using DispatchFunctionT =
+    folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
+
+#define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
+#if ENABLE_TRACE_IN_TEST
+#define OUTPUT_TRACE std::cerr
+#else // ENABLE_TRACE_IN_TEST
+struct DevNullPiper {
+  template <typename T>
+  DevNullPiper& operator<<(const T&) {
+    return *this;
+  }
+
+  DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
+    return *this;
+  }
+} devNullPiper;
+#define OUTPUT_TRACE devNullPiper
+#endif // ENABLE_TRACE_IN_TEST
+
+struct Job {
+  AtomicBatchDispatcher<ValueT, ResultT>::Token token;
+  ValueT input;
+
+  void preprocess(FiberManager& executor, bool die) {
+    // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
+    clock_t msecToDoIO = folly::Random::rand32() % 10;
+    double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
+    double endAfter = start + msecToDoIO;
+    while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
+      executor.yield();
+    }
+    if (die) {
+      throw std::logic_error("Simulating preprocessing failure");
+    }
+  }
+
+  Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
+      : token(std::move(t)), input(i) {}
+
+  Job(Job&&) = default;
+  Job& operator=(Job&&) = default;
+};
+
+ResultT processSingleInput(ValueT&& input) {
+  return folly::to<ResultT>(std::move(input));
+}
+
+std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
+  size_t expectedCount = inputs.size();
+  std::vector<ResultT> results;
+  results.reserve(expectedCount);
+  for (size_t i = 0; i < expectedCount; ++i) {
+    results.emplace_back(processSingleInput(std::move(inputs[i])));
+  }
+  return results;
+}
+
+void createJobs(
+    AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
+    std::vector<Job>& jobs,
+    size_t count) {
+  jobs.clear();
+  for (size_t i = 0; i < count; ++i) {
+    jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
+  }
+}
+
+enum class DispatchProblem {
+  None,
+  PreprocessThrows,
+  DuplicateDispatch,
+};
+
+void dispatchJobs(
+    FiberManager& executor,
+    std::vector<Job>& jobs,
+    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+    DispatchProblem dispatchProblem = DispatchProblem::None,
+    size_t problemIndex = size_t(-1)) {
+  EXPECT_TRUE(
+      dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
+  results.clear();
+  results.resize(jobs.size());
+  for (size_t i = 0; i < jobs.size(); ++i) {
+    executor.add(
+        [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
+          try {
+            Job job(std::move(jobs[i]));
+
+            if (dispatchProblem == DispatchProblem::PreprocessThrows) {
+              if (i == problemIndex) {
+                EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
+                return;
+              }
+            }
+
+            job.preprocess(executor, false);
+            OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
+            results[i] = job.token.dispatch(job.input);
+            OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
+
+            if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
+              if (i == problemIndex) {
+                EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
+              }
+            }
+          } catch (...) {
+            OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
+          }
+        });
+  }
+}
+
+void validateResult(
+    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+    size_t i) {
+  try {
+    OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
+                 << std::endl;
+  } catch (std::exception& e) {
+    OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
+    throw;
+  }
+}
+
+template <typename TException>
+void validateResults(
+    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+    size_t expectedNumResults) {
+  size_t numResultsFilled = 0;
+  for (size_t i = 0; i < results.size(); ++i) {
+    if (!results[i]) {
+      continue;
+    }
+    ++numResultsFilled;
+    EXPECT_THROW(validateResult(results, i), TException);
+  }
+  EXPECT_EQ(numResultsFilled, expectedNumResults);
+}
+
+void validateResults(
+    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+    size_t expectedNumResults) {
+  size_t numResultsFilled = 0;
+  for (size_t i = 0; i < results.size(); ++i) {
+    if (!results[i]) {
+      continue;
+    }
+    ++numResultsFilled;
+    EXPECT_NO_THROW(validateResult(results, i));
+    ValueT expectedInput = i;
+    EXPECT_EQ(
+        results[i]->value(), processSingleInput(std::move(expectedInput)));
+  }
+  EXPECT_EQ(numResultsFilled, expectedNumResults);
+}
+
+} // AtomicBatchDispatcherTesting
+
+#define SET_UP_TEST_FUNC                                        \
+  using namespace AtomicBatchDispatcherTesting;                 \
+  folly::EventBase evb;                                         \
+  auto& executor = getFiberManager(evb);                        \
+  const size_t COUNT = 11;                                      \
+  std::vector<Job> jobs;                                        \
+  jobs.reserve(COUNT);                                          \
+  std::vector<folly::Optional<folly::Future<ResultT>>> results; \
+  results.reserve(COUNT);                                       \
+  DispatchFunctionT dispatchFunc
+
+TEST(FiberManager, ABD_Test) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing AtomicBatchDispatcher with explicit call to commit()
+  //
+  dispatchFunc = userDispatchFunc;
+  auto atomicBatchDispatcher =
+      createAtomicBatchDispatcher(std::move(dispatchFunc));
+  createJobs(atomicBatchDispatcher, jobs, COUNT);
+  dispatchJobs(executor, jobs, results);
+  atomicBatchDispatcher.commit();
+  evb.loop();
+  validateResults(results, COUNT);
+}
+
+TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing AtomicBatchDispatcher destroyed before calling commit.
+  // Handles error cases for:
+  // - User might have forgotten to add the call to commit() in the code
+  // - An unexpected exception got thrown in user code before commit() is called
+  //
+  try {
+    dispatchFunc = userDispatchFunc;
+    auto atomicBatchDispatcher =
+        createAtomicBatchDispatcher(std::move(dispatchFunc));
+    createJobs(atomicBatchDispatcher, jobs, COUNT);
+    dispatchJobs(executor, jobs, results);
+    throw std::runtime_error(
+        "Unexpected exception in user code before commit called");
+    // atomicBatchDispatcher.commit();
+  } catch (...) {
+    /* User code handles the exception and does not exit process */
+  }
+  evb.loop();
+  validateResults<ABDCommitNotCalledException>(results, COUNT);
+}
+
+TEST(FiberManager, ABD_PreprocessingFailureTest) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing preprocessing failure on a job throws
+  //
+  dispatchFunc = userDispatchFunc;
+  auto atomicBatchDispatcher =
+      createAtomicBatchDispatcher(std::move(dispatchFunc));
+  createJobs(atomicBatchDispatcher, jobs, COUNT);
+  dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
+  atomicBatchDispatcher.commit();
+  evb.loop();
+  validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
+}
+
+TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing that calling dispatch more than once on the same token throws
+  //
+  dispatchFunc = userDispatchFunc;
+  auto atomicBatchDispatcher =
+      createAtomicBatchDispatcher(std::move(dispatchFunc));
+  createJobs(atomicBatchDispatcher, jobs, COUNT);
+  dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
+  atomicBatchDispatcher.commit();
+  evb.loop();
+}
+
+TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing that exception set on attempt to call getToken after commit called
+  //
+  dispatchFunc = userDispatchFunc;
+  auto atomicBatchDispatcher =
+      createAtomicBatchDispatcher(std::move(dispatchFunc));
+  createJobs(atomicBatchDispatcher, jobs, COUNT);
+  atomicBatchDispatcher.commit();
+  EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
+  dispatchJobs(executor, jobs, results);
+  EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
+  evb.loop();
+  validateResults(results, COUNT);
+  EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
+}
+
+TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing that exception is set if user provided batch dispatch throws
+  //
+  dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
+    (void)userDispatchFunc(std::move(inputs));
+    throw std::runtime_error("Unexpected exception in user dispatch function");
+  };
+  auto atomicBatchDispatcher =
+      createAtomicBatchDispatcher(std::move(dispatchFunc));
+  createJobs(atomicBatchDispatcher, jobs, COUNT);
+  dispatchJobs(executor, jobs, results);
+  atomicBatchDispatcher.commit();
+  evb.loop();
+  validateResults<std::runtime_error>(results, COUNT);
+}
+
+TEST(FiberManager, VirtualEventBase) {
+  bool done1{false};
+  bool done2{false};
+  {
+    folly::ScopedEventBaseThread thread;
+
+    auto evb1 =
+        std::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
+    auto& evb2 = thread.getEventBase()->getVirtualEventBase();
+
+    getFiberManager(*evb1).addTaskRemote([&] {
+      Baton baton;
+      baton.timed_wait(std::chrono::milliseconds{100});
+
+      done1 = true;
+    });
+
+    getFiberManager(evb2).addTaskRemote([&] {
+      Baton baton;
+      baton.timed_wait(std::chrono::milliseconds{200});
+
+      done2 = true;
+    });
+
+    EXPECT_FALSE(done1);
+    EXPECT_FALSE(done2);
+
+    evb1.reset();
+    EXPECT_TRUE(done1);
+    EXPECT_FALSE(done2);
+  }
+  EXPECT_TRUE(done2);
+}
+
+TEST(TimedMutex, ThreadFiberDeadlockOrder) {
+  folly::EventBase evb;
+  auto& fm = getFiberManager(evb);
+  TimedMutex mutex;
+
+  mutex.lock();
+  std::thread unlockThread([&] {
+    /* sleep override */ std::this_thread::sleep_for(
+        std::chrono::milliseconds{100});
+    mutex.unlock();
+  });
+
+  fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
+  fm.addTask([&] {
+    runInMainContext([&] {
+      auto locked = mutex.timed_lock(std::chrono::seconds{1});
+      EXPECT_TRUE(locked);
+      if (locked) {
+        mutex.unlock();
+      }
+    });
+  });
+
+  evb.loopOnce();
+  EXPECT_EQ(0, fm.hasTasks());
+
+  unlockThread.join();
+}
+
+TEST(TimedMutex, ThreadFiberDeadlockRace) {
+  folly::EventBase evb;
+  auto& fm = getFiberManager(evb);
+  TimedMutex mutex;
+
+  mutex.lock();
+
+  fm.addTask([&] {
+    auto locked = mutex.timed_lock(std::chrono::seconds{1});
+    EXPECT_TRUE(locked);
+    if (locked) {
+      mutex.unlock();
+    }
+  });
+  fm.addTask([&] {
+    mutex.unlock();
+    runInMainContext([&] {
+      auto locked = mutex.timed_lock(std::chrono::seconds{1});
+      EXPECT_TRUE(locked);
+      if (locked) {
+        mutex.unlock();
+      }
+    });
+  });
+
+  evb.loopOnce();
+  EXPECT_EQ(0, fm.hasTasks());
+}
+
 /**
  * Test that we can properly track fiber stack usage.
  *
@@ -1603,15 +2139,15 @@ TEST(FiberManager, semaphore) {
  */
 #ifndef FOLLY_SANITIZE_ADDRESS
 TEST(FiberManager, recordStack) {
-  std::thread([] {
+  auto f = [] {
     folly::fibers::FiberManager::Options opts;
     opts.recordStackEvery = 1;
 
-    FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
+    FiberManager fm(std::make_unique<SimpleLoopController>(), opts);
     auto& loopController =
         dynamic_cast<SimpleLoopController&>(fm.loopController());
 
-    constexpr size_t n = 1000;
+    static constexpr size_t n = 1000;
     int s = 0;
     fm.addTask([&]() {
       int b[n] = {0};
@@ -1629,6 +2165,7 @@ TEST(FiberManager, recordStack) {
 
     // Check that we properly accounted fiber stack usage.
     EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
-  }).join();
+  };
+  std::thread(f).join();
 }
 #endif