From 7aa24338827eace3302af2b3fa2d34d262cc99e6 Mon Sep 17 00:00:00 2001 From: Ameya Limaye Date: Mon, 31 Oct 2016 09:26:16 -0700 Subject: [PATCH] Implement AtomicBatchDispatcher in folly::fibers Summary: Implement AtomicBatchDispatcher in folly::fibers - Details about how to use the added functionality can be found in the doc comment for class AtomicBatchDispatcher. Reviewed By: andriigrynenko Differential Revision: D4054148 fbshipit-source-id: 090272eeab8c8abb15d5e400e52725853fcfc364 --- folly/Makefile.am | 2 + folly/fibers/AtomicBatchDispatcher-inl.h | 215 +++++++++++++++++ folly/fibers/AtomicBatchDispatcher.h | 198 ++++++++++++++++ folly/fibers/test/FibersTest.cpp | 288 +++++++++++++++++++++++ 4 files changed, 703 insertions(+) create mode 100644 folly/fibers/AtomicBatchDispatcher-inl.h create mode 100644 folly/fibers/AtomicBatchDispatcher.h diff --git a/folly/Makefile.am b/folly/Makefile.am index b7860ee9..0ba093ff 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -515,6 +515,8 @@ if HAVE_BOOST_CONTEXT nobase_follyinclude_HEADERS += \ fibers/AddTasks.h \ fibers/AddTasks-inl.h \ + fibers/AtomicBatchDispatcher.h \ + fibers/AtomicBatchDispatcher-inl.h \ fibers/Baton.h \ fibers/Baton-inl.h \ fibers/BatchDispatcher.h \ diff --git a/folly/fibers/AtomicBatchDispatcher-inl.h b/folly/fibers/AtomicBatchDispatcher-inl.h new file mode 100644 index 00000000..45209cc6 --- /dev/null +++ b/folly/fibers/AtomicBatchDispatcher-inl.h @@ -0,0 +1,215 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +namespace folly { +namespace fibers { + +template +struct AtomicBatchDispatcher::DispatchBaton { + DispatchBaton(DispatchFunctionT&& dispatchFunction) + : expectedCount_(0), dispatchFunction_(std::move(dispatchFunction)) {} + + ~DispatchBaton() { + fulfillPromises(); + } + + void reserve(size_t numEntries) { + optEntries_.reserve(numEntries); + } + + void setError(std::string message) { + optErrorMessage_ = std::move(message); + } + + void setExpectedCount(size_t expectedCount) { + expectedCount_ = expectedCount; + } + + Future getFutureResult(InputT&& input, size_t sequenceNumber) { + if (sequenceNumber >= optEntries_.size()) { + optEntries_.resize(sequenceNumber + 1); + } + folly::Optional& optEntry = optEntries_[sequenceNumber]; + if (optEntry) { + throw std::logic_error( + "Cannot have multiple inputs with same token sequence number"); + } + optEntry = Entry(std::move(input)); + return optEntry->promise.getFuture(); + } + + private: + void setExceptionResults(std::exception_ptr eptr) { + auto exceptionWrapper = exception_wrapper(eptr); + for (auto& optEntry : optEntries_) { + if (optEntry) { + optEntry->promise.setException(exceptionWrapper); + } + } + } + + template + void setExceptionResults( + const TException& ex, + std::exception_ptr eptr = std::exception_ptr()) { + auto exceptionWrapper = + eptr ? exception_wrapper(eptr, ex) : exception_wrapper(ex); + for (auto& optEntry : optEntries_) { + if (optEntry) { + optEntry->promise.setException(exceptionWrapper); + } + } + } + + void fulfillPromises() { + try { + // If an error message is set, set all promises to exception with message + if (optErrorMessage_) { + auto ex = std::logic_error(*optErrorMessage_); + return setExceptionResults(std::move(ex)); + } + + // Create inputs vector and validate entries count same as expectedCount_ + std::vector inputs; + inputs.reserve(expectedCount_); + bool allEntriesFound = (optEntries_.size() == expectedCount_); + if (allEntriesFound) { + for (auto& optEntry : optEntries_) { + if (!optEntry) { + allEntriesFound = false; + break; + } + inputs.emplace_back(std::move(optEntry->input)); + } + } + if (!allEntriesFound) { + auto ex = std::logic_error( + "One or more input tokens destroyed before calling dispatch"); + return setExceptionResults(std::move(ex)); + } + + // Call the user provided batch dispatch function to get all results + // and make sure that we have the expected number of results returned + auto results = dispatchFunction_(std::move(inputs)); + if (results.size() != expectedCount_) { + auto ex = std::logic_error( + "Unexpected number of results returned from dispatch function"); + return setExceptionResults(std::move(ex)); + } + + // Fulfill the promises with the results from the batch dispatch + for (size_t i = 0; i < expectedCount_; ++i) { + optEntries_[i]->promise.setValue(std::move(results[i])); + } + } catch (const std::exception& ex) { + return setExceptionResults(ex, std::current_exception()); + } catch (...) { + return setExceptionResults(std::current_exception()); + } + } + + struct Entry { + InputT input; + folly::Promise promise; + + Entry(Entry&& other) noexcept + : input(std::move(other.input)), promise(std::move(other.promise)) {} + + Entry& operator=(Entry&& other) noexcept { + input = std::move(other.input); + promise = std::move(other.promise); + return *this; + } + + explicit Entry(InputT&& input) : input(std::move(input)) {} + }; + + size_t expectedCount_; + DispatchFunctionT dispatchFunction_; + std::vector> optEntries_; + folly::Optional optErrorMessage_; +}; + +template +AtomicBatchDispatcher::Token::Token( + std::shared_ptr baton, + size_t sequenceNumber) + : baton_(std::move(baton)), SEQUENCE_NUMBER(sequenceNumber) {} + +template +Future AtomicBatchDispatcher::Token::dispatch( + InputT input) { + auto baton = std::move(baton_); + if (!baton) { + throw std::logic_error( + "Dispatch called more than once on the same Token object"); + } + return baton->getFutureResult(std::move(input), SEQUENCE_NUMBER); +} + +template +AtomicBatchDispatcher::AtomicBatchDispatcher( + DispatchFunctionT&& dispatchFunc) + : numTokensIssued_(0), + baton_(std::make_shared(std::move(dispatchFunc))) {} + +template +AtomicBatchDispatcher::~AtomicBatchDispatcher() { + if (baton_) { + baton_->setError( + "AtomicBatchDispatcher destroyed before commit() was called on it"); + commit(); + } +} + +template +void AtomicBatchDispatcher::reserve(size_t numEntries) { + if (!baton_) { + throw std::logic_error("Cannot call reserve(....) after calling commit()"); + } + baton_->reserve(numEntries); +} + +template +auto AtomicBatchDispatcher::getToken() -> Token { + if (!baton_) { + throw std::logic_error("Cannot issue more tokens after calling commit()"); + } + return Token(baton_, numTokensIssued_++); +} + +template +void AtomicBatchDispatcher::commit() { + auto baton = std::move(baton_); + if (!baton) { + throw std::logic_error( + "Cannot call commit() more than once on the same dispatcher"); + } + baton->setExpectedCount(numTokensIssued_); +} + +template +AtomicBatchDispatcher createAtomicBatchDispatcher( + folly::Function(std::vector&&)> dispatchFunc, + size_t initialCapacity) { + auto abd = AtomicBatchDispatcher(std::move(dispatchFunc)); + if (initialCapacity) { + abd.reserve(initialCapacity); + } + return abd; +} + +} // namespace fibers +} // manespace folly diff --git a/folly/fibers/AtomicBatchDispatcher.h b/folly/fibers/AtomicBatchDispatcher.h new file mode 100644 index 00000000..bfeafa27 --- /dev/null +++ b/folly/fibers/AtomicBatchDispatcher.h @@ -0,0 +1,198 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace folly { +namespace fibers { + +/** + * AtomicBatchDispatcher should be used if you want to process fiber tasks in + * parallel, but require to synchronize them at some point. The canonical + * example is to create a database transaction dispatch round. This API notably + * enforces that all tasks in the batch have reached the synchronization point + * before the user provided dispatch function is called with all the inputs + * provided in one function call. It also provides a guarantee that the inputs + * in the vector of inputs passed to the user provided dispatch function will be + * in the same order as the order in which the token for the job was issued. + * + * Use this when you want all the inputs in the batch to be processed by a + * single function call to the user provided dispatch function. + * The user provided dispatch function takes a vector of InputT as input and + * returns a vector of ResultT. + * To use an AtomicBatchDispatcher, create it by providing a dispatch function: + * TO EITHER the constructor of the AtomicBatchDispatcher class + * (can call reserve method on the dispatcher to reserve space (for number of + * inputs expected)), + * OR the createAtomicBatchDispatcher function in folly::fibers namespace + * (optionally specify an initial capacity (for number of inputs expected)). + * The AtomicBatchDispatcher object created using this call (dispatcher), + * is the only object that can issue tokens (Token objects) that are used to + * add an input to the batch. A single Token is issued when the user calls + * the getToken function on the dispatcher. + * Token objects cannot be copied (can only be moved). User can call the public + * dispatch function on the Token providing a single input value. The dispatch + * function returns a folly::Future value that the user can then wait + * on to obtain a ResultT value. The ResultT value will only be available once + * the dispatch function has been called on all the Tokens in the batch and the + * user has called dispatcher.commit() to indicate no more batched transactions + * are to be added. + * User code pertaining to a task can be run between the point where a token for + * the task has been issued and before calling the dispatch function on the + * token. Since this code can potentially throw, the token issued for a task + * should be moved into this processing code in such a way that if an exception + * is thrown and then handled, the token object for the task is destroyed. + * The batch query dispatcher will wait until all tokens have either been + * destroyed or have had the dispatch function called on them. Leaking an + * issued token will cause the batch dispatch to wait forever to happen. + * + * The AtomicBatchDispatcher object is referred to as the dispatcher below. + * + * POSSIBLE ERRORS: + * 1) The dispatcher is destroyed before calling commit on it, for example + * because the user forgot to call commit OR an exception was thrown + * in user code before the call to commit: + * - The future ResultT has an exception of type std::logic_error set for all + * tokens that were issued by the dispatcher (once all tokens are either + * destroyed or have called dispatch) + * 2) Calling the dispatch function more than once on the same Token object + * (or a moved version of the same Token): + * - Subsequent calls to dispatch (after the first one) will throw an + * std::logic_error exception (the batch itself will not have any errors + * and will get processed) + * 3) One/more of the Tokens issued are destroyed before calling dispatch on + * it/them: + * - The future ResultT has an exception of type std::logic_error set for all + * tokens that were issued by the dispatcher (once all tokens are either + * destroyed or have called dispatch) + * 4) dispatcher.getToken() is called after calling dispatcher.commit() + * - the call to getToken() will throw an std::logic_error exception + * (the batch itself will not have any errors and will get processed). + * 5) All tokens were issued and called dispatch, the user provided batch + * dispatch function is called, but that function throws any exception. + * - The future ResultT has exception for all tokens that were issued by + * the dispatcher. The result will contain the wrapped user exception. + * + * EXAMPLE (There are other ways to achieve this, but this is one example): + * - User creates an AtomicBatchDispatcher on stack + * auto dispatcher = + * folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count); + * - User creates "count" number of token objects by calling "getToken" count + * number of times + * std::vector jobs; + * for (size_t i = 0; i < count; ++i) { + * auto token = dispatcher.getToken(); + * jobs.push_back(Job(std::move(token), singleInputValueToProcess); + * } + * - User calls commit() on the dispatcher to indicate that no new tokens will + * be issued for this batch + * dispatcher.commit(); + * - Use any single threaded executor that will process the jobs + * - On each execution (fiber) preprocess a single "Job" that has been moved in + * from the original vector "jobs". This way if the preprocessing throws + * the Job object being processed is destroyed and so is the token. + * - On each execution (fiber) call the dispatch on the token + * auto future = job.token.dispatch(job.input); + * - Save the future returned so that eventually you can wait on the results + * ResultT result; + * try { + * result = future.value(); + * // future.hasValue() is true + * } catch (...) { + * // future.hasException() is true + * } + * } + * + * NOTES: + * - AtomicBatchDispatcher is not thread safe. + * - Works for executors that run tasks on a single thread. + */ +template +class AtomicBatchDispatcher { + private: + struct DispatchBaton; + friend struct DispatchBaton; + + public: + using DispatchFunctionT = + folly::Function(std::vector&&)>; + + class Token { + public: + explicit Token(std::shared_ptr baton, size_t sequenceNumber); + + Future dispatch(InputT input); + + // Allow moving a Token object + Token(Token&&) = default; + Token& operator=(Token&&) = default; + + private: + // Disallow copying a Token object + Token(const Token&) = delete; + Token& operator=(const Token&) = delete; + + std::shared_ptr baton_; + const size_t SEQUENCE_NUMBER; + }; + + explicit AtomicBatchDispatcher(DispatchFunctionT&& dispatchFunc); + + ~AtomicBatchDispatcher(); + + // numEntries is a *hint* about the number of inputs to expect: + // - It is used purely to reserve space for storing vector of inputs etc., + // so that reeallocation and move copy are reduced / not needed. + // - It is provided purely for performance reasons + void reserve(size_t numEntries); + + Token getToken(); + + void commit(); + + // Allow moving an AtomicBatchDispatcher object + AtomicBatchDispatcher(AtomicBatchDispatcher&&) = default; + AtomicBatchDispatcher& operator=(AtomicBatchDispatcher&&) = default; + + private: + // Disallow copying an AtomicBatchDispatcher object + AtomicBatchDispatcher(const AtomicBatchDispatcher&) = delete; + AtomicBatchDispatcher& operator=(const AtomicBatchDispatcher&) = delete; + + size_t numTokensIssued_; + std::shared_ptr baton_; +}; + +// initialCapacity is a *hint* about the number of inputs to expect: +// - It is used purely to reserve space for storing vector of inputs etc., +// so that reeallocation and move copy are reduced / not needed. +// - It is provided purely for performance reasons +template +AtomicBatchDispatcher createAtomicBatchDispatcher( + folly::Function(std::vector&&)> dispatchFunc, + size_t initialCapacity = 0); + +} // namespace fibers +} // namespace folly + +#include diff --git a/folly/fibers/test/FibersTest.cpp b/folly/fibers/test/FibersTest.cpp index e587ab44..859ccda3 100644 --- a/folly/fibers/test/FibersTest.cpp +++ b/folly/fibers/test/FibersTest.cpp @@ -18,10 +18,12 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -1753,6 +1755,292 @@ TEST(FiberManager, batchDispatchExceptionHandlingTest) { evb.loop(); } +namespace AtomicBatchDispatcherTesting { + +using ValueT = size_t; +using ResultT = std::string; +using DispatchFunctionT = + folly::Function(std::vector&&)>; + +#define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests +#if ENABLE_TRACE_IN_TEST +#define OUTPUT_TRACE std::cerr +#else // ENABLE_TRACE_IN_TEST +struct DevNullPiper { + template + DevNullPiper& operator<<(const T&) { + return *this; + } + + DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) { + return *this; + } +} devNullPiper; +#define OUTPUT_TRACE devNullPiper +#endif // ENABLE_TRACE_IN_TEST + +struct Job { + AtomicBatchDispatcher::Token token; + ValueT input; + + void preprocess(FiberManager& executor, bool die) { + // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing + clock_t msecToDoIO = folly::Random::rand32() % 10; + double start = (1000.0 * clock()) / CLOCKS_PER_SEC; + double endAfter = start + msecToDoIO; + while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) { + executor.yield(); + } + if (die) { + throw std::logic_error("Simulating preprocessing failure"); + } + } + + Job(AtomicBatchDispatcher::Token&& t, ValueT i) + : token(std::move(t)), input(i) {} + + Job(Job&&) = default; + Job& operator=(Job&&) = default; +}; + +ResultT processSingleInput(ValueT&& input) { + return folly::to(std::move(input)); +} + +std::vector userDispatchFunc(std::vector&& inputs) { + size_t expectedCount = inputs.size(); + std::vector results; + results.reserve(expectedCount); + for (size_t i = 0; i < expectedCount; ++i) { + results.emplace_back(processSingleInput(std::move(inputs[i]))); + } + return results; +} + +void createJobs( + AtomicBatchDispatcher& atomicBatchDispatcher, + std::vector& jobs, + size_t count) { + jobs.clear(); + for (size_t i = 0; i < count; ++i) { + jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i)); + } +} + +enum class DispatchProblem { + None, + PreprocessThrows, + DuplicateDispatch, +}; + +void dispatchJobs( + FiberManager& executor, + std::vector& jobs, + std::vector>>& results, + DispatchProblem dispatchProblem = DispatchProblem::None, + size_t problemIndex = size_t(-1)) { + EXPECT_TRUE( + dispatchProblem == DispatchProblem::None || problemIndex < jobs.size()); + results.clear(); + results.resize(jobs.size()); + for (size_t i = 0; i < jobs.size(); ++i) { + executor.add( + [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() { + try { + Job job(std::move(jobs[i])); + + if (dispatchProblem == DispatchProblem::PreprocessThrows) { + if (i == problemIndex) { + EXPECT_THROW(job.preprocess(executor, true), std::logic_error); + return; + } + } + + job.preprocess(executor, false); + OUTPUT_TRACE << "Dispatching job #" << i << std::endl; + results[i] = job.token.dispatch(job.input); + OUTPUT_TRACE << "Result future filled for job #" << i << std::endl; + + if (dispatchProblem == DispatchProblem::DuplicateDispatch) { + if (i == problemIndex) { + EXPECT_THROW(job.token.dispatch(job.input), std::logic_error); + } + } + } catch (...) { + OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl; + } + }); + } +} + +void validateResult( + std::vector>>& results, + size_t i) { + try { + OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value() + << std::endl; + } catch (std::exception& e) { + OUTPUT_TRACE << "Exception : " << e.what() << std::endl; + throw; + } +} + +template +void validateResults( + std::vector>>& results, + size_t expectedNumResults) { + size_t numResultsFilled = 0; + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i]) { + continue; + } + ++numResultsFilled; + EXPECT_THROW(validateResult(results, i), TException); + } + EXPECT_EQ(numResultsFilled, expectedNumResults); +} + +void validateResults( + std::vector>>& results, + size_t expectedNumResults) { + size_t numResultsFilled = 0; + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i]) { + continue; + } + ++numResultsFilled; + EXPECT_NO_THROW(validateResult(results, i)); + ValueT expectedInput = i; + EXPECT_EQ( + results[i]->value(), processSingleInput(std::move(expectedInput))); + } + EXPECT_EQ(numResultsFilled, expectedNumResults); +} + +} // AtomicBatchDispatcherTesting + +#define SET_UP_TEST_FUNC \ + using namespace AtomicBatchDispatcherTesting; \ + folly::EventBase evb; \ + auto& executor = getFiberManager(evb); \ + const size_t COUNT = 11; \ + std::vector jobs; \ + jobs.reserve(COUNT); \ + std::vector>> results; \ + results.reserve(COUNT); \ + DispatchFunctionT dispatchFunc + +TEST(FiberManager, ABD_Test) { + SET_UP_TEST_FUNC; + + // + // Testing AtomicBatchDispatcher with explicit call to commit() + // + dispatchFunc = userDispatchFunc; + auto atomicBatchDispatcher = + createAtomicBatchDispatcher(std::move(dispatchFunc)); + createJobs(atomicBatchDispatcher, jobs, COUNT); + dispatchJobs(executor, jobs, results); + atomicBatchDispatcher.commit(); + evb.loop(); + validateResults(results, COUNT); +} + +TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) { + SET_UP_TEST_FUNC; + + // + // Testing AtomicBatchDispatcher destroyed before calling commit. + // Handles error cases for: + // - User might have forgotten to add the call to commit() in the code + // - An unexpected exception got thrown in user code before commit() is called + // + try { + dispatchFunc = userDispatchFunc; + auto atomicBatchDispatcher = + createAtomicBatchDispatcher(std::move(dispatchFunc)); + createJobs(atomicBatchDispatcher, jobs, COUNT); + dispatchJobs(executor, jobs, results); + throw std::runtime_error( + "Unexpected exception in user code before commit called"); + atomicBatchDispatcher.commit(); + } catch (...) { + /* User code handles the exception and does not exit process */ + } + evb.loop(); + validateResults(results, COUNT); +} + +TEST(FiberManager, ABD_PreprocessingFailureTest) { + SET_UP_TEST_FUNC; + + // + // Testing preprocessing failure on a job throws + // + dispatchFunc = userDispatchFunc; + auto atomicBatchDispatcher = + createAtomicBatchDispatcher(std::move(dispatchFunc)); + createJobs(atomicBatchDispatcher, jobs, COUNT); + dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8); + atomicBatchDispatcher.commit(); + evb.loop(); + validateResults(results, COUNT - 1); +} + +TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) { + SET_UP_TEST_FUNC; + + // + // Testing that calling dispatch more than once on the same token throws + // + dispatchFunc = userDispatchFunc; + auto atomicBatchDispatcher = + createAtomicBatchDispatcher(std::move(dispatchFunc)); + createJobs(atomicBatchDispatcher, jobs, COUNT); + dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4); + atomicBatchDispatcher.commit(); + evb.loop(); +} + +TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) { + SET_UP_TEST_FUNC; + + // + // Testing that exception set on attempt to call getToken after commit called + // + dispatchFunc = userDispatchFunc; + auto atomicBatchDispatcher = + createAtomicBatchDispatcher(std::move(dispatchFunc)); + createJobs(atomicBatchDispatcher, jobs, COUNT); + atomicBatchDispatcher.commit(); + EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error); + dispatchJobs(executor, jobs, results); + EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error); + evb.loop(); + validateResults(results, COUNT); + EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error); +} + +TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) { + SET_UP_TEST_FUNC; + + // + // Testing that exception is set if user provided batch dispatch throws + // + dispatchFunc = [](std::vector&& inputs) -> std::vector { + auto results = userDispatchFunc(std::move(inputs)); + throw std::runtime_error("Unexpected exception in user dispatch function"); + return results; + }; + auto atomicBatchDispatcher = + createAtomicBatchDispatcher(std::move(dispatchFunc)); + createJobs(atomicBatchDispatcher, jobs, COUNT); + dispatchJobs(executor, jobs, results); + atomicBatchDispatcher.commit(); + evb.loop(); + validateResults(results, COUNT); +} + /** * Test that we can properly track fiber stack usage. * -- 2.34.1