Implement AtomicBatchDispatcher in folly::fibers
authorAmeya Limaye <ameyal@fb.com>
Mon, 31 Oct 2016 16:26:16 +0000 (09:26 -0700)
committerFacebook Github Bot <facebook-github-bot-bot@fb.com>
Mon, 31 Oct 2016 16:38:32 +0000 (09:38 -0700)
Summary:
Implement AtomicBatchDispatcher in folly::fibers
- Details about how to use the added functionality can be found
  in the doc comment for class AtomicBatchDispatcher.

Reviewed By: andriigrynenko

Differential Revision: D4054148

fbshipit-source-id: 090272eeab8c8abb15d5e400e52725853fcfc364

folly/Makefile.am
folly/fibers/AtomicBatchDispatcher-inl.h [new file with mode: 0644]
folly/fibers/AtomicBatchDispatcher.h [new file with mode: 0644]
folly/fibers/test/FibersTest.cpp

index b7860ee9a1c6880006b4801e2532c5844f5f851a..0ba093ff9023a796c735cdb5ccfa7dbb54b34f3f 100644 (file)
@@ -515,6 +515,8 @@ if HAVE_BOOST_CONTEXT
 nobase_follyinclude_HEADERS += \
        fibers/AddTasks.h \
        fibers/AddTasks-inl.h \
+       fibers/AtomicBatchDispatcher.h \
+       fibers/AtomicBatchDispatcher-inl.h \
        fibers/Baton.h \
        fibers/Baton-inl.h \
        fibers/BatchDispatcher.h \
diff --git a/folly/fibers/AtomicBatchDispatcher-inl.h b/folly/fibers/AtomicBatchDispatcher-inl.h
new file mode 100644 (file)
index 0000000..45209cc
--- /dev/null
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+namespace folly {
+namespace fibers {
+
+template <typename InputT, typename ResultT>
+struct AtomicBatchDispatcher<InputT, ResultT>::DispatchBaton {
+  DispatchBaton(DispatchFunctionT&& dispatchFunction)
+      : expectedCount_(0), dispatchFunction_(std::move(dispatchFunction)) {}
+
+  ~DispatchBaton() {
+    fulfillPromises();
+  }
+
+  void reserve(size_t numEntries) {
+    optEntries_.reserve(numEntries);
+  }
+
+  void setError(std::string message) {
+    optErrorMessage_ = std::move(message);
+  }
+
+  void setExpectedCount(size_t expectedCount) {
+    expectedCount_ = expectedCount;
+  }
+
+  Future<ResultT> getFutureResult(InputT&& input, size_t sequenceNumber) {
+    if (sequenceNumber >= optEntries_.size()) {
+      optEntries_.resize(sequenceNumber + 1);
+    }
+    folly::Optional<Entry>& optEntry = optEntries_[sequenceNumber];
+    if (optEntry) {
+      throw std::logic_error(
+          "Cannot have multiple inputs with same token sequence number");
+    }
+    optEntry = Entry(std::move(input));
+    return optEntry->promise.getFuture();
+  }
+
+ private:
+  void setExceptionResults(std::exception_ptr eptr) {
+    auto exceptionWrapper = exception_wrapper(eptr);
+    for (auto& optEntry : optEntries_) {
+      if (optEntry) {
+        optEntry->promise.setException(exceptionWrapper);
+      }
+    }
+  }
+
+  template <typename TException>
+  void setExceptionResults(
+      const TException& ex,
+      std::exception_ptr eptr = std::exception_ptr()) {
+    auto exceptionWrapper =
+        eptr ? exception_wrapper(eptr, ex) : exception_wrapper(ex);
+    for (auto& optEntry : optEntries_) {
+      if (optEntry) {
+        optEntry->promise.setException(exceptionWrapper);
+      }
+    }
+  }
+
+  void fulfillPromises() {
+    try {
+      // If an error message is set, set all promises to exception with message
+      if (optErrorMessage_) {
+        auto ex = std::logic_error(*optErrorMessage_);
+        return setExceptionResults(std::move(ex));
+      }
+
+      // Create inputs vector and validate entries count same as expectedCount_
+      std::vector<InputT> inputs;
+      inputs.reserve(expectedCount_);
+      bool allEntriesFound = (optEntries_.size() == expectedCount_);
+      if (allEntriesFound) {
+        for (auto& optEntry : optEntries_) {
+          if (!optEntry) {
+            allEntriesFound = false;
+            break;
+          }
+          inputs.emplace_back(std::move(optEntry->input));
+        }
+      }
+      if (!allEntriesFound) {
+        auto ex = std::logic_error(
+            "One or more input tokens destroyed before calling dispatch");
+        return setExceptionResults(std::move(ex));
+      }
+
+      // Call the user provided batch dispatch function to get all results
+      // and make sure that we have the expected number of results returned
+      auto results = dispatchFunction_(std::move(inputs));
+      if (results.size() != expectedCount_) {
+        auto ex = std::logic_error(
+            "Unexpected number of results returned from dispatch function");
+        return setExceptionResults(std::move(ex));
+      }
+
+      // Fulfill the promises with the results from the batch dispatch
+      for (size_t i = 0; i < expectedCount_; ++i) {
+        optEntries_[i]->promise.setValue(std::move(results[i]));
+      }
+    } catch (const std::exception& ex) {
+      return setExceptionResults(ex, std::current_exception());
+    } catch (...) {
+      return setExceptionResults(std::current_exception());
+    }
+  }
+
+  struct Entry {
+    InputT input;
+    folly::Promise<ResultT> promise;
+
+    Entry(Entry&& other) noexcept
+        : input(std::move(other.input)), promise(std::move(other.promise)) {}
+
+    Entry& operator=(Entry&& other) noexcept {
+      input = std::move(other.input);
+      promise = std::move(other.promise);
+      return *this;
+    }
+
+    explicit Entry(InputT&& input) : input(std::move(input)) {}
+  };
+
+  size_t expectedCount_;
+  DispatchFunctionT dispatchFunction_;
+  std::vector<folly::Optional<Entry>> optEntries_;
+  folly::Optional<std::string> optErrorMessage_;
+};
+
+template <typename InputT, typename ResultT>
+AtomicBatchDispatcher<InputT, ResultT>::Token::Token(
+    std::shared_ptr<DispatchBaton> baton,
+    size_t sequenceNumber)
+    : baton_(std::move(baton)), SEQUENCE_NUMBER(sequenceNumber) {}
+
+template <typename InputT, typename ResultT>
+Future<ResultT> AtomicBatchDispatcher<InputT, ResultT>::Token::dispatch(
+    InputT input) {
+  auto baton = std::move(baton_);
+  if (!baton) {
+    throw std::logic_error(
+        "Dispatch called more than once on the same Token object");
+  }
+  return baton->getFutureResult(std::move(input), SEQUENCE_NUMBER);
+}
+
+template <typename InputT, typename ResultT>
+AtomicBatchDispatcher<InputT, ResultT>::AtomicBatchDispatcher(
+    DispatchFunctionT&& dispatchFunc)
+    : numTokensIssued_(0),
+      baton_(std::make_shared<DispatchBaton>(std::move(dispatchFunc))) {}
+
+template <typename InputT, typename ResultT>
+AtomicBatchDispatcher<InputT, ResultT>::~AtomicBatchDispatcher() {
+  if (baton_) {
+    baton_->setError(
+        "AtomicBatchDispatcher destroyed before commit() was called on it");
+    commit();
+  }
+}
+
+template <typename InputT, typename ResultT>
+void AtomicBatchDispatcher<InputT, ResultT>::reserve(size_t numEntries) {
+  if (!baton_) {
+    throw std::logic_error("Cannot call reserve(....) after calling commit()");
+  }
+  baton_->reserve(numEntries);
+}
+
+template <typename InputT, typename ResultT>
+auto AtomicBatchDispatcher<InputT, ResultT>::getToken() -> Token {
+  if (!baton_) {
+    throw std::logic_error("Cannot issue more tokens after calling commit()");
+  }
+  return Token(baton_, numTokensIssued_++);
+}
+
+template <typename InputT, typename ResultT>
+void AtomicBatchDispatcher<InputT, ResultT>::commit() {
+  auto baton = std::move(baton_);
+  if (!baton) {
+    throw std::logic_error(
+        "Cannot call commit() more than once on the same dispatcher");
+  }
+  baton->setExpectedCount(numTokensIssued_);
+}
+
+template <typename InputT, typename ResultT>
+AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
+    folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
+    size_t initialCapacity) {
+  auto abd = AtomicBatchDispatcher<InputT, ResultT>(std::move(dispatchFunc));
+  if (initialCapacity) {
+    abd.reserve(initialCapacity);
+  }
+  return abd;
+}
+
+} // namespace fibers
+} // manespace folly
diff --git a/folly/fibers/AtomicBatchDispatcher.h b/folly/fibers/AtomicBatchDispatcher.h
new file mode 100644 (file)
index 0000000..bfeafa2
--- /dev/null
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/Function.h>
+#include <folly/Optional.h>
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
+#include <memory>
+#include <utility>
+#include <vector>
+
+namespace folly {
+namespace fibers {
+
+/**
+ * AtomicBatchDispatcher should be used if you want to process fiber tasks in
+ * parallel, but require to synchronize them at some point. The canonical
+ * example is to create a database transaction dispatch round. This API notably
+ * enforces that all tasks in the batch have reached the synchronization point
+ * before the user provided dispatch function is called with all the inputs
+ * provided in one function call. It also provides a guarantee that the inputs
+ * in the vector of inputs passed to the user provided dispatch function will be
+ * in the same order as the order in which the token for the job was issued.
+ *
+ * Use this when you want all the inputs in the batch to be processed by a
+ * single function call to the user provided dispatch function.
+ * The user provided dispatch function takes a vector of InputT as input and
+ * returns a vector of ResultT.
+ * To use an AtomicBatchDispatcher, create it by providing a dispatch function:
+ * TO EITHER the constructor of the AtomicBatchDispatcher class
+ * (can call reserve method on the dispatcher to reserve space (for number of
+ *  inputs expected)),
+ * OR the createAtomicBatchDispatcher function in folly::fibers namespace
+ *    (optionally specify an initial capacity (for number of inputs expected)).
+ * The AtomicBatchDispatcher object created using this call (dispatcher),
+ * is the only object that can issue tokens (Token objects) that are used to
+ * add an input to the batch. A single Token is issued when the user calls
+ * the getToken function on the dispatcher.
+ * Token objects cannot be copied (can only be moved). User can call the public
+ * dispatch function on the Token providing a single input value. The dispatch
+ * function returns a folly::Future<ResultT> value that the user can then wait
+ * on to obtain a ResultT value. The ResultT value will only be available once
+ * the dispatch function has been called on all the Tokens in the batch and the
+ * user has called dispatcher.commit() to indicate no more batched transactions
+ * are to be added.
+ * User code pertaining to a task can be run between the point where a token for
+ * the task has been issued and before calling the dispatch function on the
+ * token. Since this code can potentially throw, the token issued for a task
+ * should be moved into this processing code in such a way that if an exception
+ * is thrown and then handled, the token object for the task is destroyed.
+ * The batch query dispatcher will wait until all tokens have either been
+ * destroyed or have had the dispatch function called on them. Leaking an
+ * issued token will cause the batch dispatch to wait forever to happen.
+ *
+ * The AtomicBatchDispatcher object is referred to as the dispatcher below.
+ *
+ * POSSIBLE ERRORS:
+ * 1) The dispatcher is destroyed before calling commit on it, for example
+ *    because the user forgot to call commit OR an exception was thrown
+ *    in user code before the call to commit:
+ *    - The future ResultT has an exception of type std::logic_error set for all
+ *      tokens that were issued by the dispatcher (once all tokens are either
+ *      destroyed or have called dispatch)
+ * 2) Calling the dispatch function more than once on the same Token object
+ *    (or a moved version of the same Token):
+ *    - Subsequent calls to dispatch (after the first one) will throw an
+ *      std::logic_error exception (the batch itself will not have any errors
+ *      and will get processed)
+ * 3) One/more of the Tokens issued are destroyed before calling dispatch on
+ *    it/them:
+ *    - The future ResultT has an exception of type std::logic_error set for all
+ *      tokens that were issued by the dispatcher (once all tokens are either
+ *      destroyed or have called dispatch)
+ * 4) dispatcher.getToken() is called after calling dispatcher.commit()
+ *    - the call to getToken() will throw an std::logic_error exception
+ *      (the batch itself will not have any errors and will get processed).
+ * 5) All tokens were issued and called dispatch, the user provided batch
+ *    dispatch function is called, but that function throws any exception.
+ *    - The future ResultT has exception for all tokens that were issued by
+ *      the dispatcher. The result will contain the wrapped user exception.
+ *
+ * EXAMPLE (There are other ways to achieve this, but this is one example):
+ * - User creates an AtomicBatchDispatcher on stack
+ *     auto dispatcher =
+ *         folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count);
+ * - User creates "count" number of token objects by calling "getToken" count
+ *   number of times
+ *     std::vector<Job> jobs;
+ *     for (size_t i = 0; i < count; ++i) {
+ *       auto token = dispatcher.getToken();
+ *       jobs.push_back(Job(std::move(token), singleInputValueToProcess);
+ *     }
+ * - User calls commit() on the dispatcher to indicate that no new tokens will
+ *   be issued for this batch
+ *     dispatcher.commit();
+ * - Use any single threaded executor that will process the jobs
+ * - On each execution (fiber) preprocess a single "Job" that has been moved in
+ *   from the original vector "jobs". This way if the preprocessing throws
+ *   the Job object being processed is destroyed and so is the token.
+ * - On each execution (fiber) call the dispatch on the token
+ *     auto future = job.token.dispatch(job.input);
+ * - Save the future returned so that eventually you can wait on the results
+ *     ResultT result;
+ *     try {
+ *       result = future.value();
+ *       // future.hasValue() is true
+ *     } catch (...) {
+ *       // future.hasException() is true
+ *       <DO WHATEVER YOU WANT IN CASE OF ERROR> }
+ *     }
+ *
+ * NOTES:
+ * - AtomicBatchDispatcher is not thread safe.
+ * - Works for executors that run tasks on a single thread.
+ */
+template <typename InputT, typename ResultT>
+class AtomicBatchDispatcher {
+ private:
+  struct DispatchBaton;
+  friend struct DispatchBaton;
+
+ public:
+  using DispatchFunctionT =
+      folly::Function<std::vector<ResultT>(std::vector<InputT>&&)>;
+
+  class Token {
+   public:
+    explicit Token(std::shared_ptr<DispatchBaton> baton, size_t sequenceNumber);
+
+    Future<ResultT> dispatch(InputT input);
+
+    // Allow moving a Token object
+    Token(Token&&) = default;
+    Token& operator=(Token&&) = default;
+
+   private:
+    // Disallow copying a Token object
+    Token(const Token&) = delete;
+    Token& operator=(const Token&) = delete;
+
+    std::shared_ptr<DispatchBaton> baton_;
+    const size_t SEQUENCE_NUMBER;
+  };
+
+  explicit AtomicBatchDispatcher(DispatchFunctionT&& dispatchFunc);
+
+  ~AtomicBatchDispatcher();
+
+  // numEntries is a *hint* about the number of inputs to expect:
+  // - It is used purely to reserve space for storing vector of inputs etc.,
+  //   so that reeallocation and move copy are reduced / not needed.
+  // - It is provided purely for performance reasons
+  void reserve(size_t numEntries);
+
+  Token getToken();
+
+  void commit();
+
+  // Allow moving an AtomicBatchDispatcher object
+  AtomicBatchDispatcher(AtomicBatchDispatcher&&) = default;
+  AtomicBatchDispatcher& operator=(AtomicBatchDispatcher&&) = default;
+
+ private:
+  // Disallow copying an AtomicBatchDispatcher object
+  AtomicBatchDispatcher(const AtomicBatchDispatcher&) = delete;
+  AtomicBatchDispatcher& operator=(const AtomicBatchDispatcher&) = delete;
+
+  size_t numTokensIssued_;
+  std::shared_ptr<DispatchBaton> baton_;
+};
+
+// initialCapacity is a *hint* about the number of inputs to expect:
+// - It is used purely to reserve space for storing vector of inputs etc.,
+//   so that reeallocation and move copy are reduced / not needed.
+// - It is provided purely for performance reasons
+template <typename InputT, typename ResultT>
+AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
+    folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
+    size_t initialCapacity = 0);
+
+} // namespace fibers
+} // namespace folly
+
+#include <folly/fibers/AtomicBatchDispatcher-inl.h>
index e587ab44dff73ead067766582f7e7d11845d5501..859ccda3ee33859bb5e0e55192162afa3961371d 100644 (file)
 #include <vector>
 
 #include <folly/Memory.h>
+#include <folly/Random.h>
 #include <folly/futures/Future.h>
 
 #include <folly/Conv.h>
 #include <folly/fibers/AddTasks.h>
+#include <folly/fibers/AtomicBatchDispatcher.h>
 #include <folly/fibers/BatchDispatcher.h>
 #include <folly/fibers/EventBaseLoopController.h>
 #include <folly/fibers/FiberManager.h>
@@ -1753,6 +1755,292 @@ TEST(FiberManager, batchDispatchExceptionHandlingTest) {
   evb.loop();
 }
 
+namespace AtomicBatchDispatcherTesting {
+
+using ValueT = size_t;
+using ResultT = std::string;
+using DispatchFunctionT =
+    folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
+
+#define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
+#if ENABLE_TRACE_IN_TEST
+#define OUTPUT_TRACE std::cerr
+#else // ENABLE_TRACE_IN_TEST
+struct DevNullPiper {
+  template <typename T>
+  DevNullPiper& operator<<(const T&) {
+    return *this;
+  }
+
+  DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
+    return *this;
+  }
+} devNullPiper;
+#define OUTPUT_TRACE devNullPiper
+#endif // ENABLE_TRACE_IN_TEST
+
+struct Job {
+  AtomicBatchDispatcher<ValueT, ResultT>::Token token;
+  ValueT input;
+
+  void preprocess(FiberManager& executor, bool die) {
+    // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
+    clock_t msecToDoIO = folly::Random::rand32() % 10;
+    double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
+    double endAfter = start + msecToDoIO;
+    while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
+      executor.yield();
+    }
+    if (die) {
+      throw std::logic_error("Simulating preprocessing failure");
+    }
+  }
+
+  Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
+      : token(std::move(t)), input(i) {}
+
+  Job(Job&&) = default;
+  Job& operator=(Job&&) = default;
+};
+
+ResultT processSingleInput(ValueT&& input) {
+  return folly::to<ResultT>(std::move(input));
+}
+
+std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
+  size_t expectedCount = inputs.size();
+  std::vector<ResultT> results;
+  results.reserve(expectedCount);
+  for (size_t i = 0; i < expectedCount; ++i) {
+    results.emplace_back(processSingleInput(std::move(inputs[i])));
+  }
+  return results;
+}
+
+void createJobs(
+    AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
+    std::vector<Job>& jobs,
+    size_t count) {
+  jobs.clear();
+  for (size_t i = 0; i < count; ++i) {
+    jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
+  }
+}
+
+enum class DispatchProblem {
+  None,
+  PreprocessThrows,
+  DuplicateDispatch,
+};
+
+void dispatchJobs(
+    FiberManager& executor,
+    std::vector<Job>& jobs,
+    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+    DispatchProblem dispatchProblem = DispatchProblem::None,
+    size_t problemIndex = size_t(-1)) {
+  EXPECT_TRUE(
+      dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
+  results.clear();
+  results.resize(jobs.size());
+  for (size_t i = 0; i < jobs.size(); ++i) {
+    executor.add(
+        [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
+          try {
+            Job job(std::move(jobs[i]));
+
+            if (dispatchProblem == DispatchProblem::PreprocessThrows) {
+              if (i == problemIndex) {
+                EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
+                return;
+              }
+            }
+
+            job.preprocess(executor, false);
+            OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
+            results[i] = job.token.dispatch(job.input);
+            OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
+
+            if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
+              if (i == problemIndex) {
+                EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
+              }
+            }
+          } catch (...) {
+            OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
+          }
+        });
+  }
+}
+
+void validateResult(
+    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+    size_t i) {
+  try {
+    OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
+                 << std::endl;
+  } catch (std::exception& e) {
+    OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
+    throw;
+  }
+}
+
+template <typename TException>
+void validateResults(
+    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+    size_t expectedNumResults) {
+  size_t numResultsFilled = 0;
+  for (size_t i = 0; i < results.size(); ++i) {
+    if (!results[i]) {
+      continue;
+    }
+    ++numResultsFilled;
+    EXPECT_THROW(validateResult(results, i), TException);
+  }
+  EXPECT_EQ(numResultsFilled, expectedNumResults);
+}
+
+void validateResults(
+    std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+    size_t expectedNumResults) {
+  size_t numResultsFilled = 0;
+  for (size_t i = 0; i < results.size(); ++i) {
+    if (!results[i]) {
+      continue;
+    }
+    ++numResultsFilled;
+    EXPECT_NO_THROW(validateResult(results, i));
+    ValueT expectedInput = i;
+    EXPECT_EQ(
+        results[i]->value(), processSingleInput(std::move(expectedInput)));
+  }
+  EXPECT_EQ(numResultsFilled, expectedNumResults);
+}
+
+} // AtomicBatchDispatcherTesting
+
+#define SET_UP_TEST_FUNC                                        \
+  using namespace AtomicBatchDispatcherTesting;                 \
+  folly::EventBase evb;                                         \
+  auto& executor = getFiberManager(evb);                        \
+  const size_t COUNT = 11;                                      \
+  std::vector<Job> jobs;                                        \
+  jobs.reserve(COUNT);                                          \
+  std::vector<folly::Optional<folly::Future<ResultT>>> results; \
+  results.reserve(COUNT);                                       \
+  DispatchFunctionT dispatchFunc
+
+TEST(FiberManager, ABD_Test) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing AtomicBatchDispatcher with explicit call to commit()
+  //
+  dispatchFunc = userDispatchFunc;
+  auto atomicBatchDispatcher =
+      createAtomicBatchDispatcher(std::move(dispatchFunc));
+  createJobs(atomicBatchDispatcher, jobs, COUNT);
+  dispatchJobs(executor, jobs, results);
+  atomicBatchDispatcher.commit();
+  evb.loop();
+  validateResults(results, COUNT);
+}
+
+TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing AtomicBatchDispatcher destroyed before calling commit.
+  // Handles error cases for:
+  // - User might have forgotten to add the call to commit() in the code
+  // - An unexpected exception got thrown in user code before commit() is called
+  //
+  try {
+    dispatchFunc = userDispatchFunc;
+    auto atomicBatchDispatcher =
+        createAtomicBatchDispatcher(std::move(dispatchFunc));
+    createJobs(atomicBatchDispatcher, jobs, COUNT);
+    dispatchJobs(executor, jobs, results);
+    throw std::runtime_error(
+        "Unexpected exception in user code before commit called");
+    atomicBatchDispatcher.commit();
+  } catch (...) {
+    /* User code handles the exception and does not exit process */
+  }
+  evb.loop();
+  validateResults<std::logic_error>(results, COUNT);
+}
+
+TEST(FiberManager, ABD_PreprocessingFailureTest) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing preprocessing failure on a job throws
+  //
+  dispatchFunc = userDispatchFunc;
+  auto atomicBatchDispatcher =
+      createAtomicBatchDispatcher(std::move(dispatchFunc));
+  createJobs(atomicBatchDispatcher, jobs, COUNT);
+  dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
+  atomicBatchDispatcher.commit();
+  evb.loop();
+  validateResults<std::logic_error>(results, COUNT - 1);
+}
+
+TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing that calling dispatch more than once on the same token throws
+  //
+  dispatchFunc = userDispatchFunc;
+  auto atomicBatchDispatcher =
+      createAtomicBatchDispatcher(std::move(dispatchFunc));
+  createJobs(atomicBatchDispatcher, jobs, COUNT);
+  dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
+  atomicBatchDispatcher.commit();
+  evb.loop();
+}
+
+TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing that exception set on attempt to call getToken after commit called
+  //
+  dispatchFunc = userDispatchFunc;
+  auto atomicBatchDispatcher =
+      createAtomicBatchDispatcher(std::move(dispatchFunc));
+  createJobs(atomicBatchDispatcher, jobs, COUNT);
+  atomicBatchDispatcher.commit();
+  EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+  dispatchJobs(executor, jobs, results);
+  EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+  evb.loop();
+  validateResults(results, COUNT);
+  EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+}
+
+TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
+  SET_UP_TEST_FUNC;
+
+  //
+  // Testing that exception is set if user provided batch dispatch throws
+  //
+  dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
+    auto results = userDispatchFunc(std::move(inputs));
+    throw std::runtime_error("Unexpected exception in user dispatch function");
+    return results;
+  };
+  auto atomicBatchDispatcher =
+      createAtomicBatchDispatcher(std::move(dispatchFunc));
+  createJobs(atomicBatchDispatcher, jobs, COUNT);
+  dispatchJobs(executor, jobs, results);
+  atomicBatchDispatcher.commit();
+  evb.loop();
+  validateResults<std::runtime_error>(results, COUNT);
+}
+
 /**
  * Test that we can properly track fiber stack usage.
  *