+namespace AtomicBatchDispatcherTesting {
+
+using ValueT = size_t;
+using ResultT = std::string;
+using DispatchFunctionT =
+ folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
+
+#define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
+#if ENABLE_TRACE_IN_TEST
+#define OUTPUT_TRACE std::cerr
+#else // ENABLE_TRACE_IN_TEST
+struct DevNullPiper {
+ template <typename T>
+ DevNullPiper& operator<<(const T&) {
+ return *this;
+ }
+
+ DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
+ return *this;
+ }
+} devNullPiper;
+#define OUTPUT_TRACE devNullPiper
+#endif // ENABLE_TRACE_IN_TEST
+
+struct Job {
+ AtomicBatchDispatcher<ValueT, ResultT>::Token token;
+ ValueT input;
+
+ void preprocess(FiberManager& executor, bool die) {
+ // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
+ clock_t msecToDoIO = folly::Random::rand32() % 10;
+ double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
+ double endAfter = start + msecToDoIO;
+ while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
+ executor.yield();
+ }
+ if (die) {
+ throw std::logic_error("Simulating preprocessing failure");
+ }
+ }
+
+ Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
+ : token(std::move(t)), input(i) {}
+
+ Job(Job&&) = default;
+ Job& operator=(Job&&) = default;
+};
+
+ResultT processSingleInput(ValueT&& input) {
+ return folly::to<ResultT>(std::move(input));
+}
+
+std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
+ size_t expectedCount = inputs.size();
+ std::vector<ResultT> results;
+ results.reserve(expectedCount);
+ for (size_t i = 0; i < expectedCount; ++i) {
+ results.emplace_back(processSingleInput(std::move(inputs[i])));
+ }
+ return results;
+}
+
+void createJobs(
+ AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
+ std::vector<Job>& jobs,
+ size_t count) {
+ jobs.clear();
+ for (size_t i = 0; i < count; ++i) {
+ jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
+ }
+}
+
+enum class DispatchProblem {
+ None,
+ PreprocessThrows,
+ DuplicateDispatch,
+};
+
+void dispatchJobs(
+ FiberManager& executor,
+ std::vector<Job>& jobs,
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ DispatchProblem dispatchProblem = DispatchProblem::None,
+ size_t problemIndex = size_t(-1)) {
+ EXPECT_TRUE(
+ dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
+ results.clear();
+ results.resize(jobs.size());
+ for (size_t i = 0; i < jobs.size(); ++i) {
+ executor.add(
+ [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
+ try {
+ Job job(std::move(jobs[i]));
+
+ if (dispatchProblem == DispatchProblem::PreprocessThrows) {
+ if (i == problemIndex) {
+ EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
+ return;
+ }
+ }
+
+ job.preprocess(executor, false);
+ OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
+ results[i] = job.token.dispatch(job.input);
+ OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
+
+ if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
+ if (i == problemIndex) {
+ EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
+ }
+ }
+ } catch (...) {
+ OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
+ }
+ });
+ }
+}
+
+void validateResult(
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ size_t i) {
+ try {
+ OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
+ << std::endl;
+ } catch (std::exception& e) {
+ OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
+ throw;
+ }
+}
+
+template <typename TException>
+void validateResults(
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ size_t expectedNumResults) {
+ size_t numResultsFilled = 0;
+ for (size_t i = 0; i < results.size(); ++i) {
+ if (!results[i]) {
+ continue;
+ }
+ ++numResultsFilled;
+ EXPECT_THROW(validateResult(results, i), TException);
+ }
+ EXPECT_EQ(numResultsFilled, expectedNumResults);
+}
+
+void validateResults(
+ std::vector<folly::Optional<folly::Future<ResultT>>>& results,
+ size_t expectedNumResults) {
+ size_t numResultsFilled = 0;
+ for (size_t i = 0; i < results.size(); ++i) {
+ if (!results[i]) {
+ continue;
+ }
+ ++numResultsFilled;
+ EXPECT_NO_THROW(validateResult(results, i));
+ ValueT expectedInput = i;
+ EXPECT_EQ(
+ results[i]->value(), processSingleInput(std::move(expectedInput)));
+ }
+ EXPECT_EQ(numResultsFilled, expectedNumResults);
+}
+
+} // AtomicBatchDispatcherTesting
+
+#define SET_UP_TEST_FUNC \
+ using namespace AtomicBatchDispatcherTesting; \
+ folly::EventBase evb; \
+ auto& executor = getFiberManager(evb); \
+ const size_t COUNT = 11; \
+ std::vector<Job> jobs; \
+ jobs.reserve(COUNT); \
+ std::vector<folly::Optional<folly::Future<ResultT>>> results; \
+ results.reserve(COUNT); \
+ DispatchFunctionT dispatchFunc
+
+TEST(FiberManager, ABD_Test) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing AtomicBatchDispatcher with explicit call to commit()
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+ validateResults(results, COUNT);
+}
+
+TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing AtomicBatchDispatcher destroyed before calling commit.
+ // Handles error cases for:
+ // - User might have forgotten to add the call to commit() in the code
+ // - An unexpected exception got thrown in user code before commit() is called
+ //
+ try {
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results);
+ throw std::runtime_error(
+ "Unexpected exception in user code before commit called");
+ atomicBatchDispatcher.commit();
+ } catch (...) {
+ /* User code handles the exception and does not exit process */
+ }
+ evb.loop();
+ validateResults<std::logic_error>(results, COUNT);
+}
+
+TEST(FiberManager, ABD_PreprocessingFailureTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing preprocessing failure on a job throws
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+ validateResults<std::logic_error>(results, COUNT - 1);
+}
+
+TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing that calling dispatch more than once on the same token throws
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+}
+
+TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing that exception set on attempt to call getToken after commit called
+ //
+ dispatchFunc = userDispatchFunc;
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ atomicBatchDispatcher.commit();
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+ dispatchJobs(executor, jobs, results);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+ evb.loop();
+ validateResults(results, COUNT);
+ EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
+}
+
+TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
+ SET_UP_TEST_FUNC;
+
+ //
+ // Testing that exception is set if user provided batch dispatch throws
+ //
+ dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
+ auto results = userDispatchFunc(std::move(inputs));
+ throw std::runtime_error("Unexpected exception in user dispatch function");
+ return results;
+ };
+ auto atomicBatchDispatcher =
+ createAtomicBatchDispatcher(std::move(dispatchFunc));
+ createJobs(atomicBatchDispatcher, jobs, COUNT);
+ dispatchJobs(executor, jobs, results);
+ atomicBatchDispatcher.commit();
+ evb.loop();
+ validateResults<std::runtime_error>(results, COUNT);
+}
+