apply clang-tidy modernize-use-override
[folly.git] / folly / fibers / AtomicBatchDispatcher.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/Function.h>
19 #include <folly/Optional.h>
20 #include <folly/fibers/detail/AtomicBatchDispatcher.h>
21 #include <folly/futures/Future.h>
22 #include <folly/futures/Promise.h>
23 #include <memory>
24 #include <stdexcept>
25 #include <string>
26 #include <utility>
27 #include <vector>
28
29 namespace folly {
30 namespace fibers {
31
32 /**
33  * An exception class that gets thrown when the AtomicBatchDispatcher is used
34  * incorrectly. This is indicative of a bug in the user code.
35  * Examples are, multiple dispatch calls on the same token, trying to get more
36  * tokens from the dispatcher after commit has been called, etc.
37  */
38 class ABDUsageException : public std::logic_error {
39   using std::logic_error::logic_error;
40 };
41
42 /**
43  * An exception class that gets set on the promise for dispatched tokens, when
44  * the AtomicBatchDispatcher was destroyed before commit was called on it.
45  */
46 class ABDCommitNotCalledException : public std::runtime_error {
47  public:
48   ABDCommitNotCalledException()
49       : std::runtime_error(
50             "AtomicBatchDispatcher destroyed before commit() was called") {}
51 };
52
53 /**
54  * An exception class that gets set on the promise for dispatched tokens, when
55  * one or more other tokens in the batch were destroyed before dispatch was
56  * called on them.
57  * Only here so that the caller can distinguish the real failure cause
58  * rather than these subsequently thrown exceptions.
59  */
60 class ABDTokenNotDispatchedException : public std::runtime_error {
61   using std::runtime_error::runtime_error;
62 };
63
64 /**
65  * AtomicBatchDispatcher should be used if you want to process fiber tasks in
66  * parallel, but require to synchronize them at some point. The canonical
67  * example is to create a database transaction dispatch round. This API notably
68  * enforces that all tasks in the batch have reached the synchronization point
69  * before the user provided dispatch function is called with all the inputs
70  * provided in one function call. It also provides a guarantee that the inputs
71  * in the vector of inputs passed to the user provided dispatch function will be
72  * in the same order as the order in which the token for the job was issued.
73  *
74  * Use this when you want all the inputs in the batch to be processed by a
75  * single function call to the user provided dispatch function.
76  * The user provided dispatch function takes a vector of InputT as input and
77  * returns a vector of ResultT.
78  * To use an AtomicBatchDispatcher, create it by providing a dispatch function:
79  * TO EITHER the constructor of the AtomicBatchDispatcher class
80  * (can call reserve method on the dispatcher to reserve space (for number of
81  *  inputs expected)),
82  * OR the createAtomicBatchDispatcher function in folly::fibers namespace
83  *    (optionally specify an initial capacity (for number of inputs expected)).
84  * The AtomicBatchDispatcher object created using this call (dispatcher),
85  * is the only object that can issue tokens (Token objects) that are used to
86  * add an input to the batch. A single Token is issued when the user calls
87  * the getToken function on the dispatcher.
88  * Token objects cannot be copied (can only be moved). User can call the public
89  * dispatch function on the Token providing a single input value. The dispatch
90  * function returns a folly::Future<ResultT> value that the user can then wait
91  * on to obtain a ResultT value. The ResultT value will only be available once
92  * the dispatch function has been called on all the Tokens in the batch and the
93  * user has called dispatcher.commit() to indicate no more batched transactions
94  * are to be added.
95  * User code pertaining to a task can be run between the point where a token for
96  * the task has been issued and before calling the dispatch function on the
97  * token. Since this code can potentially throw, the token issued for a task
98  * should be moved into this processing code in such a way that if an exception
99  * is thrown and then handled, the token object for the task is destroyed.
100  * The batch query dispatcher will wait until all tokens have either been
101  * destroyed or have had the dispatch function called on them. Leaking an
102  * issued token will cause the batch dispatch to wait forever to happen.
103  *
104  * The AtomicBatchDispatcher object is referred to as the dispatcher below.
105  *
106  * POSSIBLE ERRORS:
107  * 1) The dispatcher is destroyed before calling commit on it, for example
108  *    because the user forgot to call commit OR an exception was thrown
109  *    in user code before the call to commit:
110  *    - The future ResultT has an exception of type ABDCommitNotCalledException
111  *      set for all tokens that were issued by the dispatcher (once all tokens
112  *      are either destroyed or have called dispatch)
113  * 2) Calling the dispatch function more than once on the same Token object
114  *    (or a moved version of the same Token):
115  *    - Subsequent calls to dispatch (after the first one) will throw an
116  *      ABDUsageException exception (the batch itself will not have any errors
117  *      and will get processed)
118  * 3) One/more of the Tokens issued are destroyed before calling dispatch on
119  *    it/them:
120  *    - The future ResultT has an ABDTokenNotDispatchedException set for all
121  *      tokens that were issued by the dispatcher (once all tokens are either
122  *      destroyed or have called dispatch)
123  * 4) dispatcher.getToken() is called after calling dispatcher.commit()
124  *    - the call to getToken() will throw an ABDUsageException exception
125  *      (the batch itself will not have any errors and will get processed).
126  * 5) All tokens were issued and called dispatch, the user provided batch
127  *    dispatch function is called, but that function throws any exception.
128  *    - The future ResultT has exception for all tokens that were issued by
129  *      the dispatcher. The result will contain the wrapped user exception.
130  *
131  * EXAMPLE (There are other ways to achieve this, but this is one example):
132  * - User creates an AtomicBatchDispatcher on stack
133  *     auto dispatcher =
134  *         folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count);
135  * - User creates "count" number of token objects by calling "getToken" count
136  *   number of times
137  *     std::vector<Job> jobs;
138  *     for (size_t i = 0; i < count; ++i) {
139  *       auto token = dispatcher.getToken();
140  *       jobs.push_back(Job(std::move(token), singleInputValueToProcess);
141  *     }
142  * - User calls commit() on the dispatcher to indicate that no new tokens will
143  *   be issued for this batch
144  *     dispatcher.commit();
145  * - Use any single threaded executor that will process the jobs
146  * - On each execution (fiber) preprocess a single "Job" that has been moved in
147  *   from the original vector "jobs". This way if the preprocessing throws
148  *   the Job object being processed is destroyed and so is the token.
149  * - On each execution (fiber) call the dispatch on the token
150  *     auto future = job.token.dispatch(job.input);
151  * - Save the future returned so that eventually you can wait on the results
152  *     ResultT result;
153  *     try {
154  *       result = future.value();
155  *       // future.hasValue() is true
156  *     } catch (...) {
157  *       // future.hasException() is true
158  *       <DO WHATEVER YOU WANT IN CASE OF ERROR> }
159  *     }
160  *
161  * NOTES:
162  * - AtomicBatchDispatcher is not thread safe.
163  * - Works for executors that run tasks on a single thread.
164  */
165 template <typename InputT, typename ResultT>
166 class AtomicBatchDispatcher {
167  private:
168   struct DispatchBaton;
169   friend struct DispatchBaton;
170
171  public:
172   using DispatchFunctionT =
173       folly::Function<std::vector<ResultT>(std::vector<InputT>&&)>;
174
175   class Token {
176    public:
177     explicit Token(std::shared_ptr<DispatchBaton> baton, size_t sequenceNumber);
178
179     Future<ResultT> dispatch(InputT input);
180
181     // Allow moving a Token object
182     Token(Token&&) = default;
183     Token& operator=(Token&&) = default;
184
185     size_t sequenceNumber() const;
186
187    private:
188     // Disallow copying a Token object
189     Token(const Token&) = delete;
190     Token& operator=(const Token&) = delete;
191
192     std::shared_ptr<DispatchBaton> baton_;
193     size_t sequenceNumber_;
194   };
195
196   explicit AtomicBatchDispatcher(DispatchFunctionT&& dispatchFunc);
197
198   ~AtomicBatchDispatcher();
199
200   // numEntries is a *hint* about the number of inputs to expect:
201   // - It is used purely to reserve space for storing vector of inputs etc.,
202   //   so that reeallocation and move copy are reduced / not needed.
203   // - It is provided purely for performance reasons
204   void reserve(size_t numEntries);
205
206   Token getToken();
207
208   void commit();
209
210   // Allow moving an AtomicBatchDispatcher object
211   AtomicBatchDispatcher(AtomicBatchDispatcher&&) = default;
212   AtomicBatchDispatcher& operator=(AtomicBatchDispatcher&&) = default;
213
214  private:
215   // Disallow copying an AtomicBatchDispatcher object
216   AtomicBatchDispatcher(const AtomicBatchDispatcher&) = delete;
217   AtomicBatchDispatcher& operator=(const AtomicBatchDispatcher&) = delete;
218
219   size_t numTokensIssued_;
220   std::shared_ptr<DispatchBaton> baton_;
221 };
222
223 // initialCapacity is a *hint* about the number of inputs to expect:
224 // - It is used purely to reserve space for storing vector of inputs etc.,
225 //   so that reeallocation and move copy are reduced / not needed.
226 // - It is provided purely for performance reasons
227 template <typename InputT, typename ResultT>
228 AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
229     folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
230     size_t initialCapacity = 0);
231
232 } // namespace fibers
233 } // namespace folly
234
235 #include <folly/fibers/AtomicBatchDispatcher-inl.h>