Fix copyright lines
[folly.git] / folly / fibers / AtomicBatchDispatcher-inl.h
1 /*
2  * Copyright 2016-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 namespace folly {
17 namespace fibers {
18
19 template <typename InputT, typename ResultT>
20 struct AtomicBatchDispatcher<InputT, ResultT>::DispatchBaton {
21   DispatchBaton(DispatchFunctionT&& dispatchFunction)
22       : expectedCount_(0), dispatchFunction_(std::move(dispatchFunction)) {}
23
24   ~DispatchBaton() {
25     fulfillPromises();
26   }
27
28   void reserve(size_t numEntries) {
29     optEntries_.reserve(numEntries);
30   }
31
32   void setExceptionWrapper(folly::exception_wrapper&& exWrapper) {
33     exceptionWrapper_ = std::move(exWrapper);
34   }
35
36   void setExpectedCount(size_t expectedCount) {
37     assert(expectedCount_ == 0 || !"expectedCount_ being set more than once");
38     expectedCount_ = expectedCount;
39     optEntries_.resize(expectedCount_);
40   }
41
42   Future<ResultT> getFutureResult(InputT&& input, size_t sequenceNumber) {
43     if (sequenceNumber >= optEntries_.size()) {
44       optEntries_.resize(sequenceNumber + 1);
45     }
46     folly::Optional<Entry>& optEntry = optEntries_[sequenceNumber];
47     assert(!optEntry || !"Multiple inputs have the same token sequence number");
48     optEntry = Entry(std::move(input));
49     return optEntry->promise.getFuture();
50   }
51
52  private:
53   void setExceptionResults(const folly::exception_wrapper& exceptionWrapper) {
54     for (auto& optEntry : optEntries_) {
55       if (optEntry) {
56         optEntry->promise.setException(exceptionWrapper);
57       }
58     }
59   }
60
61   void setExceptionResults(std::exception_ptr eptr) {
62     auto exceptionWrapper = exception_wrapper(eptr);
63     return setExceptionResults(exceptionWrapper);
64   }
65
66   template <typename TException>
67   void setExceptionResults(
68       const TException& ex,
69       std::exception_ptr eptr = std::exception_ptr()) {
70     auto exceptionWrapper =
71         eptr ? exception_wrapper(eptr, ex) : exception_wrapper(ex);
72     return setExceptionResults(exceptionWrapper);
73   }
74
75   void fulfillPromises() {
76     try {
77       // If an error message is set, set all promises to exception with message
78       if (exceptionWrapper_) {
79         return setExceptionResults(exceptionWrapper_);
80       }
81
82       // Validate entries count same as expectedCount_
83       assert(
84           optEntries_.size() == expectedCount_ ||
85           !"Entries vector did not have expected size");
86       std::vector<size_t> vecTokensNotDispatched;
87       for (size_t i = 0; i < expectedCount_; ++i) {
88         if (!optEntries_[i]) {
89           vecTokensNotDispatched.push_back(i);
90         }
91       }
92       if (!vecTokensNotDispatched.empty()) {
93         return setExceptionResults(ABDTokenNotDispatchedException(
94             detail::createABDTokenNotDispatchedExMsg(vecTokensNotDispatched)));
95       }
96
97       // Create the inputs vector
98       std::vector<InputT> inputs;
99       inputs.reserve(expectedCount_);
100       for (auto& optEntry : optEntries_) {
101         inputs.emplace_back(std::move(optEntry->input));
102       }
103
104       // Call the user provided batch dispatch function to get all results
105       // and make sure that we have the expected number of results returned
106       auto results = dispatchFunction_(std::move(inputs));
107       if (results.size() != expectedCount_) {
108         return setExceptionResults(
109             ABDUsageException(detail::createUnexpectedNumResultsABDUsageExMsg(
110                 expectedCount_, results.size())));
111       }
112
113       // Fulfill the promises with the results from the batch dispatch
114       for (size_t i = 0; i < expectedCount_; ++i) {
115         optEntries_[i]->promise.setValue(std::move(results[i]));
116       }
117     } catch (const std::exception& ex) {
118       // Set exceptions thrown when executing the user provided dispatch func
119       return setExceptionResults(ex, std::current_exception());
120     } catch (...) {
121       // Set exceptions thrown when executing the user provided dispatch func
122       return setExceptionResults(std::current_exception());
123     }
124   }
125
126   struct Entry {
127     InputT input;
128     folly::Promise<ResultT> promise;
129
130     Entry(Entry&& other) noexcept
131         : input(std::move(other.input)), promise(std::move(other.promise)) {}
132
133     Entry& operator=(Entry&& other) noexcept {
134       input = std::move(other.input);
135       promise = std::move(other.promise);
136       return *this;
137     }
138
139     explicit Entry(InputT&& input) : input(std::move(input)) {}
140   };
141
142   size_t expectedCount_;
143   DispatchFunctionT dispatchFunction_;
144   std::vector<folly::Optional<Entry>> optEntries_;
145   folly::exception_wrapper exceptionWrapper_;
146 };
147
148 template <typename InputT, typename ResultT>
149 AtomicBatchDispatcher<InputT, ResultT>::Token::Token(
150     std::shared_ptr<DispatchBaton> baton,
151     size_t sequenceNumber)
152     : baton_(std::move(baton)), sequenceNumber_(sequenceNumber) {}
153
154 template <typename InputT, typename ResultT>
155 size_t AtomicBatchDispatcher<InputT, ResultT>::Token::sequenceNumber() const {
156   return sequenceNumber_;
157 }
158
159 template <typename InputT, typename ResultT>
160 Future<ResultT> AtomicBatchDispatcher<InputT, ResultT>::Token::dispatch(
161     InputT input) {
162   auto baton = std::move(baton_);
163   if (!baton) {
164     throw ABDUsageException(
165         "Dispatch called more than once on the same Token object");
166   }
167   return baton->getFutureResult(std::move(input), sequenceNumber_);
168 }
169
170 template <typename InputT, typename ResultT>
171 AtomicBatchDispatcher<InputT, ResultT>::AtomicBatchDispatcher(
172     DispatchFunctionT&& dispatchFunc)
173     : numTokensIssued_(0),
174       baton_(std::make_shared<DispatchBaton>(std::move(dispatchFunc))) {}
175
176 template <typename InputT, typename ResultT>
177 AtomicBatchDispatcher<InputT, ResultT>::~AtomicBatchDispatcher() {
178   if (baton_) {
179     // Set error here rather than throw because we do not want to throw from
180     // the destructor of AtomicBatchDispatcher
181     baton_->setExceptionWrapper(
182         folly::make_exception_wrapper<ABDCommitNotCalledException>());
183     commit();
184   }
185 }
186
187 template <typename InputT, typename ResultT>
188 void AtomicBatchDispatcher<InputT, ResultT>::reserve(size_t numEntries) {
189   if (!baton_) {
190     throw ABDUsageException("Cannot call reserve(....) after calling commit()");
191   }
192   baton_->reserve(numEntries);
193 }
194
195 template <typename InputT, typename ResultT>
196 auto AtomicBatchDispatcher<InputT, ResultT>::getToken() -> Token {
197   if (!baton_) {
198     throw ABDUsageException("Cannot issue more tokens after calling commit()");
199   }
200   return Token(baton_, numTokensIssued_++);
201 }
202
203 template <typename InputT, typename ResultT>
204 void AtomicBatchDispatcher<InputT, ResultT>::commit() {
205   auto baton = std::move(baton_);
206   if (!baton) {
207     throw ABDUsageException(
208         "Cannot call commit() more than once on the same dispatcher");
209   }
210   baton->setExpectedCount(numTokensIssued_);
211 }
212
213 template <typename InputT, typename ResultT>
214 AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
215     folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
216     size_t initialCapacity) {
217   auto abd = AtomicBatchDispatcher<InputT, ResultT>(std::move(dispatchFunc));
218   if (initialCapacity) {
219     abd.reserve(initialCapacity);
220   }
221   return abd;
222 }
223
224 } // namespace fibers
225 } // namespace folly