fix gbd debugging script
[folly.git] / folly / fibers / AtomicBatchDispatcher-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 namespace folly {
17 namespace fibers {
18
19 template <typename InputT, typename ResultT>
20 struct AtomicBatchDispatcher<InputT, ResultT>::DispatchBaton {
21   DispatchBaton(DispatchFunctionT&& dispatchFunction)
22       : expectedCount_(0), dispatchFunction_(std::move(dispatchFunction)) {}
23
24   ~DispatchBaton() {
25     fulfillPromises();
26   }
27
28   void reserve(size_t numEntries) {
29     optEntries_.reserve(numEntries);
30   }
31
32   void setError(std::string message) {
33     optErrorMessage_ = std::move(message);
34   }
35
36   void setExpectedCount(size_t expectedCount) {
37     expectedCount_ = expectedCount;
38   }
39
40   Future<ResultT> getFutureResult(InputT&& input, size_t sequenceNumber) {
41     if (sequenceNumber >= optEntries_.size()) {
42       optEntries_.resize(sequenceNumber + 1);
43     }
44     folly::Optional<Entry>& optEntry = optEntries_[sequenceNumber];
45     if (optEntry) {
46       throw std::logic_error(
47           "Cannot have multiple inputs with same token sequence number");
48     }
49     optEntry = Entry(std::move(input));
50     return optEntry->promise.getFuture();
51   }
52
53  private:
54   void setExceptionResults(std::exception_ptr eptr) {
55     auto exceptionWrapper = exception_wrapper(eptr);
56     for (auto& optEntry : optEntries_) {
57       if (optEntry) {
58         optEntry->promise.setException(exceptionWrapper);
59       }
60     }
61   }
62
63   template <typename TException>
64   void setExceptionResults(
65       const TException& ex,
66       std::exception_ptr eptr = std::exception_ptr()) {
67     auto exceptionWrapper =
68         eptr ? exception_wrapper(eptr, ex) : exception_wrapper(ex);
69     for (auto& optEntry : optEntries_) {
70       if (optEntry) {
71         optEntry->promise.setException(exceptionWrapper);
72       }
73     }
74   }
75
76   void fulfillPromises() {
77     try {
78       // If an error message is set, set all promises to exception with message
79       if (optErrorMessage_) {
80         auto ex = std::logic_error(*optErrorMessage_);
81         return setExceptionResults(std::move(ex));
82       }
83
84       // Create inputs vector and validate entries count same as expectedCount_
85       std::vector<InputT> inputs;
86       inputs.reserve(expectedCount_);
87       bool allEntriesFound = (optEntries_.size() == expectedCount_);
88       if (allEntriesFound) {
89         for (auto& optEntry : optEntries_) {
90           if (!optEntry) {
91             allEntriesFound = false;
92             break;
93           }
94           inputs.emplace_back(std::move(optEntry->input));
95         }
96       }
97       if (!allEntriesFound) {
98         auto ex = std::logic_error(
99             "One or more input tokens destroyed before calling dispatch");
100         return setExceptionResults(std::move(ex));
101       }
102
103       // Call the user provided batch dispatch function to get all results
104       // and make sure that we have the expected number of results returned
105       auto results = dispatchFunction_(std::move(inputs));
106       if (results.size() != expectedCount_) {
107         auto ex = std::logic_error(
108             "Unexpected number of results returned from dispatch function");
109         return setExceptionResults(std::move(ex));
110       }
111
112       // Fulfill the promises with the results from the batch dispatch
113       for (size_t i = 0; i < expectedCount_; ++i) {
114         optEntries_[i]->promise.setValue(std::move(results[i]));
115       }
116     } catch (const std::exception& ex) {
117       return setExceptionResults(ex, std::current_exception());
118     } catch (...) {
119       return setExceptionResults(std::current_exception());
120     }
121   }
122
123   struct Entry {
124     InputT input;
125     folly::Promise<ResultT> promise;
126
127     Entry(Entry&& other) noexcept
128         : input(std::move(other.input)), promise(std::move(other.promise)) {}
129
130     Entry& operator=(Entry&& other) noexcept {
131       input = std::move(other.input);
132       promise = std::move(other.promise);
133       return *this;
134     }
135
136     explicit Entry(InputT&& input) : input(std::move(input)) {}
137   };
138
139   size_t expectedCount_;
140   DispatchFunctionT dispatchFunction_;
141   std::vector<folly::Optional<Entry>> optEntries_;
142   folly::Optional<std::string> optErrorMessage_;
143 };
144
145 template <typename InputT, typename ResultT>
146 AtomicBatchDispatcher<InputT, ResultT>::Token::Token(
147     std::shared_ptr<DispatchBaton> baton,
148     size_t sequenceNumber)
149     : baton_(std::move(baton)), sequenceNumber_(sequenceNumber) {}
150
151 template <typename InputT, typename ResultT>
152 size_t AtomicBatchDispatcher<InputT, ResultT>::Token::sequenceNumber() const {
153   return sequenceNumber_;
154 }
155
156 template <typename InputT, typename ResultT>
157 Future<ResultT> AtomicBatchDispatcher<InputT, ResultT>::Token::dispatch(
158     InputT input) {
159   auto baton = std::move(baton_);
160   if (!baton) {
161     throw std::logic_error(
162         "Dispatch called more than once on the same Token object");
163   }
164   return baton->getFutureResult(std::move(input), sequenceNumber_);
165 }
166
167 template <typename InputT, typename ResultT>
168 AtomicBatchDispatcher<InputT, ResultT>::AtomicBatchDispatcher(
169     DispatchFunctionT&& dispatchFunc)
170     : numTokensIssued_(0),
171       baton_(std::make_shared<DispatchBaton>(std::move(dispatchFunc))) {}
172
173 template <typename InputT, typename ResultT>
174 AtomicBatchDispatcher<InputT, ResultT>::~AtomicBatchDispatcher() {
175   if (baton_) {
176     baton_->setError(
177         "AtomicBatchDispatcher destroyed before commit() was called on it");
178     commit();
179   }
180 }
181
182 template <typename InputT, typename ResultT>
183 void AtomicBatchDispatcher<InputT, ResultT>::reserve(size_t numEntries) {
184   if (!baton_) {
185     throw std::logic_error("Cannot call reserve(....) after calling commit()");
186   }
187   baton_->reserve(numEntries);
188 }
189
190 template <typename InputT, typename ResultT>
191 auto AtomicBatchDispatcher<InputT, ResultT>::getToken() -> Token {
192   if (!baton_) {
193     throw std::logic_error("Cannot issue more tokens after calling commit()");
194   }
195   return Token(baton_, numTokensIssued_++);
196 }
197
198 template <typename InputT, typename ResultT>
199 void AtomicBatchDispatcher<InputT, ResultT>::commit() {
200   auto baton = std::move(baton_);
201   if (!baton) {
202     throw std::logic_error(
203         "Cannot call commit() more than once on the same dispatcher");
204   }
205   baton->setExpectedCount(numTokensIssued_);
206 }
207
208 template <typename InputT, typename ResultT>
209 AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
210     folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
211     size_t initialCapacity) {
212   auto abd = AtomicBatchDispatcher<InputT, ResultT>(std::move(dispatchFunc));
213   if (initialCapacity) {
214     abd.reserve(initialCapacity);
215   }
216   return abd;
217 }
218
219 } // namespace fibers
220 } // manespace folly