2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 template <typename InputT, typename ResultT>
20 struct AtomicBatchDispatcher<InputT, ResultT>::DispatchBaton {
21 DispatchBaton(DispatchFunctionT&& dispatchFunction)
22 : expectedCount_(0), dispatchFunction_(std::move(dispatchFunction)) {}
28 void reserve(size_t numEntries) {
29 optEntries_.reserve(numEntries);
32 void setExceptionWrapper(folly::exception_wrapper&& exWrapper) {
33 exceptionWrapper_ = std::move(exWrapper);
36 void setExpectedCount(size_t expectedCount) {
37 assert(expectedCount_ == 0 || !"expectedCount_ being set more than once");
38 expectedCount_ = expectedCount;
39 optEntries_.resize(expectedCount_);
42 Future<ResultT> getFutureResult(InputT&& input, size_t sequenceNumber) {
43 if (sequenceNumber >= optEntries_.size()) {
44 optEntries_.resize(sequenceNumber + 1);
46 folly::Optional<Entry>& optEntry = optEntries_[sequenceNumber];
47 assert(!optEntry || !"Multiple inputs have the same token sequence number");
48 optEntry = Entry(std::move(input));
49 return optEntry->promise.getFuture();
53 void setExceptionResults(const folly::exception_wrapper& exceptionWrapper) {
54 for (auto& optEntry : optEntries_) {
56 optEntry->promise.setException(exceptionWrapper);
61 void setExceptionResults(std::exception_ptr eptr) {
62 auto exceptionWrapper = exception_wrapper(eptr);
63 return setExceptionResults(exceptionWrapper);
66 template <typename TException>
67 void setExceptionResults(
69 std::exception_ptr eptr = std::exception_ptr()) {
70 auto exceptionWrapper =
71 eptr ? exception_wrapper(eptr, ex) : exception_wrapper(ex);
72 return setExceptionResults(exceptionWrapper);
75 void fulfillPromises() {
77 // If an error message is set, set all promises to exception with message
78 if (exceptionWrapper_) {
79 return setExceptionResults(exceptionWrapper_);
82 // Validate entries count same as expectedCount_
84 optEntries_.size() == expectedCount_ ||
85 !"Entries vector did not have expected size");
86 std::vector<size_t> vecTokensNotDispatched;
87 for (size_t i = 0; i < expectedCount_; ++i) {
88 if (!optEntries_[i]) {
89 vecTokensNotDispatched.push_back(i);
92 if (!vecTokensNotDispatched.empty()) {
93 return setExceptionResults(ABDTokenNotDispatchedException(
94 detail::createABDTokenNotDispatchedExMsg(vecTokensNotDispatched)));
97 // Create the inputs vector
98 std::vector<InputT> inputs;
99 inputs.reserve(expectedCount_);
100 for (auto& optEntry : optEntries_) {
101 inputs.emplace_back(std::move(optEntry->input));
104 // Call the user provided batch dispatch function to get all results
105 // and make sure that we have the expected number of results returned
106 auto results = dispatchFunction_(std::move(inputs));
107 if (results.size() != expectedCount_) {
108 return setExceptionResults(
109 ABDUsageException(detail::createUnexpectedNumResultsABDUsageExMsg(
110 expectedCount_, results.size())));
113 // Fulfill the promises with the results from the batch dispatch
114 for (size_t i = 0; i < expectedCount_; ++i) {
115 optEntries_[i]->promise.setValue(std::move(results[i]));
117 } catch (const std::exception& ex) {
118 // Set exceptions thrown when executing the user provided dispatch func
119 return setExceptionResults(ex, std::current_exception());
121 // Set exceptions thrown when executing the user provided dispatch func
122 return setExceptionResults(std::current_exception());
128 folly::Promise<ResultT> promise;
130 Entry(Entry&& other) noexcept
131 : input(std::move(other.input)), promise(std::move(other.promise)) {}
133 Entry& operator=(Entry&& other) noexcept {
134 input = std::move(other.input);
135 promise = std::move(other.promise);
139 explicit Entry(InputT&& input) : input(std::move(input)) {}
142 size_t expectedCount_;
143 DispatchFunctionT dispatchFunction_;
144 std::vector<folly::Optional<Entry>> optEntries_;
145 folly::exception_wrapper exceptionWrapper_;
148 template <typename InputT, typename ResultT>
149 AtomicBatchDispatcher<InputT, ResultT>::Token::Token(
150 std::shared_ptr<DispatchBaton> baton,
151 size_t sequenceNumber)
152 : baton_(std::move(baton)), sequenceNumber_(sequenceNumber) {}
154 template <typename InputT, typename ResultT>
155 size_t AtomicBatchDispatcher<InputT, ResultT>::Token::sequenceNumber() const {
156 return sequenceNumber_;
159 template <typename InputT, typename ResultT>
160 Future<ResultT> AtomicBatchDispatcher<InputT, ResultT>::Token::dispatch(
162 auto baton = std::move(baton_);
164 throw ABDUsageException(
165 "Dispatch called more than once on the same Token object");
167 return baton->getFutureResult(std::move(input), sequenceNumber_);
170 template <typename InputT, typename ResultT>
171 AtomicBatchDispatcher<InputT, ResultT>::AtomicBatchDispatcher(
172 DispatchFunctionT&& dispatchFunc)
173 : numTokensIssued_(0),
174 baton_(std::make_shared<DispatchBaton>(std::move(dispatchFunc))) {}
176 template <typename InputT, typename ResultT>
177 AtomicBatchDispatcher<InputT, ResultT>::~AtomicBatchDispatcher() {
179 // Set error here rather than throw because we do not want to throw from
180 // the destructor of AtomicBatchDispatcher
181 baton_->setExceptionWrapper(
182 folly::make_exception_wrapper<ABDCommitNotCalledException>());
187 template <typename InputT, typename ResultT>
188 void AtomicBatchDispatcher<InputT, ResultT>::reserve(size_t numEntries) {
190 throw ABDUsageException("Cannot call reserve(....) after calling commit()");
192 baton_->reserve(numEntries);
195 template <typename InputT, typename ResultT>
196 auto AtomicBatchDispatcher<InputT, ResultT>::getToken() -> Token {
198 throw ABDUsageException("Cannot issue more tokens after calling commit()");
200 return Token(baton_, numTokensIssued_++);
203 template <typename InputT, typename ResultT>
204 void AtomicBatchDispatcher<InputT, ResultT>::commit() {
205 auto baton = std::move(baton_);
207 throw ABDUsageException(
208 "Cannot call commit() more than once on the same dispatcher");
210 baton->setExpectedCount(numTokensIssued_);
213 template <typename InputT, typename ResultT>
214 AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
215 folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
216 size_t initialCapacity) {
217 auto abd = AtomicBatchDispatcher<InputT, ResultT>(std::move(dispatchFunc));
218 if (initialCapacity) {
219 abd.reserve(initialCapacity);
224 } // namespace fibers